当前位置: 首页 > news >正文

【面试】Kafka

Kafka

  • 1、为什么要使用 kafka
  • 2、Kafka 的架构是怎么样的
  • 3、什么是 Kafka 的重平衡机制
  • 4、Kafka 几种选举过程
  • 5、Kafka 高水位了解过吗
  • 6、Kafka 如何保证消息不丢失
  • 7、Kafka 如何保证消息不重复消费
  • 8、Kafka 为什么这么快

1、为什么要使用 kafka

1. 解耦:在一个复杂的系统中,不同的模块或服务之间可能需要相互依赖,如果直接使用函数调用或者API调用的方式,会造成模块之间的耦合,当其中一个模块发生改变时,需要同时修改调用方和被调用方的代码。而使用消息队列作为中间件,不同的模块可以将消息发送到消息队列中,不需要知道具体的接收方是谁,接收方可以独立地消费消息,实现了模块之间的解耦。

2. 异步:有些操作比较耗时,例如发送邮件、生成报表等,如果使用同步的方式处理,会阻塞主线程或者进程,导致系统的性能下降。而使用消息队列,可以将这些操作封装成消息,放入消息队列中,异步地处理这些操作,不影响主流程的执行,提高了系统的性能和响应速度。

3. 削峰:削峰是一种在高并发场景下平衡系统压力的技术,在削峰的过程中,通常使用消息队列作为缓冲区,将请求放入消息队列中,然后在系统负载低的时候进行处理。这种方式可以将系统的峰值压力分散到较长的时间段内,减少瞬时压力对系统的影响,从而提高系统的稳定性和可靠性。

2、Kafka 的架构是怎么样的

在这里插入图片描述

1、Producer 生产者

生产者负责将消息发布到 kafka 中的一个或多个主题,每个主题包含一个或多个分区,消息保存在各个分区上,每一个分区都是一个顺序的,分区中的消息都被分了一个序列号,称之为偏移量,就是指消息在分区中的位置,所有分区的消息加在一起就是一个主题的所有消息。

分区策略

分区策略说明
轮询策略按顺序轮流将每条数据分配到每个分区中
随机策略每次都随机地将消息分配到每个分区
按键保存策略生产者发送数据的时候,可以指定一个key,计算这个key的hashCodet值,按照hashCodel的值对不同消息进行存储

如果 topic 有多个 partition,消费数据时就不能保证数据的顺序。严格保证消息的消费顺序的场景下,需要将分区数目设为1 或者指定消息的 key。

消息发送

public Future<RecordMetadata> send(ProducerRecord<K, V> record);
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback);

生产者架构图

消息在通过 send 方法发往 broker 的过程中,有可能需要经过拦截器、序列化器、分区器一系列之后才能被真正地发往 broker。整个生产者客户端由两个线程协调运行,这两个线程分别为主线程和Sender发送线程。

在这里插入图片描述
① 主线程

拦截器

生产者拦截器既可以用来在消息发送前做一些准备工作,比如按照某个规则过滤不符合要求的消息、修改消息的内容等,也可以用来在发送回调逻辑前做一些定制化的需求,比如统计类工作。通过自定义实现 ProducerInterceptor 接口来使用。

序列化

生产者需要用序列化器把对象转换成字节数组才能通过网络发送给 Kafka。消费者需要用反序列化把从 Kafka 中收到的字节数组转换成相应的对象。自带的有StringSerializer,ByteArray、ByteBuffer、Bytes、Double、Integer、Long等,还可以自定义序列化器。

分区器

如果消息中没有指定 partition 字段,那么就需要依赖分区器,根据 key 这个字段来计算 partition 的值。也可以自定义分区器。

消息累加器

消息累加器主要用来缓存消息以便 Sender线程可以批量发送进而减少网络传输的资源消耗以提升性能。消息累加器的缓存大小可以通过buffer.memory配置。在消息累加器的内部为每个分区都维护了一个双端队列,主线程发送过来的消息都会被追加到某个双端队列中,队列中的内容就是 ProducerBatch,即Dqueue< ProducerBatch >。

当一条消息流入消息累加器,如果这条消息小于batch.size参数大小则以batch.size参数大小创建 ProducerBatch,否则以消息的实际大小创建 ProducerBatch。

② Sender发送线

程负责从消息累加器中获取消息并将其发送到 Kafka 中。后续 Sender 从缓存中获取消息,进行转换,发送到broker。在发送前还会保存到InFlightRequests中,作用是缓存已经发送出去但还没有收到响应的请求,缓存数量由max.in.flight.requests.per.connection参数确定,默认是5,表示每个连接最多缓存5个未响应的请求。

2、Consumer 消费者

消费者,消息的订阅者,可以订阅一个或多个主题,并且依据消息生产的顺序读取他们,消费者通过检查消息的偏移量来区分已经读取过的消息。消费者一定属于某一个特定的消费组。消息被消费之后,并不被马上删除,这样多个业务就可以重复使用 kafkal 的消息,我们某一个业务也可以通过修改偏移量达到重新读取消息的目的,偏移量由用户控制。消息最终还是会被删除的,默认生命周期为1周(7*24小时)。

订阅主题和分区

通过 subscribe 方法订阅主题具有消费者自动再均衡的功能,在多个消费者的情况下可以根据分区分配政策来自动分配各个消费者与分区的关系,以实现消费者负载均衡和故障自动转移。而通过 assign 方法则没有。

消息消费

Kafka 中的消息是基于推拉模式的。Kafka 中的消息消费是一个不断轮询的过程,消费者所要做的就是重复地调用poll 方法,而 poll 方法返回的是所订阅的主题(分区)上的一组消息。如果没有消息则返回空。

public ConsumerRecords<K, V> (final Duration timeout)

timeout 用于控制 poll 方法的阻塞时间,没有消息时会阻塞。

位移提交

Kafka 中的每条消息都有唯一的 offset,用来标识消息在分区中对应的位置。Kafka 默认的消费唯一的提交方式是自动提交,由enable.auto.commit配置,默认为true。自动提交不是每一条消息提交一次,而是定期提交,周期由auto.commit.interval.ms配置,默认为5秒。

自动提交可能发生消息重复或者丢失的情况,Kafka 还提供了手动提交的方式。enable.auto.commit配置为false开启手动提交。

指定位移消费

在 Kafka 中每当消费者查找不到所记录的消费位移时,就会根据消费者客户端参数auto.offset.reset的配置来决定从何处开始进行消费。默认值为 lastest,表示从分区末尾开始消费消息;earliest 表示从起始开始消费;none为不进行消费,而是抛出异常。

seek 可以从特定的位移处开始拉去消息,得以追前消费或回溯消费。

public void seek(TopicPartition partition, long offset)

再均衡

再均衡是指分区的所属权从一个消费者转移到另一个消费者的行为,它为消费组具备高可用性和伸缩性提供保障,使我们可以既方便又安全地删除消费组内的消费者或者往消费组内添加消费者。不过在再均衡发生期间,消费组内的消费者是无法读取消息的。再均衡后也可能出现重复消费的情况。所以应尽量避免不必要的再均衡发生。

3、Consumer Group 消费者群组

同一个消费者组中保证每个分区只能被一个消费者使用 ,不会出现多个消费者读取同一个分区的情况,通过这种方式,消费者可以消费包含大量消息的主题。而且如果某个消费者失效,群组里的其他消费者可以接管失效悄费者的工作。

4、Broker 服务器

一个独立的 Kafka 服务器被称为 broker, broker 接收来自生产者的消息,为消息设置偏移量,并提交消息到磁盘保存。

在集群中,每个分区都有一个Leader Broker和多个Follower Broker,只有Leader Broker才能处理生产者和消费者的请求,而Follower Broker只是Leader Broker的备份,用于提供数据的冗余备份和容错能力。如果Leader Broker发生故障,Kafka集群会自动将Follower Broker提升为新的Leader Broker,从而实现高可用性和容错能力。

AR、ISR、OSR

  • 分区中的所有副本统称为AR。
  • 所有与leader副本保持一定同步程度的副本组成ISR。
  • 与leader副本同步滞后过多的副本组成OSR。
  • AR = ISR +OSR。正常情况 应该AR=ISR,OSR集合为空。

5、 Log 日志存储

一个分区对应一个日志文件(Log),为了防止Log过大,Kafka又引入了日志分段(LogSegment)的概念,将Log 切分为多个 LogSegment,便于消息的维护和清理。Log在物理上只以(命名为topic-partitiom)文件夹的形式存储,而每个LogSegment对应磁盘上的一个日志文件和两个索引文件,以及可能的其他文件。

在这里插入图片描述
在这里插入图片描述

LogSegment文件由两部分组成,分别为“.index”文件和“.log”文件,分别表示为segment的索引文件和数据文件

  • partition全局的第一个segment从0开始,后续每个segment文件名为上一个segment文件最后一条消息的offset值
  • 数值大小为64位,20位数据字符长度,没有数字用0填充

消息压缩

一条消息通常不会太大,Kafka 是批量消息压缩,通过compression.type配置,默认为 producer,还可以配置为gzip、snappy、lz4,uncompressed表示不压缩。

日志索引

Kafka中的索引文件以稀疏索引的方式构造消息的索引,它并不保证每个消息在索引文件中都有对应的索引项。每当写入一定量(log.index.interval.bytes指定,默认4KB)的消息时,偏移量索引文件和时间戳索引文件分别增加一个偏移量索引文件项和时间戳索引文件项。稀疏索引通过MappedByteBuffer将索引文件映射到内存中,以加快索引的查询速度。

日志清理

Kafka提供两种日志清理策略:

  1. 日志删除:按照一定的保留策略(基于时间、日志大小或日志起始偏移量)直接删除不符合条件的日志分段。
  2. 日志压缩:针对每个消息的key进行整合,对于有相同key的不同value值,只保留最后一个版本。

页缓存

页缓存是把磁盘中的数据缓存到内存中,把对磁盘的访问变为对内存的访问,减少对磁盘IO的操作。

零拷贝

所谓的零拷贝是将数据直接从磁盘文件复制到网卡设备中,而不需要经由应用程序之手。减少了数据拷贝的次数和内核和用户模式之间的上下文切换。对于Linux操作系统而言,底层依赖于sendfile()方法实现。

一般的数据流程:磁盘 -> 内核 -> 应用 -> Socket -> 网卡,数据复制4次,上下文切换4次。

在这里插入图片描述
流程步骤:

  1. 操作系统将数据从磁盘文件中读取到内核空间的页面缓存。
  2. 应用程序将数据从内核空间读入用户空间缓冲区。
  3. 应用程序将读到数据写回内核空间并放入socket缓冲区。
  4. 操作系统将数据从socket缓冲区复制到网卡接口,此时数据才能通过网络发送。

通过网卡直接去访问系统的内存,就可以实现现绝对的零拷贝了。这样就可以最大程度提高传输性能。通过“零拷贝”技术,我们可以去掉那些没必要的数据复制操作, 同时也会减少上下文切换次数。

在这里插入图片描述
通过上图可以看到,零拷贝技术只用将磁盘文件的数据复制到页面缓存中一次,然后将数据从页面缓存直接发送到网络中(发送给不同的订阅者时,都可以使用同一个页面缓存),避免了重复复制操作。

6、ZooKeeper

ZooKeeper是Kafka集群中使用的分布式协调服务,用于维护Kafka集群的状态和元数据信息,例如主题和分区的分配信息、消费者组和消费者偏移量等。

3、什么是 Kafka 的重平衡机制

Kafka 的重平衡(Rebalance)机制是指在消费者组(Consumer Group)中,当消费者数量发生变化(如新增、减少或崩溃)时,Kafka 重新分配分区(Partition)给消费者的过程。重平衡的目的是确保每个分区只被组内的一个消费者消费,从而实现负载均衡和高可用性。

在这里插入图片描述

重平衡的个触发条件:

  • 消费者加入组:新的消费者加入消费者组。
  • 消费者离开组:消费者主动离开(如关闭)或崩溃。
  • 订阅主题变化:消费者组订阅的主题或分区数量发生变化。
  • 分区数量变化:主题的分区数量发生变化。

重平衡的过程

  1. 选举协调者:消费者组中的某个 Broker 被选为协调者,负责管理重平衡过程。
  2. 发送加入组请求:所有消费者向协调者发送加入组请求(JoinGroup Request)。
  3. 选举领导者:协调者从消费者中选出一个领导者(Leader),其他消费者成为跟随者(Follower)。
  4. 分配分区:领导者根据分区分配策略(如 RangeAssignor、RoundRobinAssignor 等)为每个消费者分配分区,并将分配结果发送给协调者。
  5. 同步分配结果:协调者将分配结果同步给所有消费者,消费者根据分配结果开始消费。

重平衡的影响

  1. 消费暂停:在重平衡期间,消费者会暂停消费,直到分配完成。
  2. 性能开销:频繁的重平衡会增加集群的负载,影响整体性能。
  3. 重复消费:重平衡可能导致消费者重新读取已处理的消息,造成重复消费。

减少重平衡的策略

  1. 优化会话超时:合理设置 session.timeout.ms 和 heartbeat.interval.ms,避免因网络延迟导致的误判。
  2. 减少消费者变动:尽量避免频繁地启动或关闭消费者。
  3. 使用静态成员资格:Kafka 2.3+ 支持静态成员资格(Static Membership),减少因消费者短暂离线触发的重平衡。
  4. 优化分区分配策略:根据业务需求选择合适的分配策略,减少不必要的重平衡。

Kafka 的重平衡机制确保消费者组在变化时能重新分配分区,实现负载均衡和高可用性。尽管重平衡会带来一定的性能开销,但通过合理配置和优化,可以显著减少其影响。

4、Kafka 几种选举过程

  1. 控制器(Controller)选举

集群中第一个启动的broker会通过在zookeeper中创建临时节点/controller来让自己成为控制器,其他broker启动时
会去尝试读取/controller节点的brokerid的值,读取到的brokerid的值不为-1知道已经有其他broker节点成功竞选为控制器,就会在zookeeper中创建watch对象,便于它们收到控制器变更的通知。

  1. 分区领导者(Partition Leader)选举

controller感知到分区leader所在的broker挂了,controller会从replicas副本列表(同时在ISR列表里)中取出第一个broker作为leader。

分区领导者负责处理读写请求,选举过程由控制器管理。触发选举的情况包括:

  • 分区领导者崩溃
  • 分区副本不再同步
  • 手动触发选举
  1. 消费者组领导者(Consumer Group Leader)选举

GroupCoordinator需要为消费组内的消费者选举出一个消费组的leader,这个选举的算法很简单,当消费组内还没有leader,那么第一个加入消费组的消费者即为消费组的leader,如果当前leader退出消费组,则会挑选以HashMap结构保存的消费者节点数据中,第一个键值对来作为leader。

在消费者组中,协调者负责管理消费者和分区分配。选举过程如下:

  • 消费者加入组时,向协调者发送加入请求。
  • 协调者从消费者中选出一个领导者,负责分区分配。
  • 领导者根据分配策略分配分区,并将结果发送给协调者。
  • 协调者将分配结果同步给所有消费者。
  1. ZooKeeper 领导者选举(如果使用 ZooKeeper)

在依赖 ZooKeeper 的 Kafka 集群中,ZooKeeper 通过 Zab 协议选举领导者,确保其自身的高可用性。Kafka Broker 利用 ZooKeeper 进行元数据管理和选举。

5、Kafka 高水位了解过吗

Kafka 的高水位(High Watermark,HW)是一个关键概念,用于确保数据的一致性和可靠性。高水位机制在 Kafka 的分区副本管理中起着重要作用,尤其是在保证消息的持久化和消费者的可见性方面。

什么是高水位(High Watermark)

高水位是分区中已成功复制到所有同步副本(ISR,In-Sync Replicas)的消息的偏移量(Offset)。它表示消费者可以安全读取的消息范围,即消费者只能读取到高水位之前的消息。

  • 高水位之前的消息:这些消息已经被所有 ISR 副本确认,是已提交(Committed)的消息,消费者可以安全消费。
  • 高水位之后的消息:这些消息尚未被所有 ISR 副本确认,可能丢失,因此对消费者不可见。

高水位标识了一个特定的消息偏移量(offset),即一个分区中已提交消息的最高偏移量(offset),消费者只能拉取到这个offset之前的消息。消费者可以通过跟踪高水位来确定自己消费的位置。

高水位的作用

  • 消费进度管理:消费者可以通过记录上一次消费的偏移量,然后将其与分区的高水位进行比较,来确定自己的消费进度。消费者可以在和高水位对北比之后继续消费新的消息,确保不会错过任何已提交的消息。这样,消费者可以按照自己的节奏进行消费,不受其他消费者的影响。
  • 数据的可靠性:高水位还用于确保数据的可靠性。在Kafka中,只有消息被写入主副本并被所有的同步副本ISR确认后,才被认为是已提交的消息。高水位表示已经被提交的消息的边界。只有高水位之前的消息才能被认为是已经被确认的,其他的消息可能会因为副本故障或其他原因而丢失。

高水位的工作原理

  1. 生产者写入消息:生产者将消息发送到分区的领导者(Leader),领导者将消息写入本地日志。领导者将消息复制到所有 ISR 副本。
  2. 更新高水位:当所有 ISR 副本都成功复制了某条消息后,领导者会更新高水位。高水位是 ISR 副本中最小日志结束偏移量(LEO,Log End Offset)的最小值。
  3. 消费者读取消息:消费者只能读取到高水位之前的消息。如果消费者尝试读取高水位之后的消息,会被阻塞,直到这些消息被提交。

高水位更新示例

假设一个分区有 3 个副本(Leader 和 2 个 Follower),它们的 LEO 分别为:

Leader LEO = 10
Follower1 LEO = 9
Follower2 LEO = 8

此时高水位为 8,因为它是 ISR 副本中 LEO 的最小值。消费者只能读取偏移量 0 到 7 的消息。

高水位与 LEO(Log End Offset)的关系

  • LEO(Log End Offset):LEO 是分区副本中下一条待写入消息的偏移量。每个副本(包括领导者和跟随者)都有自己的 LEO。

  • 高水位(HW):高水位是 ISR 副本中 LEO 的最小值。它表示已提交消息的边界。

在这里插入图片描述
当消费者消费消息时,它可以使用高水位作为参考点,只消费高水位之前的消息,以确保消费的是已经被确认的消
息,从而保证数据的可靠性。如上图,只消费offet为6之前的消息。

我们都知道,在Kafka中,每个分区都有一个Leader副本和多个Follower副本。当Leader副本发生故障时,Kafka会选择一个新的Leader副本。这个切换过程中,需要保证数据的一致性,即新的Leader副本必须具有和旧Leader副本一样的消息顺序。为了实现这个目标,Kafka引入了Leader Epoch的概念。

Leader Epoch的过程

  1. 每个分区都有一个初始的Leader Epoch,通常为0。
  2. 当Leader副本发生故障或需要进行切换时,Kafka会触发副本切换过程。
  3. 副本切换过程中,Kafka会从ISR同步副本)中选择一个新的Follower副本作为新的Leader副本。
  4. 新的Leader副本会增加自己的Leader Epoch,使其大于之前的Leader Epoch。这表示进入了一个新的任期。
  5. 新的Leader副本会验证旧Leader副本的状态以确保数据的一致性。它会检查旧Leader副本的Leader Epoch和高水位。
  6. 如果旧Leader副本的Leader Epoch小于等于新Leader副本的Leader Epoch,并且旧Leader副本的高水位小于等于新Leader副本的高水位,则验证通过。
  7. 验证通过后,新的Leader副本开始从旧Leader副本复制数据。它只会接受旧Leader副本的Leader Epoch和高水位之前的消息。
  8. 一旦新的Leader副本复制了旧Leader副本的所有数据,并达到了与旧Leader副本相同的高水位,副本切换过程就完成了。

6、Kafka 如何保证消息不丢失

  1. 消息确认机制
/*** (1)acks=0:生生产者不等待确认,消息可能丢失,其实就是保证消息不会重复发送或者重复消费,但是速度最快。同时重试配置不会发生作用。* (2)acks=1:默认值,领导者确认后即认为消息已发送,但若领导者在同步前崩溃,消息可能丢失。* (3)acks=all或acks=-1:所有同步副本确认后,消息才被认为已提交,确保消息不丢失。*/
props.put(ProducerConfig.ACKS_CONFIG, "all");/*** 如果请求失败,生产者会自动重试,如果启用重试*/
props.put(ProducerConfig.RETRIES_CONFIG, 3);/*** 消息发送超时或失败后,间隔的重试时间*/
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 300);

并且生产者调用send方法发送消息之后,消息可能因为网络问题并没有发送过去。所以,我们不能默认在调用send方法发送消息之后消息发送成功了。为了确定消息是发送成功,我们要判断消息发送的结果。可以采用为其添加回调函数的形式,如果消息发送失败的话,可以对失败消息做记录,我们检查失败的原因之后重新发送即可!

// 异步发送消息
producer.send(record, new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if (exception == null) {System.out.println("发送成功");} else {System.out.println("发送失败");}if (metadata != null) {System.out.println("异步方式发送消息结果:" + "topic‐" + metadata.topic() + "|partition‐"+ metadata.partition() + "|offset‐" + metadata.offset());}}
});
  1. 副本机制和持久化存储

复制机制:Kafka使用复制机制来保证数据的可靠性。每个分区都有多个副本,副本可以分布在不同的节点上,当一个节点宕机时,其他节点上的副本仍然可以提供服务,保证消息不丢失。

ISR机制:Kafka使用ISR机制来确保消息不会丢失。ISR是指已经复制了数据并与主节点保持同步的节点集合,只有SR中的节点才会被认为是“可用”的节点,只有在ISR中的节点上的副本才会被认为是“可用”。

持久化存储:Kafka 将消息持久化存储在磁盘上,即使 Broker 重启,消息也不会丢失。Kafka 的日志结构设计支持高效的消息写入和读取。
在服务端,也有一些参数配置可以调节来避免消息丢失:

replication.factor //表示分区副本的个数,replication.factor>1 当1eader副本挂了,follower副本会被选举为leader继续提供服务。
min.insync.rep1icas //表示ISR最少的副本数量,通常设置min.insync.replicas>1,这样才有可用的fol1ower副本执行替换,保证消息不丢
unclean.leader.election.enable=false //是否可以把非ISR集合中的副本选举为leader副本。
  1. 消费者偏移量管理

消费者通过提交偏移量来记录消费进度。Kafka 提供自动和手动提交偏移量的方式,确保消费者在崩溃后能从正确的位置继续消费,避免消息丢失。

当消费者拉取到了分区的某个消息之后,消费者会自动提交了 offset。自动提交的话会有一个问题,试想一下,当消费者刚拿到这个消息准备进行真正消费的时候,突然挂掉了,消息实际上并没有被消费,但是 offset 却被自动提交了。

while (true) {/*** poll() API 是拉取消息的长轮询 比如设置了1000毫秒 并不是在这1秒钟内只拉取一次 而是当没有拉取到数据时 会多次拉取数据 直到拉取到数据 然后继续循环*/ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> record : records) {System.out.printf("收到消息:partition = %d,offset = %d, key = %s, value = %s%n", record.partition(),record.offset(), record.key(), record.value());}if (records.count() > 0) {// 手动同步提交offset,当前线程会阻塞直到offset提交成功// 一般使用同步提交,因为提交之后一般也没有什么逻辑代码了// consumer.commitSync();// 手动异步提交offset,当前线程提交offset不会阻塞,可以继续处理后面的程序逻辑consumer.commitAsync(new OffsetCommitCallback() {@Overridepublic void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {if (exception != null) {System.err.println("Commit failed for " + offsets);System.err.println("Commit failed exception: " + exception.getStackTrace());}}});}
}

这种情况的解决办法也比较粗暴,我们手动关闭自动提交 offset,每次在真正消费完消息之后之后再自己手动提交 offset 。但是,细心的朋友一定会发现,这样会带来消息被重新消费的问题。比如你刚刚消费完消息之后,还没提交 offset,结果自己挂掉了,那么这个消息理论上就会被消费两次。

  1. 高水位机制

Kafka 使用高水位(High Watermark)机制确保消费者只能读取已提交的消息,避免读取未完全复制的消息。

  1. 数据保留策略

Kafka 允许配置消息的保留时间和大小,确保在指定时间内或达到大小限制前,消息不会被删除。

7、Kafka 如何保证消息不重复消费

  1. 消费者偏移量管理

Kafka 消费者通过提交偏移量(Offset)来记录消费进度。如果偏移量提交不当,可能导致重复消费。为了避免重复消费,可以采取以下策略:

  • 手动提交偏移量:

    • 消费者在处理完消息后,手动提交偏移量(enable.auto.commit=false)。
    • 确保消息处理和偏移量提交在同一个事务中,避免消息处理成功但偏移量未提交的情况。
  • 同步提交:

    • 使用同步提交(commitSync())而不是异步提交(commitAsync()),确保偏移量提交成功后再继续消费。
  • 幂等性处理:

    • 在消费者端实现幂等性逻辑,即使消息重复消费,也不会对系统产生影响。
  1. 事务机制

Kafka 从 0.11 版本开始支持事务机制,可以实现精确一次语义(Exactly-Once Semantics)。通过事务机制,生产者和消费者可以确保消息的精确一次处理。

  • 生产者事务:

    • 生产者开启事务,将消息发送和偏移量提交放在同一个事务中。
    • 如果事务失败,消息和偏移量都不会提交。
  • 消费者事务:

    • 消费者在消费消息时,可以将消息处理和偏移量提交放在同一个事务中。
    • 如果事务失败,消息处理和偏移量提交都会回滚。
  • 配置:

    • 生产者配置:enable.idempotence=true 和 transactional.id。
    • 消费者配置:isolation.level=read_committed,确保只读取已提交的消息。
  1. 幂等性生产者

Kafka 生产者支持幂等性(Idempotence),可以避免消息重复发送。

  • 配置:
    • 设置 enable.idempotence=true,生产者会为每条消息分配唯一的序列号(Sequence Number),Broker 会根据序列号去重。
  • 作用:
    • 即使在网络重试的情况下,Broker 也不会重复存储相同的消息。
  1. 消费者组的重平衡

在消费者组发生重平衡(Rebalance)时,可能会导致部分消息重复消费。为了减少重复消费,可以采取以下措施:

  • 减少重平衡频率:

    • 优化消费者组的配置,如 session.timeout.ms 和 heartbeat.interval.ms,避免不必要的重平衡。
  • 静态成员资格:

    • 使用 Kafka 2.3+ 的静态成员资格(Static Membership)功能,减少因消费者短暂离线触发的重平衡。
  1. 外部存储去重

如果 Kafka 本身无法完全避免重复消费,可以在消费者端使用外部存储(如数据库、Redis)实现去重:

  • 记录已处理消息:

    • 在外部存储中记录已处理消息的唯一标识(如消息 ID 或偏移量)。
    • 在处理消息前,先检查该消息是否已处理。
  • 实现幂等性:

    • 在消费者端实现幂等性逻辑,确保即使消息重复消费,也不会对系统产生影响。
  1. 消息唯一标识

为每条消息分配唯一标识(如消息 ID),在消费者端根据唯一标识去重:

  • 生产者生成唯一 ID:
    • 生产者在发送消息时,为每条消息生成唯一 ID。
  • 消费者去重:
    • 消费者在处理消息时,检查唯一 ID 是否已处理。

8、Kafka 为什么这么快

  1. 批处理(Batching):生产者将多条消息打包成一个批次(Batch)发送,减少了网络请求的次数。通过配置 linger.ms 和 batch.size 参数,可以优化批处理的大小和延迟。消费者一次拉取多个消息,减少了网络往返时间(RTT)和系统调用次数。

  2. 分区和并行化:Kafka 将主题(Topic)划分为多个分区(Partition),每个分区可以独立读写。分区机制允许生产者和消费者并行操作,提高了吞吐量。每个分区可以有多个副本(Replica),副本之间并行同步数据。

  3. 高效的存储格式:Kafka 将日志文件划分为多个固定大小的段(Segment),便于管理和清理。个段文件以偏移量命名,方便快速定位和读取。Kafka 为每个日志段维护一个索引文件,支持快速查找消息。

  4. 压缩机制:消息压缩,Kafka 支持多种压缩算法(如 Snappy、Gzip、LZ4),减少网络传输和磁盘存储的开销。Kafka 支持多种压缩算法(如 Snappy、Gzip、LZ4),减少网络传输和磁盘存储的开销。

  5. 零拷贝技术(Zero-Copy):传统的数据传输需要多次拷贝:磁盘 -> 内核缓冲区 -> 用户缓冲区 -> 网络缓冲区。Kafka 使用零拷贝技术,通过 sendfile 系统调用直接将数据从磁盘文件传输到网络通道,避免了用户空间和内核空间之间的数据拷贝,大幅减少了 CPU 开销和上下文切换。

相关文章:

【面试】Kafka

Kafka 1、为什么要使用 kafka2、Kafka 的架构是怎么样的3、什么是 Kafka 的重平衡机制4、Kafka 几种选举过程5、Kafka 高水位了解过吗6、Kafka 如何保证消息不丢失7、Kafka 如何保证消息不重复消费8、Kafka 为什么这么快 1、为什么要使用 kafka 1. 解耦&#xff1a;在一个复杂…...

PHP MySQL 创建数据库

PHP MySQL 创建数据库 引言 在网站开发中&#xff0c;数据库是存储和管理数据的核心部分。PHP 和 MySQL 是最常用的网页开发语言和数据库管理系统之一。本文将详细介绍如何在 PHP 中使用 MySQL 创建数据库&#xff0c;并对其操作进行详细讲解。 前提条件 在开始创建数据库之…...

通义万相 2.1 × 蓝耘智算:AIGC 界的「黄金搭档」如何重塑创作未来?

我的个人主页 我的专栏&#xff1a; 人工智能领域、java-数据结构、Javase、C语言&#xff0c;希望能帮助到大家&#xff01;&#xff01;&#xff01; 点赞&#x1f44d;收藏❤ 引言 在当今数字化浪潮席卷的时代&#xff0c;AIGC&#xff08;生成式人工智能&#xff09;领域正…...

【面试题系列】:使用消息队列怎么防止消息重复?从原理到实战……

一、消息队列的核心价值与挑战 消息队列&#xff08;MQ&#xff09;作为现代分布式系统的基础设施&#xff0c;其核心价值在于解耦、削峰填谷和异步通信。但在追求高可靠性的过程中&#xff0c;消息重复成为必须攻克的技术难题。根据调研数据&#xff0c;在生产环境中消息重复…...

Damage与Injury

### “Damage”和“Injury”的区别 “Damage”和“Injury”都有“损害”或“伤害”的意思&#xff0c;但它们的用法、语境和侧重点有所不同。以下是从词性、适用对象、语义侧重和具体场景四个方面详细对比两者的区别&#xff1a; --- #### 1. **词性** - **Damage**&#xf…...

18 HarmonyOS NEXT UVList组件开发指南(五)

温馨提示&#xff1a;本篇博客的详细代码已发布到 git : https://gitcode.com/nutpi/HarmonyosNext 可以下载运行哦&#xff01; 第五篇&#xff1a;UVList组件最佳实践与实际应用案例 文章目录 第五篇&#xff1a;UVList组件最佳实践与实际应用案例1. 最佳实践总结1.1 组件设计…...

vue3组合式API怎么获取全局变量globalProperties

设置全局变量 main.ts app.config.globalProperties.$category { index: 0 } 获取全局变量 const { appContext } getCurrentInstance() as ComponentInternalInstance console.log(appContext.config.globalProperties.$category) 或是 const { proxy } getCurrentInstance…...

华为机试牛客刷题之HJ14 字符串排序

HJ14 字符串排序 描述 对于给定的由大小写字母混合构成的 n 个单词&#xff0c;输出按字典序从小到大排序后的结果。 从字符串的第一个字符开始逐个比较&#xff0c;直到找到第一个不同的位置&#xff0c;通过比较这个位置字符对应的&#xff08;A<⋯<Z<a<⋯<…...

CPU 负载 和 CPU利用率 的区别

简单记录下 top 命令中&#xff0c;CPU利用率核CPU负载的概念&#xff0c; &#xff08;1&#xff09;CPU利用率&#xff1a;指在一段时间内 表示 CPU 实际工作时间占总时间的百分比。表示正在执行进程的时间比例&#xff0c;包括用户空间和内核空间程序的执行时间。通常包含以…...

SSM框架

SSM 框架是 Java Web 开发中广泛使用的经典组合&#xff0c;由 Spring、Spring MVC 和 MyBatis 三个开源框架整合而成&#xff0c;适用于构建中大型企业级应用。 1. SSM框架组成 框架作用核心特性Spring管理业务层&#xff08;Service&#xff09;和持久层&#xff08;DAO&am…...

maven无法解析插件 org.apache.maven.plugins:maven-jar-plugin:3.4.1

解决流程 1.修改maven仓库库地址 2.删除本地的maven仓库 maven插件一直加载有问题: 无法解析插件 org.apache.maven.plugins:maven-jar-plugin:3.4.1 开始以为maven版本有问题&#xff0c;重装了maven&#xff0c;重装了idea工具。结果问题还是没解决。研究之后发现&#xf…...

如何修复“RPC 服务器不可用”错误

远程过程调用&#xff08;Remote Procedure Call&#xff0c; RPC&#xff09;是允许客户端在不同计算机上执行进程的众多可用网络进程之一。本文将深入探讨RPC如何在不同的软件系统之间实现无缝消息交换&#xff0c;同时重点介绍与RPC相关的常见错误的一些原因。 什么是远程过…...

晋升系列4:学习方法

每一个成功的人&#xff0c;都是从底层开始打怪&#xff0c;不断的总结经验&#xff0c;一步一步打上来的。在这个过程中需要坚持、总结方法论。 对一件事情长久坚持的人其实比较少&#xff0c;在坚持的人中&#xff0c;不断的总结优化的更少&#xff0c;所以最终达到高级别的…...

单链表-代码精简版

单链表核心知识详解 单链表是一种动态存储的线性数据结构&#xff0c;其特点是逻辑上连续&#xff0c;物理上非连续&#xff0c;每个节点包含数据域和指向下一个节点的指针域。以下是核心知识点与完整实现代码&#xff1a; 一、单链表的结构定义 单链表节点通过结构体自引用…...

关于前后端整合和打包成exe文件的个人的总结和思考

前言 感觉有很多东西&#xff0c;不知道写什么&#xff0c;随便写点吧。 正文 前后端合并 就不说怎么开发的&#xff0c;就说点个人感觉重要的东西。 前端用ReactViteaxios随便写一个demo&#xff0c;用于CRUD。 后端用Django REST Framework。 设置前端打包 import { …...

基于muduo+mysql+jsoncpp的简易HTTPWebServer

一、项目介绍 本项目基于C语言、陈硕老师的muduo网络库、mysql数据库以及jsoncpp&#xff0c;服务器监听两个端口&#xff0c;一个端口用于处理http请求&#xff0c;另一个端口用于处理发送来的json数据。 此项目在实现时&#xff0c;识别出车牌后打包为json数据发送给后端服务…...

Java/Kotlin逆向基础与Smali语法精解

1. 法律警示与道德边界 1.1 司法判例深度剖析 案例一&#xff1a;2021年某游戏外挂团伙刑事案 犯罪手法&#xff1a;逆向《王者荣耀》通信协议&#xff0c;修改战斗数据包 技术细节&#xff1a;Hook libil2cpp.so的SendPacket函数 量刑依据&#xff1a;非法经营罪&#xff…...

C++:入门详解(关于C与C++基本差别)

目录 一.C的第一个程序 二.命名空间&#xff08;namespace&#xff09; 1.命名空间的定义与使用&#xff1a; &#xff08;1&#xff09;命名空间里可以定义变量&#xff0c;函数&#xff0c;结构体等多种类型 &#xff08;2&#xff09;命名空间调用&#xff08;&#xf…...

CI/CD—GitLab钩子触发Jenkins自动构建项目

GitLab钩子简介&#xff1a; 项目钩子 项目钩子是针对单个项目的钩子&#xff0c;会在项目级别的特定事件发生时触发。这些事件包括代码推送、合并请求创建、问题创建等。项目钩子由项目管理员或具有相应权限的用户进行配置&#xff0c;仅对特定项目生效。 使用场景&#xff1a…...

RPA 职业前景:个人职场发展的 “新机遇”

1. RPA职业定义与范畴 1.1 RPA核心概念 机器人流程自动化&#xff08;RPA&#xff09;是一种通过软件机器人模拟人类操作&#xff0c;自动执行重复性、规则性任务的技术。RPA的核心在于其能够高效、准确地处理大量数据和流程&#xff0c;减少人工干预&#xff0c;从而提高工作…...

【CSS3】金丹篇

目录 标准流概念元素类型及排列规则块级元素行内元素行内块元素 标准流的特点打破标准流 浮动基本使用清除浮动额外标签法单伪元素法双伪元素法&#xff08;推荐&#xff09;overflow 法 Flex 布局Flex 组成主轴对齐方式侧轴对齐方式修改主轴方向弹性盒子伸缩比弹性盒子换行行对…...

Git(一)

一、介绍 二、Git代码托管服务 三、Git常用命令 全局设置&#xff1a; 获取Git仓库&#xff1a; 工作区、暂存区、版本库概念&#xff1a; Git工作区文件的状态&#xff1a; 本地仓库操作&#xff1a; 远程仓库操作&#xff1a; 分支操作&#xff1a; 标签操作&#xff1a; 四…...

Python大数据可视化:基于spark的短视频推荐系统的设计与实现_django+spider

开发语言&#xff1a;Python框架&#xff1a;djangoPython版本&#xff1a;python3.7.7数据库&#xff1a;mysql 5.7数据库工具&#xff1a;Navicat11开发软件&#xff1a;PyCharm 系统展示 管理员登录 管理员功能界面 热门视频界面 用户界面 用户反馈界面 论坛交流界面 系统…...

面试题之react useMemo和uesCallback

在面试中&#xff0c;关于 React 中的 useMemo 和 useCallback 的区别 是一个常见的问题。 useMemo 和 useCallback 的区别 1. 功能定义 useMemo&#xff1a; 用于缓存计算结果&#xff0c;避免在每次组件渲染时重新计算复杂的值。它接受一个计算函数和一个依赖数组&#xff0…...

K8S学习之基础十九:k8s的四层代理Service

K8S四层代理Service 四层负载均衡Service 在k8s中&#xff0c;访问pod可以通过ip端口的方式&#xff0c;但是pod是由生命 周期的&#xff0c;pod在重启的时候ip地址往往会发生变化&#xff0c;访问pod就需要新的ip地址&#xff0c;这样就会很麻烦&#xff0c;每次pod地址改变就…...

C++:string容器(下篇)

1.string浅拷贝的问题 // 为了和标准库区分&#xff0c;此处使用String class String { public :/*String():_str(new char[1]){*_str \0;}*///String(const char* str "\0") // 错误示范//String(const char* str nullptr) // 错误示范String(const char* str …...

sudo systemctl restart docker 重启docker失败

一般会使用如下命令&#xff0c;进行docker重启。 sudo systemctl daemon-reload sudo systemctl restart docker 重启失败时&#xff0c;会提示&#xff1a;Job for docker.service failed because the control process exited with error code. See "systemctl status…...

Linux基本操作指令3

1、wget: 这是一个用于从网络上下载文件的命令行工具。它支持 HTTP、HTTPS 和 FTP 协议。 wget http://download.qt.io/archive/qt/5.12/5.12.9/qt-opensource-linux-x64-5.12.9.run 2、下载完成后&#xff0c;你可以通过以下命令使文件可执行并运行安装程序&#xff1a; ch…...

React:类组件(上)

kerwin老师我来了 类组件的创建 class组件&#xff0c;js里的类命名首字符大写&#xff0c;类里面包括构造函数&#xff0c;方法 组件类要继承React.Component才有效 必须包含render方法 import React from react class App extends React.Component{render() {return <…...

sqli-lab靶场学习(七)——Less23-25(关键字被过滤、二次注入)

前言 之前的每一关&#xff0c;我们都是在末尾加上注释符&#xff0c;屏蔽后面的语句&#xff0c;这样我们只要闭合了区间之后&#xff0c;差不多就是为所欲为的状态。但如果注释符不生效的情况下&#xff0c;又该如何呢&#xff1f; Less23&#xff08;注释符被过滤&#xff…...

虚函数和虚表的原理是什么?

虚函数是一个使用virtual关键字声明的成员函数&#xff0c;在基类中声明虚函数&#xff0c;在子类中可以使用override重写该函数。虚函数根据指针或引用指向的实际对象调用&#xff0c;实现运行时的多态。 虚函数表&#xff08;虚表&#xff09;是一个用于存储虚函数地址的数组…...

RReadWriteLock读写锁应用场景

背景 操作涉及一批数据&#xff0c;如订单&#xff0c;可能存在多个场景下操作&#xff0c;先使用读锁&#xff0c;从redis缓存中获取操作中数据 比如 关闭账单&#xff0c; 发起调账&#xff0c; 线下结算&#xff0c; 合并支付 先判断当前操作的数据&#xff0c;是否在…...

【面试】MySQL

MySQL 1、数据库三范式2、什么是关系型数据库&#xff0c;什么是非关系型数据库3、什么是数据库存储引擎4、MySQL5.x和8.0有什么区别5、char 和 varchar 的区别6、in 和 exists 的区别7、MySQL 时间类型数据存储建议8、drop、delete 与 truncate 区别9、一条 Sql 的执行顺序10、…...

Trae AI 开发工具使用手册

这篇手册将介绍 Trae 的基本功能、安装步骤以及使用方法&#xff0c;帮助开发者快速上手这款工具。 Trae AI 开发工具使用手册 Trae 是字节跳动于 2025 年推出的一款 AI 原生集成开发环境&#xff08;IDE&#xff09;&#xff0c;旨在通过智能代码生成、上下文理解和自动化任务…...

表格columns拼接两个后端返回的字段(以umi框架为例)

在用组件对前端项目进行开发时&#xff0c;我们会遇到以下情况&#xff1a;项目原型中有取值范围这个表字段&#xff0c;需要存放最小取值到最大取值。 而后端返回给我们的数据是返回了一个最小值和一个最大值&#xff0c; 在columns中我们需要对这两个字段进行拼接&#xff0…...

常见的算法题python

字符串倒序 def func1(str):return str[::-1] def func2(str):new_str ""for i in str:new_str inew_strreturn new_str if __name____main__:str"linda"print(func2(str))合并两个有序的列表 def func3(list1,list2):for i in list1:list2.append(i)li…...

linux学习(十)(磁盘和文件系统(索引节点,文件系统,添加磁盘,交换,LVM公司,挂载))

Linux 磁盘文件系统 Linux 使用各种文件系统来允许我们从计算机系统的硬件&#xff08;例如磁盘&#xff09;存储和检索数据。文件系统定义了如何在这些存储设备上组织、存储和检索数据。流行的 Linux 文件系统示例包括 EXT4、FAT32、NTFS 和 Btrfs。 每个文件系统都有自己的…...

k8s v1.28.15部署(kubeadm方式)

k8s部署&#xff08;kubeadm方式&#xff09; 部署环境及版本 系统版本&#xff1a;CentOS Linux release 7.9.2009 k8s版本&#xff1a;v1.28.15 docker版本&#xff1a;26.1.4 containerd版本&#xff1a;1.6.33 calico版本&#xff1a;v3.25.0准备 主机ip主机名角色配置1…...

Python开发Scikit-learn面试题及参考答案

目录 如何用 SimpleImputer 处理数据集中的缺失值? 使用 StandardScaler 对数据进行标准化的原理是什么?与 MinMaxScaler 有何区别? 如何用 OneHotEncoder 对类别型特征进行编码? 解释特征选择中 SelectKBest 与 VarianceThreshold 的应用场景。 如何通过 PolynomialFe…...

Java在小米SU7 Ultra汽车中的技术赋能

目录 一、智能驾驶“大脑”与实时数据 场景一&#xff1a;海量数据的分布式计算 场景二&#xff1a;实时决策的毫秒级响应 场景三&#xff1a;弹性扩展与容错机制 技术隐喻&#xff1a; 二、车载信息系统&#xff08;IVI&#xff09;的交互 场景一&#xff1a;Android Automo…...

蓝队第三次

1.了解什么是盲注 盲注&#xff08;Blind SQL Injection&#xff09;是SQL注入的一种形式&#xff0c;攻击者无法直接通过页面回显或错误信息获取数据&#xff0c;而是通过观察页面的布尔状态&#xff08;真/假&#xff09;或时间延迟来间接推断数据库信息。例如&#xff0c;通…...

Element Plus中的树组件的具体用法(持续更新!)

const defaultProps {//子树为节点对象的childrenchildren: children,//节点标签为节点对象的name属性label: name, } 属性 以下是树组件中的常用属性以及作用&#xff1a; data&#xff1a;展示的数据&#xff08;数据源&#xff09; show-checkbox&#xff1a;节点是否可…...

nodejs使用WebSocket实现聊天效果

在nodejs中使用WebSocket实现聊天效果&#xff08;简易实现&#xff09; 安装 npm i ws 实现 创建 server.js /*** 创建一个 WebSocket 服务器&#xff0c;监听指定端口&#xff0c;并处理客户端连接和消息。** param {Object} WebSocket - 引入的 WebSocket 模块&#xff0c…...

通领科技冲刺北交所

高质量增长奔赴产业新征程 日前&#xff0c;通领科技已正式启动在北交所的 IPO 进程&#xff0c;期望借助资本市场的力量&#xff0c;加速技术升级&#xff0c;推动全球化战略布局。这一举措不仅展现了中国汽车零部件企业的强大实力&#xff0c;也预示着行业转型升级的新突破。…...

利用LLMs准确预测旋转机械(如轴承)的剩余使用寿命(RUL)

研究背景 研究问题:如何准确预测旋转机械(如轴承)的剩余使用寿命(RUL),这对于设备可靠性和减少工业系统中的意外故障至关重要。研究难点:该问题的研究难点包括:训练和测试阶段数据分布不一致、长期RUL预测的泛化能力有限。相关工作:现有工作主要包括基于模型的方法、数…...

comctl32!ListView_OnSetItem函数分析LISTSUBITEM结构中的image表示图标位置

第一部分&#xff1a; BOOL ListView_SetSubItem(LV* plv, const LV_ITEM* plvi) { LISTSUBITEM lsi; BOOL fChanged FALSE; int i; int idpa; HDPA hdpa; if (plvi->mask & ~(LVIF_DI_SETITEM | LVIF_TEXT | LVIF_IMAGE | LVIF_STATE)) { …...

Django工程获取请求参数的几种方式

在 Django 中获取请求参数的完整方法如下&#xff1a; 一、GET 请求参数获取 def view_func(request):# 获取单个参数&#xff08;推荐方式&#xff09;name request.GET.get(name, default) # 带默认值age request.GET.get(age, 0)# 获取多个同名参数&#xff08;如复选框…...

使用Qt调用HslCommunication(C++调用C#库)

使用C/CLI 来调用C#的dll 任务分解&#xff1a; 1、实现C#封装一个调用hsl的dll&#xff1b; 2、实现C控制台调用C#的dll库&#xff1b; 3、把调用C#的dll用C再封装为一个dll&#xff1b; 4、最后再用Qt调用c的dll&#xff1b; 填坑&#xff1a; 1、开发时VS需要安装CLI项目库…...

C++中的构造函数

目录 一、什么是构造函数&#xff1a; 二、构造函数的特性和使用&#xff1a; 1、构造函数的特性&#xff1a; 2、构造函数的重载&#xff1a; 三、默认生成的构造函数&#xff1a; 一、什么是构造函数&#xff1a; 在C中&#xff0c;当创建一个对象之后&#xff0c;就会自…...

MySQL知识点(第一部分)

MySQL 基础&#xff1a; 1、SQL语句的分类&#xff1a; DDL&#xff1a;用于控制数据库的操作DML&#xff1a;用于控制表结构的字段&#xff0c;增、删、修DQL&#xff1a;用于查询语句DCL&#xff1a;用于管理数据库&#xff0c;用户&#xff0c;数据库的访问 权限。 2、M…...