当前位置: 首页 > news >正文

kafka消费者详细介绍(超级详细)

文章目录

  • 一、Kafka 消费者与消费者组
    • 1.1 Kafka 消费者(Consumer)概述
      • 1.1.1 消费者工作流程
      • 1.1.2 消费者的关键配置
    • 1.2 Kafka 消费者组(Consumer Group)概述
      • 1.2.1 消费者组的工作原理
      • 1.2.2 消费者组的优点
      • 1.2.3 消费者组的再均衡
      • 1.2.4 消费者组的关键配置
    • 1.3 消费者与消费者组的关系
      • 1.3.1 单消费者与消费者组
      • 1.3.2 消费者组的偏移量管理
      • 1.3.3 消费者组的再均衡与负载均衡
    • 1.4 消费者组与分区的关系
  • 二、kafka消费者客户端开发
    • 2.1 Kafka 消费者客户端开发基础
      • Kafka 消费者客户端开发的核心 API:
    • 2.2 Kafka 消费者客户端配置
      • 示例:Kafka 消费者客户端配置
  • 三、Kafka消费者关键参数
    • 3.1 `bootstrap.servers`
    • 3.2 `group.id`
    • 3.3 `key.deserializer` 和 `value.deserializer`
    • 3.4 `enable.auto.commit`
    • 3.5 `auto.commit.interval.ms`
    • 3.6 `auto.offset.reset`
    • 3.7 `max.poll.records`
    • 3.8 `session.timeout.ms`
    • 3.9 `heartbeat.interval.ms`
    • 3.10 `fetch.min.bytes`
    • 3.11 `fetch.max.wait.ms`
    • 3.12 `client.id`
    • 3.13 `max.poll.interval.ms`
    • 3.14 `partition.assignment.strategy`
    • 3.15 `isolation.level`
  • 四、Kafka 反序列化
    • 4.1 反序列化的基本概念
    • 4.2 Kafka 反序列化器接口
    • 4.3 常用的 Kafka 反序列化器
      • 4.3.1 StringDeserializer
      • 4.3.2 IntegerDeserializer
      • 4.3.3 ByteArrayDeserializer
      • 4.3.4 Avro 反序列化
    • 4.4 自定义反序列化器
  • 五、Kafka 消费者的消费模式
    • 5.1 拉取模式(Pull Model)
      • 5.1 拉取模式的基本流程
      • 5.2 `poll()` 方法的行为
    • 5.3 拉取模式示例
      • 5.4. 高级配置选项
        • 5.4.1 `max.poll.records`
        • 5.4.2 `fetch.min.bytes` 和 `fetch.max.bytes`
      • 5.6 拉取模式的优化
        • 5.6.1 调整 `poll()` 超时
        • 5.6.2 消息批量处理
      • 5.7 总结
  • 六、Kafka 消费者再均衡
    • 再均衡的触发条件
    • 6.1 再均衡过程
    • 6.2 再均衡的触发机制
      • 6.2.1 消费者离开或加入消费者组
      • 6.2.2 分区数量变化
      • 6.2.3 负载均衡
    • 6.3 再均衡过程中可能存在的问题
      • 6.3.1 再均衡延迟
      • 6.3.2 分区再分配的顺序
      • 6.3.3 频繁的再均衡
    • 6.4 再均衡的优化
      • 6.4.1 `session.timeout.ms`
      • 6.4.2 `max.poll.interval.ms`
      • 6.4.3 `rebalance.listener`
    • 6.5 总结
  • 七、Kafka消费者订阅主题和分区
    • 7.1 订阅主题(`subscribe()`)
      • 7.1.1 `subscribe()` 方法
      • 7.1.2 `subscribe()` 配合 `ConsumerRebalanceListener`
      • 7.1.3 `assign()` 与 `subscribe()` 的对比
    • 7.2 订阅分区(`assign()`)
      • 7.2.1 `assign()` 方法
      • 7.2.2 `assign()` 与 `subscribe()` 的区别
    • 7.3 消费者订阅主题与分区的工作流程
      • 7.3.1 基于 `subscribe()` 的工作流程
      • 7.3.2 基于 `assign()` 的工作流程
    • 7.4 示例:订阅主题与分区
      • 7.4.1 基于 `subscribe()` 的示例
      • 7.4.2 基于 `assign()` 的示例
    • 7.5 主题订阅和分区订阅对比
  • 八、Kafka消费者偏移量
    • 8.1 偏移量管理方式
    • 8.2 自动提交偏移量(`enable.auto.commit`)
      • 8.2.1 配置项
      • 8.2.2 自动提交的工作流程
      • 8.2.3 自动提交的优缺点
      • 8.2.4 自动提交示例
    • 8.3 手动提交偏移量(`enable.auto.commit = false`)
      • 8.3.1 配置项
      • 8.3.2 手动提交的工作流程
      • 8.3.3 手动提交的优缺点
      • 8.3.4 手动提交示例
      • 8.3.5 `commitSync()` 与 `commitAsync()` 的对比
    • 8.4 偏移量的存储与恢复
      • 8.4.1. 偏移量的存储
        • 8.4.1.1 偏移量存储的结构
        • 8.4.1.2 **偏移量提交的方式**
      • 8.4.2 偏移量的恢复
        • 8.4.2.1 自动恢复(自动提交偏移量)
        • 8.4.2.2 手动恢复(手动提交偏移量)
        • 8.4.2.3 偏移量的恢复过程
        • 8.4.2.4 处理偏移量恢复的边界情况
        • 8.4.2.5 查看偏移量
      • 8.4.3 偏移量管理的最佳实践
  • 九、Kafka 消费者多线程场景
    • 9.1 为什么使用多线程消费 Kafka 消息
    • 9.2 Kafka 消费者多线程的基本原则
    • 9.3 Kafka 消费者多线程模式
      • 9.3.1 每个线程创建独立消费者
      • 9.3.2 共享消费者实例(消息队列)
    • 9.4 消费者多线程时的注意事项
    • 9.5 每个线程独立消费者和共享消费者实例对比
  • 十、kafka消费者常见问题
    • 10.1 问题:消费者组无法消费消息(消费滞后)
    • 10.2 问题:消费者偏移量重复消费或丢失
    • 10.3 问题:消费者因 `Rebalance`(再均衡)导致的消息丢失或重复消费
    • 10.4 问题:消费者读取不到数据(延迟高或空消费)
    • 10.5 问题:消费者无法连接到 Kafka 集群
    • 10.6 问题:消费者消费消息时延迟过高
  • 十一、Kafka消费者性能调优
    • 11.1 消费者配置参数调优
      • 11.1.1 `fetch.min.bytes` 和 `fetch.max.wait.ms`
      • 11.1.2 `max.poll.records`
      • 11.1.3 `session.timeout.ms` 和 `heartbeat.interval.ms`
      • 11.1.4 `auto.offset.reset`
    • 11.2 消费模式和消息处理逻辑优化
      • 11.2.1 批量处理
      • 11.2.2 异步处理
    • 11.3 资源和硬件优化
      • 11.3.1 内存和 CPU
      • 11.3.2 Kafka 集群优化
    • 11.4 监控和故障排查
      • 11.4.1 消费者监控

一、Kafka 消费者与消费者组

Kafka 中的消费者(Consumer)和消费者组(Consumer Group)是 Kafka 架构中的核心概念。它们对消息的消费模式、扩展性、可靠性以及性能有着直接影响。理解消费者和消费者组的工作原理,可以帮助我们在构建高效和可扩展的消息消费系统时做出合理的设计选择。

1.1 Kafka 消费者(Consumer)概述

Kafka 消费者是一个从 Kafka 主题(Topic)中读取消息的客户端应用程序。消费者可以使用 Kafka 提供的 API 来消费分布式系统中的消息。Kafka 支持不同的消费模型,包括单消费者和消费者组。

消费者的基本操作包括:

  • 订阅主题:消费者可以订阅一个或多个主题。
  • 拉取消息:消费者通过拉取方式(poll() 方法)从 Kafka 代理(Broker)获取消息。
  • 处理消息:消费者接收到消息后,可以对其进行处理,如业务逻辑操作。
  • 提交偏移量:消费者在处理消息后,提交当前消息的偏移量,表示已经处理过该消息。

1.1.1 消费者工作流程

  1. 消费者向 Kafka 代理发送拉取请求。
  2. 代理返回符合消费者订阅条件的消息。
  3. 消费者处理消息并提交偏移量。
  4. 消费者定期发送心跳,以维持其在消费者组中的活跃状态。

1.1.2 消费者的关键配置

  • group.id:指定消费者所属的消费者组。当多个消费者属于同一组时,它们会共享消息的消费。
  • auto.offset.reset:当消费者没有偏移量或偏移量超出范围时,定义从哪里开始消费消息。可以设置为 earliest(从最早的消息开始消费)或 latest(从最新的消息开始消费)。
  • enable.auto.commit:控制是否自动提交消息的偏移量。设置为 false 可以让开发者手动提交偏移量,以控制消息消费的精度。
  • fetch.min.bytes:消费者拉取消息时的最小字节数,确保消息拉取的效率。
  • max.poll.records:每次调用 poll() 方法时,消费者最多拉取的消息数。

示例配置

group.id=my-consumer-group
auto.offset.reset=latest
enable.auto.commit=false
fetch.min.bytes=50000
max.poll.records=1000

1.2 Kafka 消费者组(Consumer Group)概述

Kafka 中的消费者组是多个消费者共同组成的一个逻辑实体。消费者组的作用是将多个消费者组织在一起,共同消费 Kafka 主题的消息。消费者组的核心思想是 消息的分区消费,即每个分区内的消息只能由一个消费者处理。

1.2.1 消费者组的工作原理

  • 分区分配:每个消费者组内的每个消费者负责消费一个或多个主题分区。Kafka 使用消费者组的机制来确保每个分区的消息只有一个消费者进行消费。这样,多个消费者可以并行地消费不同分区的消息,从而提高系统的吞吐量。
  • 再均衡:当消费者组中的消费者发生变化(如加入、离开或失败)时,Kafka 会自动进行再均衡,将分区重新分配给现有消费者。

1.2.2 消费者组的优点

  1. 扩展性:通过增加消费者,可以横向扩展消费能力,支持高吞吐量的消息消费。
  2. 负载均衡:多个消费者之间按分区分配负载,避免了某个消费者过载。
  3. 容错性:如果某个消费者挂掉,Kafka 会触发再均衡,其他消费者会接管该消费者的任务,保证消息消费的高可用性。

1.2.3 消费者组的再均衡

当消费者组成员发生变化时(如消费者加入或退出),Kafka 会自动进行再均衡。在再均衡过程中,分区会被重新分配给消费者,这会导致消费延迟的增加。

再均衡的触发条件

  • 新消费者加入消费者组。
  • 消费者退出消费者组(正常或异常退出)。
  • 分区数量变化(如增加或减少分区)。
  • 负载不均衡,导致重新分配分区。

1.2.4 消费者组的关键配置

  • group.id:指定消费者组的 ID。Kafka 使用消费者组 ID 来标识一个消费者组。
  • partition.assignment.strategy:分区分配策略,Kafka 支持两种策略:RangeRoundRobin
    • Range:根据分区的顺序分配,适用于消费者数和分区数相等的情况。
    • RoundRobin:将分区平均分配给消费者,适用于消费者数少于分区数的情况。

示例配置

group.id=my-consumer-group
partition.assignment.strategy=roundrobin

1.3 消费者与消费者组的关系

1.3.1 单消费者与消费者组

  • 单消费者:当只有一个消费者时,它会消费所有分区的消息。此时,消费者不需要加入消费者组,因为只有一个消费者会消费所有消息。
  • 消费者组:消费者组允许多个消费者共享分区的消费工作。每个消费者组内的每个消费者会处理不同的分区,避免重复消费。

1.3.2 消费者组的偏移量管理

  • 每个消费者组都有独立的 偏移量,Kafka 会为每个消费者组存储偏移量信息。
  • 消费者每次拉取消息后,都会更新自己的消费偏移量。偏移量保存在 Kafka 的内部主题 __consumer_offsets 中。

偏移量存储与恢复

  • 默认情况下,Kafka 自动管理偏移量的存储和恢复。消费者组的偏移量会在每次消费完成后自动提交,或者开发者可以手动提交偏移量。
  • 手动提交偏移量:可以通过 commitSync()commitAsync() 方法来手动提交偏移量。

1.3.3 消费者组的再均衡与负载均衡

  • 当消费者组内的消费者数目发生变化时(如某个消费者失效或新的消费者加入),Kafka 会触发再均衡,将分区重新分配给其他消费者。
  • 通过合理设置消费者组的大小,可以实现负载均衡,确保每个消费者的工作量相对均衡。

1.4 消费者组与分区的关系

  • Kafka 中的每个分区只能被一个消费者组中的一个消费者消费。这意味着如果你有多个消费者,它们将会共享分区的消费工作,避免了重复消费。
  • 如果消费者组内的消费者数量少于主题的分区数,某些消费者将会消费多个分区。反之,如果消费者组内的消费者数量多于分区数,某些消费者将会闲置,不参与消息的消费。

二、kafka消费者客户端开发

Kafka 消费者客户端是用于从 Kafka 集群中的 Topic 消费消息的应用程序。消费者从 Topic 或者指定的分区拉取消息,处理消息后,可以选择提交偏移量,记录它消费到的位置。Kafka 消费者客户端开发通常使用 Kafka 提供的 Java 客户端 API。以下将详细介绍如何开发 Kafka 消费者客户端,包括基本的配置、消费模式、偏移量管理、性能调优等方面。

2.1 Kafka 消费者客户端开发基础

Kafka 消费者客户端需要具备以下功能:

  • 连接 Kafka 集群:配置 Kafka 服务器和消费者组。
  • 订阅 Topic 或 分区:消费者可以订阅一个或多个 Topic,或者指定某个分区进行消费。
  • 拉取消息:使用 poll() 方法从 Kafka 中拉取消息。
  • 消息处理:处理消费到的消息。
  • 提交偏移量:控制消息消费的进度,确保消息的可靠处理。
  • 容错性和负载均衡:处理消费者崩溃和分区再平衡。

Kafka 消费者客户端开发的核心 API:

  • KafkaConsumer:用于连接 Kafka 集群并消费消息。
  • ConsumerConfig:配置消费者的各种属性。
  • Poll():拉取消息的主要方法。
  • commitSync() / commitAsync():提交偏移量的操作方法。

2.2 Kafka 消费者客户端配置

开发消费者时,需要为消费者设置一系列配置参数。主要配置项包括 Kafka 集群的地址、消费者组 ID、反序列化方式、自动提交偏移量策略等。

示例:Kafka 消费者客户端配置

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;import java.util.Properties;public class KafkaConsumerExample {public static void main(String[] args) {// 创建消费者配置Properties properties = new Properties();properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // Kafka 集群地址properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group"); // 消费者组 IDproperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); // 消息 key 反序列化properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); // 消息 value 反序列化properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // 如果没有偏移量,则从最早的消息开始消费properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); // 自动提交偏移量// 创建消费者实例KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);// 订阅 Topicconsumer.subscribe(List.of("test-topic"));// 拉取和消费消息while (true) {var records = consumer.poll(1000); // 每次拉取 1000 毫秒for (var record : records) {System.out.println("Consumed: " + record.value());}}}
}

常用消费者配置项:

  • bootstrap.servers:指定 Kafka 集群的地址,消费者需要与 Kafka Broker 建立连接。
  • group.id:消费者组的 ID,一个消费者组内的多个消费者会共同消费同一个 Topic。
  • auto.offset.reset:指定消费者从哪里开始消费。当消费者第一次订阅 Topic 或者偏移量不存在时,Kafka 会根据该参数决定从哪里开始消费,可以设置为:
    • earliest:从最早的消息开始消费。
    • latest:从最新的消息开始消费。
  • enable.auto.commit:是否自动提交偏移量。默认值为 true,表示消费者自动提交偏移量。

三、Kafka消费者关键参数

Kafka 消费者的配置参数影响其行为、性能和容错能力。理解这些关键参数对于高效且稳定的消费者操作至关重要。以下是 Kafka 消费者的一些关键配置参数的详细介绍及其作用。

3.1 bootstrap.servers

  • 作用:指定 Kafka 集群的地址列表(主机名和端口),用于初次连接到 Kafka 集群。消费者会通过这些地址发现集群中的所有代理(Broker)。
  • 类型String(逗号分隔的多个 broker 地址)
  • 默认值:无
  • 示例
bootstrap.servers=localhost:9092

3.2 group.id

  • 作用:消费者所属的消费组 ID。Kafka 将消费者分配到消费组中,每个消费组负责消费 Kafka 中的分区。多个消费者共享一个消费组 ID 时,Kafka 会将这些消费者均匀地分配到不同的分区。一个分区只能由同一消费组中的一个消费者处理。
  • 类型String
  • 默认值:无(必须指定)
  • 示例
group.id=test-group

3.3 key.deserializervalue.deserializer

  • 作用:指定如何反序列化消费者接收到的消息的键和值。key.deserializer 负责将消息的键反序列化为指定类型,value.deserializer 则负责将消息的值反序列化为指定类型。Kafka 提供了多个内置的反序列化器,如 StringDeserializerIntegerDeserializerByteArrayDeserializer 等。
  • 类型String(类名)
  • 默认值:无(必须指定)
  • 示例
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer

3.4 enable.auto.commit

  • 作用:控制消费者是否启用自动提交偏移量。当 enable.auto.commit 设置为 true 时,消费者会定期自动提交它的偏移量;如果设置为 false,消费者则需要手动提交偏移量。
  • 类型Boolean
  • 默认值true
  • 示例
enable.auto.commit=false
  • 注意:如果设置为 false,需要手动调用 commitSync()commitAsync() 来提交偏移量。

3.5 auto.commit.interval.ms

  • 作用:如果 enable.auto.commit 设置为 true,该参数指定自动提交偏移量的时间间隔(毫秒)。消费者每隔这个时间间隔自动提交一次偏移量。
  • 类型Long
  • 默认值5000(5秒)
  • 示例
auto.commit.interval.ms=1000

3.6 auto.offset.reset

  • 作用:指定消费者在没有初始偏移量或者偏移量超出范围时,如何处理。
    • earliest:从最早的消息开始消费。
    • latest:从最新的消息开始消费(默认值)。
    • none:如果没有初始偏移量或超出范围,会抛出异常。
  • 类型String
  • 默认值latest
  • 示例
auto.offset.reset=earliest
  • 注意:如果消费者第一次启动并且没有已提交的偏移量,Kafka 会使用这个参数来决定从哪个偏移量开始消费。

3.7 max.poll.records

  • 作用:消费者每次调用 poll() 时,最多可以拉取的消息数量。此参数控制一次 poll() 操作返回的最大消息数。如果消息队列中有更多消息,消费者将会多次调用 poll()
  • 类型Integer
  • 默认值500
  • 示例
max.poll.records=100
  • 注意:减少 max.poll.records 可以减少每次 poll() 拉取的消息数量,从而降低单次处理的压力,适用于处理时间较长的情况。

3.8 session.timeout.ms

  • 作用:消费者与 Kafka 代理之间的会话超时时间。如果在此时间内,消费者没有发送心跳,Kafka 会认为消费者失效,并将其从消费组中移除。消费者需要定期发送心跳来维持连接。
  • 类型Long
  • 默认值10000(10秒)
  • 示例
session.timeout.ms=10000
  • 注意:如果 session.timeout.ms 设置得太短,消费者可能会因为处理消息较慢而被误判为失效,导致频繁的消费者重新均衡。

3.9 heartbeat.interval.ms

  • 作用:消费者发送心跳的时间间隔。如果 heartbeat.interval.ms 设置得过短,可能会导致过多的网络请求;如果设置得过长,可能会导致消费者失效检测不及时。
  • 类型Long
  • 默认值3000(3秒)
  • 示例
heartbeat.interval.ms=3000
  • 注意:应确保 heartbeat.interval.ms 小于 session.timeout.ms,否则消费者可能无法及时响应心跳。

3.10 fetch.min.bytes

  • 作用:指定消费者从服务器拉取消息时,最小返回的数据量。如果 fetch.min.bytes 设置得比较大,消费者会等待,直到 Kafka 服务器返回至少 fetch.min.bytes 字节的数据,避免频繁拉取小的数据包。
  • 类型Long
  • 默认值1
  • 示例
fetch.min.bytes=50000
  • 注意:此参数的配置可以减少网络请求的次数,提高吞吐量,但也可能增加延迟。

3.11 fetch.max.wait.ms

  • 作用:指定消费者拉取数据时的最大等待时间。如果服务器在此时间内没有足够的数据返回,消费者会返回空数据。这通常与 fetch.min.bytes 配合使用。
  • 类型Long
  • 默认值500
  • 示例
fetch.max.wait.ms=1000
  • 注意:在延迟敏感的场景下,设置较低的 fetch.max.wait.ms 有助于减少等待时间。

3.12 client.id

  • 作用:指定客户端的标识符。Kafka 用此 ID 来识别不同的消费者实例。客户端 ID 用于日志记录、监控等操作。
  • 类型String
  • 默认值:无(可以指定)
  • 示例
client.id=consumer-client-1
  • 注意:如果在多个消费者应用中使用相同的 client.id,它们将共享相同的标识符。

3.13 max.poll.interval.ms

  • 作用:指定消费者在两次调用 poll() 之间可以允许的最大间隔时间。若超时,消费者会被认为已死,消费者组会重新分配该消费者负责的分区。
  • 类型Long
  • 默认值300000(5分钟)
  • 示例
max.poll.interval.ms=600000
  • 注意:该参数与 max.poll.records 配合使用,限制了每次消费的时间,防止消费者处理消息过慢。

3.14 partition.assignment.strategy

  • 作用:指定消费者如何分配分区给消费者实例。可选的策略包括:
    • org.apache.kafka.clients.consumer.RangeAssignor:将分区按顺序分配给消费者。
    • org.apache.kafka.clients.consumer.RoundRobinAssignor:轮询方式分配分区给消费者。
  • 类型String
  • 默认值RangeAssignor
  • 示例
partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor

3.15 isolation.level

  • 作用:指定消费者读取消息时的隔离级别。
    • read_committed:只读取已提交的消息(适用于事务消息)。
    • read_uncommitted:可以读取未提交的消息(默认值)。
  • 类型String
  • 默认值read_uncommitted
  • 示例
isolation.level=read_committed
  • 注意:在启用 Kafka 事务时,使用 read_committed 可以确保消费者只消费提交的消息。
参数作用示例
bootstrap.servers指定 Kafka 集群地址localhost:9092
group.id消费者组 IDtest-group
key.deserializer消息键的反序列化器StringDeserializer
value.deserializer消息值的反序列化器StringDeserializer
enable.auto.commit是否启用自动提交偏移量false
auto.offset.reset无偏移量时的偏移量重置策略earliest
max.poll.records每次拉取的最大记录数100
session.timeout.ms消费者会话超时值10000
heartbeat.interval.ms消费者心跳发送间隔3000
fetch.min.bytes拉取消息时的最小数据量50000
fetch.max.wait.ms最大等待时间1000
client.id消费者客户端 IDconsumer-client-1
max.poll.interval.ms两次 poll() 之间的最大时间间隔600000
partition.assignment.strategy分区分配策略RoundRobinAssignor
isolation.level消费者读取消息的隔离级别read_committed

这些参数控制了消费者的各种行为,适当调整这些参数,可以帮助你根据实际场景优化 Kafka 消费者的性能、可靠性和容错能力。

四、Kafka 反序列化

Kafka 的反序列化是将消息从字节数组(byte[])转回为原始对象的过程。Kafka 消息是以字节数组的形式存储的,因此消费者在接收到消息时,需要将字节数组转化为相应的对象,才能进行进一步处理。Kafka 提供了多种反序列化器,可以根据消息格式选择合适的反序列化器。

4.1 反序列化的基本概念

反序列化是将存储在 Kafka 中的字节数据恢复为原始数据结构的过程。每条消息由两部分组成:keyvalue,两者都需要被反序列化。

Kafka 的反序列化器 (Deserializer) 是负责这个转换的类,它将从 Kafka 中获取的字节数组转换为 Java 对象。

4.2 Kafka 反序列化器接口

Kafka 提供了 org.apache.kafka.common.serialization.Deserializer 接口,该接口定义了反序列化的核心方法:

public interface Deserializer<T> {T deserialize(String topic, byte[] data);
}

deserialize 方法的参数:

  • topic:主题名称,用于标识数据的来源(虽然该参数在反序列化过程中不常用,但有时可以通过它做一些特殊的处理)。
  • data:从 Kafka 中读取的字节数组,需要反序列化为目标对象。

Kafka 提供了几个常用的反序列化器来将字节数组转换为常见数据类型:

  • StringDeserializer:将字节数组反序列化为字符串。
  • IntegerDeserializer:将字节数组反序列化为整数。
  • LongDeserializer:将字节数组反序列化为长整型。
  • ByteArrayDeserializer:将字节数组反序列化为字节数组。
  • Kafka Avro Deserializer:将字节数组反序列化为 Avro 格式的数据。

4.3 常用的 Kafka 反序列化器

4.3.1 StringDeserializer

StringDeserializer 将字节数组转换为字符串。

  • 用法示例
import org.apache.kafka.common.serialization.StringDeserializer;Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
properties.put("group.id", "test-group");
properties.put("key.deserializer", StringDeserializer.class.getName());
properties.put("value.deserializer", StringDeserializer.class.getName());KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Collections.singletonList("test-topic"));while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> record : records) {System.out.println("Key: " + record.key() + ", Value: " + record.value());}
}

在上述代码中,StringDeserializerkeyvalue 从字节数组反序列化为字符串。

4.3.2 IntegerDeserializer

IntegerDeserializer 将字节数组转换为整数。

  • 用法示例
import org.apache.kafka.common.serialization.IntegerDeserializer;Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
properties.put("group.id", "test-group");
properties.put("key.deserializer", IntegerDeserializer.class.getName());
properties.put("value.deserializer", IntegerDeserializer.class.getName());KafkaConsumer<Integer, Integer> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Collections.singletonList("test-topic"));while (true) {ConsumerRecords<Integer, Integer> records = consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<Integer, Integer> record : records) {System.out.println("Key: " + record.key() + ", Value: " + record.value());}
}

IntegerDeserializerkeyvalue 从字节数组反序列化为整数。

4.3.3 ByteArrayDeserializer

ByteArrayDeserializer 将字节数组转换为字节数组。这个反序列化器通常用于处理原始的字节数据或二进制消息。

  • 用法示例
import org.apache.kafka.common.serialization.ByteArrayDeserializer;Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
properties.put("group.id", "test-group");
properties.put("key.deserializer", ByteArrayDeserializer.class.getName());
properties.put("value.deserializer", ByteArrayDeserializer.class.getName());KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Collections.singletonList("test-topic"));while (true) {ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<byte[], byte[]> record : records) {System.out.println("Key: " + Arrays.toString(record.key()) + ", Value: " + Arrays.toString(record.value()));}
}

ByteArrayDeserializerkeyvalue 作为字节数组处理。

4.3.4 Avro 反序列化

Avro 是一种常见的序列化格式,尤其在使用 Schema Registry 时。使用 Avro 反序列化时,我们通常使用 Kafka Avro Deserializer

首先,需要添加 Kafka Avro 相关的依赖:

<dependency><groupId>io.confluent</groupId><artifactId>kafka-avro-serializer</artifactId><version>7.0.1</version>
</dependency>

接着,可以使用 Avro 反序列化器:

  • 用法示例
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
properties.put("group.id", "test-group");
properties.put("key.deserializer", StringDeserializer.class.getName());
properties.put("value.deserializer", KafkaAvroDeserializer.class.getName());
properties.put("schema.registry.url", "http://localhost:8081");KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Collections.singletonList("avro-topic"));while (true) {ConsumerRecords<String, GenericRecord> records = consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, GenericRecord> record : records) {System.out.println("Key: " + record.key() + ", Value: " + record.value());}
}

在这个例子中,消费者将 value 从 Avro 格式反序列化为 GenericRecord 类型的对象,key 仍然是字符串类型。

4.4 自定义反序列化器

在实际应用中,可能需要处理自定义的数据格式。在这种情况下,可以自定义反序列化器。自定义反序列化器需要实现 Deserializer 接口。

自定义反序列化器示例:将字节数组转换为自定义对象

假设我们有一个简单的类 Person,包含 nameage 字段:

public class Person {private String name;private int age;// Getters, setters, and constructor
}

可以编写一个自定义的反序列化器:

import org.apache.kafka.common.serialization.Deserializer;
import com.fasterxml.jackson.databind.ObjectMapper;public class PersonDeserializer implements Deserializer<Person> {private ObjectMapper objectMapper = new ObjectMapper();@Overridepublic Person deserialize(String topic, byte[] data) {try {return objectMapper.readValue(data, Person.class);} catch (Exception e) {throw new RuntimeException("Error deserializing Person", e);}}
}

使用 PersonDeserializer 的 Kafka 消费者示例:

Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
properties.put("group.id", "test-group");
properties.put("key.deserializer", StringDeserializer.class.getName());
properties.put("value.deserializer", PersonDeserializer.class.getName());KafkaConsumer<String, Person> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Collections.singletonList("person-topic"));while (true) {ConsumerRecords<String, Person> records = consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, Person> record : records) {System.out.println("Key: " + record.key() + ", Value: " + record.value().getName());}
}

在这个例子中,我们自定义了一个 PersonDeserializer,将字节数组反序列化为 Person 对象。

五、Kafka 消费者的消费模式

5.1 拉取模式(Pull Model)

Kafka 消费者的拉取模式是消费者通过调用 poll() 方法主动从 Kafka 集群拉取消息。这种模式是 Kafka 消费者的主要工作方式。在拉取模式下,消费者会根据自己的需求定期向 Kafka 请求消息。消费者每次调用 poll() 方法时,Kafka 集群会返回符合条件的消息,或者如果没有新消息,消费者会等待或返回空结果。

拉取模式的关键点:

  • 主动拉取:消费者主动拉取消息,而不是 Kafka 推送消息。
  • 批量拉取:消费者可以批量拉取消息,提高消费效率。
  • 阻塞和超时poll() 方法会阻塞直到有新消息或者超时。

5.1 拉取模式的基本流程

在 Kafka 中,消费者使用拉取模式来获取消息。消费者向 Kafka 请求消息后,会返回一个 ConsumerRecords 对象,其中包含了从指定主题和分区拉取的消息。消费者可以通过 poll() 方法指定等待消息的时间,通常会设置一个最大等待时间。

拉取模式流程概述

  1. 消费者配置:消费者需要设置 bootstrap.serversgroup.id 等基本配置。
  2. 订阅主题:消费者通过 subscribe() 方法订阅一个或多个主题。
  3. 拉取消息:通过 poll() 方法拉取消息,消费者处理这些消息。
  4. 提交偏移量:消息消费完成后,消费者提交已消费的消息的偏移量。

5.2 poll() 方法的行为

poll() 方法是 Kafka 消费者的核心方法,负责从 Kafka 拉取消息。它的基本签名如下:

ConsumerRecords<K, V> poll(Duration timeout)
  • timeout:指定最多等待的时间。如果在这段时间内没有消息,poll() 会返回空的 ConsumerRecords。如果有消息,则会尽可能多地返回消息,直到最大拉取限制为止。
  • 返回值ConsumerRecords 是一个包含多个消息记录的集合,每个记录都有 keyvalueoffset 等信息。

poll() 方法的行为非常重要,它决定了消费者从 Kafka 拉取消息的频率和策略。你可以控制 poll() 的最大等待时间,以适应不同的消费需求。

5.3 拉取模式示例

下面的示例展示了一个典型的 Kafka 消费者使用拉取模式消费消息的流程。

示例:基本的 Kafka 消费者拉取模式

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.Collections;
import java.util.Properties;public class SimpleConsumer {public static void main(String[] args) {// 配置消费者Properties properties = new Properties();properties.put("bootstrap.servers", "localhost:9092");properties.put("group.id", "test-group");properties.put("key.deserializer", StringDeserializer.class.getName());properties.put("value.deserializer", StringDeserializer.class.getName());// 创建 KafkaConsumer 实例KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);// 订阅主题consumer.subscribe(Collections.singletonList("test-topic"));// 持续拉取消息while (true) {// 调用 poll 方法,设置最大等待时间为 1000 毫秒var records = consumer.poll(java.time.Duration.ofMillis(1000));// 处理拉取到的消息records.forEach(record -> {System.out.println("Consumed message: Key = " + record.key() + ", Value = " + record.value());});}}
}

说明:

  • subscribe():消费者订阅 test-topic 主题。
  • poll():消费者每次调用 poll() 方法,最多等待 1000 毫秒。如果有新消息,poll() 会返回消息。如果没有消息,poll() 会返回一个空的 ConsumerRecords 对象。
  • forEach:遍历并处理每一条拉取到的消息。

5.4. 高级配置选项

5.4.1 max.poll.records
  • 作用max.poll.records 控制每次 poll() 调用返回的最大记录数。默认情况下,Kafka 消费者每次 poll() 调用返回的消息数量是没有限制的。
  • 设置示例
Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
properties.put("group.id", "test-group");
properties.put("key.deserializer", StringDeserializer.class.getName());
properties.put("value.deserializer", StringDeserializer.class.getName());
properties.put("max.poll.records", "10");  // 每次拉取最多 10 条消息KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Collections.singletonList("test-topic"));while (true) {var records = consumer.poll(java.time.Duration.ofMillis(1000));records.forEach(record -> {System.out.println("Consumed message: " + record.value());});
}

在此示例中,消费者每次拉取的最大消息数为 10 条。

5.4.2 fetch.min.bytesfetch.max.bytes
  • fetch.min.bytes:指定 Kafka 消费者每次拉取时,返回的消息字节数的最小值。如果消息字节数不足,消费者会等待更多消息直到满足该值。

  • fetch.max.bytes:指定 Kafka 消费者每次拉取时,返回的消息字节数的最大值。

  • 设置示例

properties.put("fetch.min.bytes", "1000");  // 拉取至少 1000 字节的数据
properties.put("fetch.max.bytes", "500000");  // 每次最多拉取 500KB 的数据

5.6 拉取模式的优化

5.6.1 调整 poll() 超时
  • poll() 超时poll() 方法的超时时间可以影响消息消费的效率。如果设置太短,可能会导致频繁的网络请求;设置过长,可能会增加消息消费的延迟。
  • 最佳实践:根据消息到达的频率,调整 poll() 的超时,使其在保证低延迟的同时,减少网络请求的频率。
5.6.2 消息批量处理

通过合理设置 max.poll.records,可以在每次拉取时处理更多消息,避免频繁的 poll() 调用,提高消费效率。

5.7 总结

Kafka 消费者的拉取模式是基于主动拉取的方式,通过 poll() 方法从 Kafka 集群拉取消息。消费者可以控制拉取的频率、返回的消息数量以及偏移量的提交方式。合理配置 poll() 方法、消息批量处理以及偏移量管理,有助于提升消息消费的性能和可靠性。

配置项说明默认值
max.poll.records每次 poll() 返回的最大记录数500
fetch.min.bytes每次拉取时,返回的最小字节数1
fetch.max.bytes每次拉取时,返回的最大字节数50MB
enable.auto.commit是否启用自动提交偏移量true
auto.commit.interval.ms自动提交偏移量的间隔时间5000

通过合理的配置和优化,可以提高 Kafka 消费者的消息处理效率,确保高效且可靠的消息消费。

六、Kafka 消费者再均衡

Kafka 中的消费者再均衡(Rebalance)指的是在 Kafka 消费者组中,消费者的分区重新分配的过程。当消费者加入或离开消费者组,或者消费者组中的分区数发生变化时,Kafka 会触发再均衡操作。再均衡过程旨在确保每个消费者能够平衡地消费消息,并且每个分区只会被一个消费者消费。

再均衡的触发条件

  1. 新消费者加入消费者组:当一个新的消费者加入消费者组时,Kafka 会重新分配分区,进行再均衡。
  2. 消费者离开消费者组:当一个消费者从消费者组中离开(无论是正常退出还是异常退出),Kafka 会重新分配分区。
  3. 分区数量变化:当 Kafka 中的某个主题的分区数量发生变化(例如,添加或删除分区),也会触发再均衡。

6.1 再均衡过程

在 Kafka 中,消费者再均衡是由消费者协调器(Consumer Coordinator)来管理的。再均衡的过程包括以下几个步骤:

  1. 消费者启动:消费者启动并加入消费者组。
  2. 分区分配:消费者协调器根据当前消费者组的消费者数量和分区数量,分配每个消费者需要消费的分区。
  3. 消费者消费消息:消费者开始消费分配给它的分区中的消息。
  4. 触发再均衡:在某些事件(如消费者加入或离开)发生时,消费者协调器会触发再均衡。
  5. 重新分配分区:消费者协调器通知每个消费者,让它们重新分配分区。每个消费者停止消费并且重新接收分配的分区。
  6. 恢复消费:消费者在分配到新的分区后开始消费。

再均衡的过程是由 Kafka 自己管理的,但如果不小心配置,可能会导致一些性能问题,比如频繁的再均衡,进而影响消费的稳定性。

6.2 再均衡的触发机制

6.2.1 消费者离开或加入消费者组

当消费者组中的消费者数量变化时,Kafka 会自动触发再均衡。例如:

  • 消费者加入:当新的消费者加入消费者组时,Kafka 会重新分配分区。
  • 消费者离开:当某个消费者离开消费者组时,Kafka 会重新分配该消费者原来消费的分区。

6.2.2 分区数量变化

当 Kafka 集群中的某个主题的分区数发生变化时,Kafka 也会触发再均衡。

6.2.3 负载均衡

Kafka 会尽量使得每个消费者分配到相同数量的分区,但当分区数和消费者数不完全匹配时,某些消费者可能会被分配更多的分区。

6.3 再均衡过程中可能存在的问题

6.3.1 再均衡延迟

每次再均衡过程中,消费者会停止消费一段时间,直到新的分区分配完成,这可能会导致一些延迟。这个过程会影响消费的连续性。

6.3.2 分区再分配的顺序

再均衡时,如果消费者在某个分区上已经消费了一部分消息(例如,提交了偏移量),那么分配给新消费者的分区可能会导致它从这个分区的某个位置开始消费,从而可能导致消息的重复消费或漏消费。

6.3.3 频繁的再均衡

频繁的消费者加入和离开,或者分区数量频繁变化,可能导致 Kafka 消费者组的再均衡操作频繁发生,进而影响消息消费的稳定性和性能。

6.4 再均衡的优化

为了减少再均衡的频率和延迟,Kafka 提供了一些优化选项。

6.4.1 session.timeout.ms

session.timeout.ms 设置了消费者与 Kafka 消费者协调器之间的心跳超时时间。如果消费者在这个时间内没有发送心跳,Kafka 会认为该消费者已经失效,并触发再均衡。

  • 默认值是 10 秒。
  • 减小该值会更快地检测消费者失效,但可能增加由于网络延迟等原因触发误判的风险。

6.4.2 max.poll.interval.ms

max.poll.interval.ms 控制消费者处理消息的最大时间。如果消费者在此时间内没有调用 poll() 方法,Kafka 会认为消费者失效,并触发再均衡。

  • 默认值是 5 分钟。
  • 如果消费者处理每条消息的时间过长,可能会触发再均衡。

6.4.3 rebalance.listener

Kafka 允许你通过实现 ConsumerRebalanceListener 接口来控制消费者再均衡的行为。例如,你可以在再均衡前后执行一些操作,如提交偏移量、清理缓存等。

  • onPartitionsRevoked():在分区重新分配之前调用,可以在此进行一些清理工作。
  • onPartitionsAssigned():在分区分配之后调用,可以在此进行初始化工作。

示例:使用 ConsumerRebalanceListener 控制再均衡

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;import java.util.Collections;
import java.util.Properties;public class ConsumerWithRebalanceListener {public static void main(String[] args) {Properties properties = new Properties();properties.put("bootstrap.servers", "localhost:9092");properties.put("group.id", "test-group");properties.put("key.deserializer", StringDeserializer.class.getName());properties.put("value.deserializer", StringDeserializer.class.getName());properties.put("enable.auto.commit", "false");  // 手动提交偏移量KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);consumer.subscribe(Collections.singletonList("test-topic"), new ConsumerRebalanceListener() {// 分区被撤销时调用@Overridepublic void onPartitionsRevoked(Collection<TopicPartition> partitions) {System.out.println("Partitions revoked: " + partitions);// 提交偏移量,防止消息丢失consumer.commitSync();}// 分区被分配时调用@Overridepublic void onPartitionsAssigned(Collection<TopicPartition> partitions) {System.out.println("Partitions assigned: " + partitions);// 可以在此初始化处理逻辑}});while (true) {var records = consumer.poll(java.time.Duration.ofMillis(1000));records.forEach(record -> {System.out.println("Consumed message: " + record.value());});consumer.commitSync();}}
}

说明:

  • onPartitionsRevoked():消费者在分区被撤销前调用,可以在这里提交偏移量,确保数据不会丢失。
  • onPartitionsAssigned():消费者在分区被重新分配后调用,在此可以初始化处理逻辑。

6.5 总结

Kafka 消费者的再均衡是为了确保消费者组中的每个消费者都能公平地消费分区。在消费者加入、离开或分区数量变化时,Kafka 会触发再均衡,重新分配分区。尽管再均衡是 Kafka 消费者组的正常行为,但过于频繁的再均衡可能影响消费性能。

配置项说明默认值
session.timeout.ms消费者与 Kafka 的心跳超时时间10,000 ms
max.poll.interval.ms消费者最大处理时间300,000 ms
enable.auto.commit是否自动提交偏移量true
rebalance.listener自定义再均衡监听器

通过合理配置和优化消费者的再均衡策略,能够减少不必要的延迟和资源浪费,提高消息消费的稳定性和性能。

七、Kafka消费者订阅主题和分区

在 Kafka 中,消费者通过订阅主题来获取消息。消费者订阅主题时,Kafka 会将该主题的一个或多个分区分配给消费者。Kafka 提供了两种主要的订阅方式:基于主题的订阅基于分区的订阅

7.1 订阅主题(subscribe()

消费者通过调用 subscribe() 方法来订阅一个或多个主题。当消费者订阅了一个主题时,Kafka 会自动将该主题的所有分区分配给消费者。消费者不需要指定具体的分区,Kafka 会在消费者组中均衡分配分区。

7.1.1 subscribe() 方法

subscribe() 方法用于订阅一个或多个主题。消费者根据该方法订阅的主题,会自动分配相应的分区。

consumer.subscribe(Arrays.asList("topic1", "topic2"));
  • 多个主题订阅:传入一个主题列表,消费者会订阅多个主题。
  • 自动分区分配:Kafka 会根据消费者组的大小和每个主题的分区数自动为消费者分配分区。

7.1.2 subscribe() 配合 ConsumerRebalanceListener

可以在订阅时提供一个 ConsumerRebalanceListener 监听器,用于处理再均衡事件,如分区的分配和撤销。

consumer.subscribe(Arrays.asList("topic1"), new ConsumerRebalanceListener() {@Overridepublic void onPartitionsRevoked(Collection<TopicPartition> partitions) {System.out.println("Partitions revoked: " + partitions);}@Overridepublic void onPartitionsAssigned(Collection<TopicPartition> partitions) {System.out.println("Partitions assigned: " + partitions);}
});
  • onPartitionsRevoked():在分区撤销之前调用,通常用于提交偏移量,避免数据丢失。
  • onPartitionsAssigned():在分区分配之后调用,消费者可以在此初始化相关的资源。

7.1.3 assign()subscribe() 的对比

  • subscribe():消费者订阅主题,Kafka 自动将分区分配给消费者。适用于大多数常见场景。
  • assign():消费者直接指定分区进行消费,不需要 Kafka 进行自动分配。适用于某些特殊的消费需求,比如需要手动控制分配的情况。

7.2 订阅分区(assign()

有时,消费者可能需要手动指定具体的分区进行消费,而不是让 Kafka 自动分配。这可以通过 assign() 方法实现。与 subscribe() 方法不同,assign() 方法直接指定分区,不会触发消费者组的再均衡操作。

7.2.1 assign() 方法

消费者通过 assign() 方法指定一个或多个分区进行消费。此时,消费者会直接从指定的分区拉取消息。

List<TopicPartition> partitions = Arrays.asList(new TopicPartition("topic1", 0), new TopicPartition("topic2", 1)
);consumer.assign(partitions);
  • 手动指定分区:消费者指定了特定的分区,不会由 Kafka 进行自动分配。
  • 没有再均衡:使用 assign() 时,消费者不会参与消费者组的再均衡,消费者手动管理分区。

7.2.2 assign()subscribe() 的区别

  • subscribe():Kafka 会自动根据消费者组的大小、分区数等条件分配分区,并且会触发再均衡。
  • assign():消费者手动指定具体的分区,Kafka 不会进行分配,并且不会触发再均衡。

assign() 适用于某些特殊场景,例如,消费者需要处理特定分区的消息或在某些情况下避免分区的动态调整。

7.3 消费者订阅主题与分区的工作流程

7.3.1 基于 subscribe() 的工作流程

  1. 消费者订阅主题:消费者调用 subscribe() 方法,指定需要消费的主题。
  2. 分区分配:Kafka 会为每个主题的每个分区分配一个消费者。消费者可以消费多个主题的多个分区。
  3. 处理消息:消费者开始从分配给它的分区中拉取消息进行处理。
  4. 分区再分配(再均衡):当消费者加入、离开或分区数发生变化时,Kafka 会触发再均衡,重新分配分区。

7.3.2 基于 assign() 的工作流程

  1. 消费者手动指定分区:消费者通过 assign() 方法明确指定要消费的分区。
  2. 消息消费:消费者从指定的分区开始消费消息。消费者不参与消费者组的再均衡,因此需要手动管理每个分区的消费。
  3. 无需再均衡:没有消费者加入或离开消费者组,也不会触发再均衡。

7.4 示例:订阅主题与分区

7.4.1 基于 subscribe() 的示例

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;import java.util.Arrays;
import java.util.Properties;public class SubscribeExample {public static void main(String[] args) {// 配置消费者Properties properties = new Properties();properties.put("bootstrap.servers", "localhost:9092");properties.put("group.id", "test-group");properties.put("key.deserializer", StringDeserializer.class.getName());properties.put("value.deserializer", StringDeserializer.class.getName());// 创建 KafkaConsumer 实例KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);// 订阅多个主题consumer.subscribe(Arrays.asList("topic1", "topic2"));// 拉取消息while (true) {var records = consumer.poll(java.time.Duration.ofMillis(1000));records.forEach(record -> {System.out.println("Consumed message: " + record.value());});}}
}
  • subscribe(Arrays.asList("topic1", "topic2")):消费者订阅了 topic1topic2 两个主题,Kafka 会自动为这两个主题分配分区。
  • 消费者从分配给它的分区拉取消息并进行消费。

7.4.2 基于 assign() 的示例

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.TopicPartition;import java.util.Arrays;
import java.util.List;
import java.util.Properties;public class AssignExample {public static void main(String[] args) {// 配置消费者Properties properties = new Properties();properties.put("bootstrap.servers", "localhost:9092");properties.put("group.id", "test-group");properties.put("key.deserializer", StringDeserializer.class.getName());properties.put("value.deserializer", StringDeserializer.class.getName());// 创建 KafkaConsumer 实例KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);// 手动指定消费的分区List<TopicPartition> partitions = Arrays.asList(new TopicPartition("topic1", 0),new TopicPartition("topic2", 1));consumer.assign(partitions);  // 手动指定分区// 拉取消息while (true) {var records = consumer.poll(java.time.Duration.ofMillis(1000));records.forEach(record -> {System.out.println("Consumed message: " + record.value());});}}
}
  • assign(partitions):消费者明确指定了要消费的 topic1 分区 0 和 topic2 分区 1。
  • 该消费者将只从指定的分区拉取消息,而不会自动获取其他分区的消息。

7.5 主题订阅和分区订阅对比

订阅方式说明优势使用场景
subscribe()通过主题订阅,Kafka 自动分配分区自动分配分区,适应性强一般情况下的主题消费
assign()直接指定分区,消费者手动管理分配精细控制分区分配特殊场景,如需要手动控制分区消费
  • subscribe() 适用于大多数常见的消费场景,Kafka 会自动管理分区分配。
  • assign() 适用于特殊场景,如需要精准控制消费者消费哪些分区。

八、Kafka消费者偏移量

在 Kafka 中,偏移量(Offset)是指消费者在某个分区中消费消息的位置。每条消息在 Kafka 中都有一个唯一的偏移量,消费者使用偏移量来记录它已经消费到哪个位置。当消费者再次启动时,可以从上次提交的偏移量继续消费。

偏移量对于 Kafka 消费者来说至关重要,它允许消费者在消息流中保持同步或恢复消费。Kafka 默认会将每个分区的偏移量存储在 Kafka 集群中的一个特殊的内部主题(__consumer_offsets)中。

8.1 偏移量管理方式

Kafka 提供了两种主要的偏移量管理方式:

  1. 自动提交偏移量(默认方式)
  2. 手动提交偏移量

消费者可以选择适合自己需求的偏移量管理方式,确保消息处理的可靠性和可追溯性。

8.2 自动提交偏移量(enable.auto.commit

默认情况下,Kafka 消费者会自动提交偏移量,即每消费一条消息,消费者会自动提交当前偏移量到 Kafka 集群。

8.2.1 配置项

  • enable.auto.commit:是否启用自动提交偏移量。默认值为 true
  • auto.commit.interval.ms:自动提交偏移量的时间间隔。默认值为 5000ms

8.2.2 自动提交的工作流程

  1. 消费者拉取消息并处理。
  2. 每消费完一条消息,消费者会在后台自动提交偏移量,表示已处理的最新消息位置。
  3. 如果消费者发生故障或重新启动,它将从上次提交的偏移量继续消费。

8.2.3 自动提交的优缺点

  • 优点

    • 简单易用,不需要额外的配置。
    • 对于不要求高精度消费控制的场景,适用。
  • 缺点

    • 消息丢失:在消费者处理消息时发生故障,可能导致消息未被成功处理却已经提交偏移量,造成丢失。
    • 消息重复消费:如果在提交偏移量之前消费者崩溃,消息会被重新消费,导致重复消费。

8.2.4 自动提交示例

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;import java.util.Arrays;
import java.util.Properties;public class AutoCommitExample {public static void main(String[] args) {// 配置消费者Properties properties = new Properties();properties.put("bootstrap.servers", "localhost:9092");properties.put("group.id", "test-group");properties.put("key.deserializer", StringDeserializer.class.getName());properties.put("value.deserializer", StringDeserializer.class.getName());properties.put("enable.auto.commit", "true");  // 启用自动提交properties.put("auto.commit.interval.ms", "1000");  // 提交间隔// 创建 KafkaConsumer 实例KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);// 订阅主题consumer.subscribe(Arrays.asList("topic1"));// 拉取并处理消息while (true) {var records = consumer.poll(java.time.Duration.ofMillis(1000));records.forEach(record -> {System.out.println("Consumed message: " + record.value());});}}
}

8.3 手动提交偏移量(enable.auto.commit = false

手动提交偏移量意味着消费者在处理完一条或多条消息后,明确调用 commitSync()commitAsync() 方法来提交偏移量。

8.3.1 配置项

  • enable.auto.commit:设置为 false,禁用自动提交偏移量。
  • auto.commit.interval.ms:此配置无效,当 enable.auto.commitfalse 时,消费者需要手动控制提交偏移量。

8.3.2 手动提交的工作流程

  1. 消费者拉取消息并处理。
  2. 消费者在处理完消息后调用 commitSync()commitAsync() 提交当前偏移量。
  3. commitSync() 是同步提交,直到提交成功才会返回,保证偏移量提交成功后才继续消费。
  4. commitAsync() 是异步提交,消费者不会等待提交结果,适用于性能要求较高的场景,但需要处理提交失败的异常。

8.3.3 手动提交的优缺点

  • 优点

    • 精确控制消费进度,避免消息丢失或重复消费。
    • 可以确保每条消息都被处理后才提交偏移量,增加了可靠性。
  • 缺点

    • 开发者需要管理偏移量提交的时机,增加了复杂性。

8.3.4 手动提交示例

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;import java.util.Arrays;
import java.util.Properties;public class ManualCommitExample {public static void main(String[] args) {// 配置消费者Properties properties = new Properties();properties.put("bootstrap.servers", "localhost:9092");properties.put("group.id", "test-group");properties.put("key.deserializer", StringDeserializer.class.getName());properties.put("value.deserializer", StringDeserializer.class.getName());properties.put("enable.auto.commit", "false");  // 禁用自动提交// 创建 KafkaConsumer 实例KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);// 订阅主题consumer.subscribe(Arrays.asList("topic1"));// 拉取并处理消息while (true) {var records = consumer.poll(java.time.Duration.ofMillis(1000));records.forEach(record -> {System.out.println("Consumed message: " + record.value());});// 手动提交偏移量consumer.commitSync();  // 同步提交// consumer.commitAsync(); // 异步提交}}
}
  • commitSync():同步提交,直到偏移量成功提交后才会返回。
  • commitAsync():异步提交,提交结果由回调处理,不会阻塞消费。

8.3.5 commitSync()commitAsync() 的对比

方法特点优势缺点
commitSync()同步提交偏移量提交后确认成功,可以保证准确性可能导致消费阻塞
commitAsync()异步提交偏移量不会阻塞消费,性能更高提交失败时需要回调处理

8.4 偏移量的存储与恢复

在 Kafka 中,消费者的偏移量(Offset)记录了消费者在某个分区上消费的最后一条消息的位置。Kafka 默认将消费者的偏移量存储在一个特殊的内部主题 __consumer_offsets 中。每当消费者提交偏移量时,Kafka 会将该偏移量写入该主题,确保消费者在重新启动时能够恢复消费进度。

偏移量的存储和恢复机制是 Kafka 消费者的重要特性之一,能够保证消费者的高可用性和数据的精确消费。

8.4.1. 偏移量的存储

Kafka 将每个消费者组(由 group.id 标识)在每个分区的偏移量存储在名为 __consumer_offsets 的内部 Kafka 主题中。每个分区对应一个记录,存储了该分区的最新偏移量、消费者组的元数据、偏移量提交的时间等信息。

  • 每个消费者组在 Kafka 中都有唯一的标识 group.id,Kafka 会为每个消费者组分配一个分区。
  • 每个消费者组的偏移量按分区存储,允许多个消费者组独立地消费相同的消息。
8.4.1.1 偏移量存储的结构

__consumer_offsets 主题的结构大致如下:

  • group:消费者组的 ID(即 group.id)。
  • topic:主题名称。
  • partition:分区号。
  • offset:消费者在该分区中的偏移量(即消费的最后一条消息的位置信息)。
  • metadata:提交偏移量的元数据,通常为空。
  • timestamp:偏移量提交的时间戳。

Kafka 会定期将消费者的偏移量写入到该主题。

8.4.1.2 偏移量提交的方式
  • 自动提交偏移量(默认方式):消费者自动提交偏移量,提交的频率和时间间隔由 enable.auto.commitauto.commit.interval.ms 配置项控制。
  • 手动提交偏移量:消费者显式调用 commitSync()commitAsync() 方法提交偏移量。手动提交偏移量通常在消息处理成功后进行,确保偏移量的提交与消息消费的成功状态保持一致。

8.4.2 偏移量的恢复

当消费者重启或在某些情况下发生故障时,它需要恢复消费进度,这时就需要使用之前存储在 __consumer_offsets 主题中的偏移量。

8.4.2.1 自动恢复(自动提交偏移量)

如果消费者启用了自动提交(enable.auto.commit = true),Kafka 会在消费者重新启动时自动使用最后一次提交的偏移量恢复消费进度。Kafka 会自动检查 __consumer_offsets 主题并将消费者恢复到上次提交的偏移量位置。

8.4.2.2 手动恢复(手动提交偏移量)

如果消费者使用手动提交偏移量(enable.auto.commit = false),则消费者可以在每次消费完成后调用 commitSync()commitAsync() 提交偏移量。消费者重新启动时,它会读取 __consumer_offsets 主题,恢复到上次成功提交的偏移量。

  • commitSync():同步提交偏移量,消费者会等待 Kafka 完成偏移量的提交。如果提交失败,消费者会抛出异常并需要处理。
  • commitAsync():异步提交偏移量,消费者不会等待提交结果,提交失败时会通过回调函数处理。

消费者每次拉取消息时,都会检查 __consumer_offsets 中记录的偏移量,从而恢复消费进度。

8.4.2.3 偏移量的恢复过程
  1. 重启消费者:消费者进程停止并重新启动。
  2. 读取偏移量:消费者通过其 group.id 和每个分区号从 __consumer_offsets 主题读取偏移量。
  3. 恢复消费进度:消费者根据读取的偏移量恢复消费。具体的偏移量取决于消费者最后提交的状态。
  4. 继续消费:消费者从恢复的偏移量开始继续拉取消息。
8.4.2.4 处理偏移量恢复的边界情况

在恢复消费进度时,可能会遇到以下几种情况:

  • 偏移量已过期:如果偏移量过期(即超过 Kafka 集群的 log.retention.ms 配置的时间限制),消费者会发现该偏移量对应的消息已经被删除。此时,消费者通常会回退到最新的有效偏移量或使用 auto.offset.reset 配置项定义的行为(如从最早或最新消息开始消费)。

  • auto.offset.reset 配置项

    • latest:如果没有找到偏移量或偏移量无效,消费者将从最新的消息开始消费。
    • earliest:如果没有找到偏移量或偏移量无效,消费者将从最早的消息开始消费。
    • none:如果没有找到偏移量,消费者会抛出异常。
8.4.2.5 查看偏移量

Kafka 提供了一个命令行工具 kafka-consumer-groups.sh,可以查看消费者组的偏移量信息。这对于调试和监控非常有用。

查看消费者组的偏移量:

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group test-group

该命令会显示指定消费者组(test-group)的每个分区的当前偏移量、已提交的偏移量以及 lag(延迟)信息。

示例输出:

Group           Topic              Partition  Current Offset  Log End Offset  Lag
test-group      topic1             0          15              20              5
test-group      topic1             1          5               20              15
  • Current Offset:消费者当前的偏移量。
  • Log End Offset:该分区的最新消息的偏移量。
  • Lag:消费者的延迟,表示消费者距离最新消息还有多少条消息。

8.4.3 偏移量管理的最佳实践

  1. 提交偏移量时机

    • 对于关键数据,推荐在消费完数据并确认处理无误后再提交偏移量。
    • 如果使用手动提交,通常会在消息处理成功之后调用 commitSync()commitAsync() 来提交偏移量。
    • 如果使用自动提交,考虑调整 auto.commit.interval.ms 的值来控制提交频率。
  2. 偏移量的管理方式

    • 对于高可靠性场景,使用手动提交偏移量,确保只有消息处理成功后才提交偏移量。
    • 对于简单场景,自动提交偏移量可以简化开发,但可能会导致消息丢失或重复消费。
  3. 偏移量的重置

    • 如果遇到无法恢复的错误或偏移量丢失,可以使用 auto.offset.reset 配置项来控制偏移量的恢复方式。
    • 在开发中,可以在调试时通过命令行工具查看消费者的偏移量状态,确保正确恢复消费进度。

九、Kafka 消费者多线程场景

在 Kafka 中,消费者通常是单线程工作的,一个消费者实例只能处理一个线程的消息消费任务。然而,在某些场景下,可能需要使用多个线程来并行处理消息以提高消费效率。这时需要特别注意消息的顺序性、线程的管理以及偏移量的管理等问题。

9.1 为什么使用多线程消费 Kafka 消息

  • 提高性能:当 Kafka 集群中有大量消息时,单线程消费可能会成为瓶颈。使用多线程可以提高处理速度。
  • 并行处理:某些处理逻辑可能非常复杂,多个线程可以同时处理消息,缩短总体处理时间。
  • 分区级别并行:Kafka 本身是基于分区的,消费者可以并行消费不同分区的数据,因此可以利用多线程消费不同分区的消息。

9.2 Kafka 消费者多线程的基本原则

  1. 每个线程消费独立分区:Kafka 的分区是并行消费的基本单位。一个线程消费一个或多个分区的消息,多个线程消费不同的分区。消费者不应该跨多个线程共享分区的消费。

  2. 消费者组(Consumer Group):每个消费者组中的消费者负责消费不同分区的消息。同一个消费者组内的不同消费者可以分配到不同的分区,因此,采用多线程时,可以在不同线程中创建多个消费者实例,来实现对不同分区的并行消费。

  3. 消息顺序性:Kafka 确保同一分区内的消息是有顺序的,但不同分区之间的消息顺序是不可保证的。在多线程消费时,要注意保证每个分区内消息的顺序性。

  4. 偏移量管理:每个消费者都会跟踪自己消费的偏移量。使用多线程时,通常会为每个线程创建独立的消费者实例,确保每个线程的偏移量管理是独立的。

9.3 Kafka 消费者多线程模式

有两种常见的多线程模式:

  1. 每个线程创建独立消费者:为每个线程创建独立的消费者实例,每个消费者消费一个或多个分区。
  2. 共享消费者实例:一个线程创建多个消费者实例,共享消息队列,适用于需要控制消费顺序的场景。

9.3.1 每个线程创建独立消费者

这种模式下,每个线程创建一个 Kafka 消费者实例,并独立消费某些分区的消息。此时每个消费者负责消费分配给它的分区,确保了每个线程能够并行处理消息。

优点

  • 简单高效,能够利用 Kafka 分区进行并行消费。
  • 每个线程负责独立的消费任务,彼此之间不干扰。

缺点

  • 消费者实例的创建和销毁需要管理,可能增加复杂度。

示例代码:每个线程独立消费者

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.Arrays;
import java.util.Properties;public class MultiThreadConsumer {public static void main(String[] args) {// 创建多个消费者线程int numThreads = 3;  // 假设有 3 个线程for (int i = 0; i < numThreads; i++) {new Thread(new ConsumerRunnable(i)).start();}}public static class ConsumerRunnable implements Runnable {private final int threadId;public ConsumerRunnable(int threadId) {this.threadId = threadId;}@Overridepublic void run() {Properties properties = new Properties();properties.put("bootstrap.servers", "localhost:9092");properties.put("group.id", "test-group");properties.put("key.deserializer", StringDeserializer.class.getName());properties.put("value.deserializer", StringDeserializer.class.getName());properties.put("enable.auto.commit", "false"); // 禁用自动提交KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);consumer.subscribe(Arrays.asList("topic1"));while (true) {var records = consumer.poll(java.time.Duration.ofMillis(1000));records.forEach(record -> {System.out.println("Thread " + threadId + " consumed message: " + record.value());});// 手动提交偏移量consumer.commitSync();}}}
}
  • 这里使用了 3 个线程,每个线程独立创建一个消费者实例,消费相同的主题 topic1
  • 每个线程的消费都是独立的,Kafka 会根据消费者组的负载均衡策略自动分配分区给各个消费者。

9.3.2 共享消费者实例(消息队列)

在共享消费者实例模式下,可以将一个消费者实例的消息拉取到一个共享队列中,然后多个线程从这个队列中获取消息进行并行处理。这种方式需要精确控制消息的分配和消费顺序。

优点

  • 控制消费的顺序性。
  • 可以共享一个消费者实例,减少 Kafka 消费者实例的数量。

缺点

  • 线程之间需要协调,可能带来额外的复杂性。
  • 消费者实例的负载均衡不如独立消费者那样灵活。

示例代码:共享消费者实例

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;public class SharedConsumerQueue {public static void main(String[] args) {BlockingQueue<String> queue = new LinkedBlockingQueue<>();KafkaConsumer<String, String> consumer = createConsumer();// 创建线程池,每个线程从共享队列中获取消息进行处理for (int i = 0; i < 3; i++) {new Thread(new ConsumerWorker(queue, i)).start();}// 消费者拉取消息并放入共享队列new Thread(() -> {while (true) {var records = consumer.poll(java.time.Duration.ofMillis(1000));records.forEach(record -> {try {queue.put(record.value());} catch (InterruptedException e) {e.printStackTrace();}});consumer.commitSync();}}).start();}public static class ConsumerWorker implements Runnable {private final BlockingQueue<String> queue;private final int threadId;public ConsumerWorker(BlockingQueue<String> queue, int threadId) {this.queue = queue;this.threadId = threadId;}@Overridepublic void run() {try {while (true) {String message = queue.take();System.out.println("Thread " + threadId + " processing message: " + message);// 在这里处理消息}} catch (InterruptedException e) {e.printStackTrace();}}}private static KafkaConsumer<String, String> createConsumer() {Properties properties = new Properties();properties.put("bootstrap.servers", "localhost:9092");properties.put("group.id", "test-group");properties.put("key.deserializer", StringDeserializer.class.getName());properties.put("value.deserializer", StringDeserializer.class.getName());properties.put("enable.auto.commit", "false");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);consumer.subscribe(Arrays.asList("topic1"));return consumer;}
}
  • 这里创建了一个共享的 BlockingQueue,消费者将拉取到的消息放入队列,多个线程从队列中取消息进行并行处理。
  • 每个线程从队列中取出消息并执行处理操作,可以根据需要进行异步或同步处理。

9.4 消费者多线程时的注意事项

  1. 偏移量管理

    • 独立消费者实例:每个线程应该有独立的消费者实例,这样偏移量管理就不会出现冲突。
    • 共享消费者实例:如果多个线程共享一个消费者实例,需要特别注意线程安全和偏移量提交的同步问题。
  2. 负载均衡与分区分配

    • Kafka 会根据消费者组中的消费者数量自动分配分区给消费者。若使用多个线程,可以通过启动多个消费者来并行消费不同的分区。
    • 如果在一个线程内共享消费者实例进行消费,那么可以利用消费者的分区分配策略来确保每个分区都被消费。
  3. 消息顺序性

    • Kafka 保证同一分区内消息的顺序性,但不同分区之间的消息顺序不保证。在多线程消费时,多个线程处理的分区之间的消息顺序可能会打乱,必须在设计中考虑这一点。
  4. 线程池和异常处理

    • 使用线程池来管理线程,并为每个线程添加异常处理机制,以确保线程的稳定运行。
    • 线程池可以避免频繁创建和销毁线程,提高性能和资源利用率。

9.5 每个线程独立消费者和共享消费者实例对比

多线程方式优点缺点适用场景
每个线程独立消费者简单高效,利用 Kafka 分区进行并行消费需要管理多个消费者实例,代码较为复杂高并发消费,分区级别的并行处理
共享消费者实例减少消费者实例数量,适用于有顺序需求的场景线程间需要协调,可能带来额外复杂性消费顺序要求较高,资源有限的场景

在 Kafka 消费者多线程设计时,选择适合的策略可以有效提高消费效率,但需要特别注意消费者实例的管理、偏移量提交和消息顺序性等问题。

十、kafka消费者常见问题

Kafka 消费者在实际使用中可能会遇到多种问题。这些问题通常与消费者的配置、偏移量管理、性能优化等方面相关。以下是一些常见的 Kafka 消费者问题及其详细介绍,包括具体的解决方案和案例。

10.1 问题:消费者组无法消费消息(消费滞后)

现象:消费者组无法消费新的消息,或者消费速度远远低于生产者的消息速率。

可能原因

  • 消费者与生产者速率不匹配:如果生产者的消息速率高于消费者的消费速率,消费者会出现消费滞后的问题。
  • 分区分配不均:消费者组的消费者数目不足,或者消费者组中的某些消费者没有被分配到分区,导致部分分区无法消费。
  • 消费处理能力不足:消费者端的处理能力(例如消息处理时间过长)会导致消费速度降低。

解决方案

  1. 增加消费者数量:可以增加消费者的数量,确保每个分区都能有一个消费者进行消费,达到负载均衡。
  2. 调整消费者配置:如增加 fetch.min.bytes 和减少 fetch.max.wait.ms 以减少网络请求次数,提升吞吐量。
  3. 检查消费者处理能力:优化消费者端的消息处理逻辑,减少每条消息的处理时间。

示例
如果消息的处理时间过长,可以采用多线程或异步处理来提高消费效率。例如,在每个消费者线程中异步处理消息:

public class ConsumerWorker implements Runnable {private KafkaConsumer<String, String> consumer;public ConsumerWorker(KafkaConsumer<String, String> consumer) {this.consumer = consumer;}@Overridepublic void run() {while (true) {ConsumerRecords<String, String> records = consumer.poll(100);records.forEach(record -> {// 异步处理消息CompletableFuture.runAsync(() -> processMessage(record));});}}private void processMessage(ConsumerRecord<String, String> record) {// 处理消息的逻辑}
}

10.2 问题:消费者偏移量重复消费或丢失

现象:消费者在消费消息时可能会遇到偏移量重复消费或丢失的问题,导致消息的重复消费或者丢失。

可能原因

  • 自动提交偏移量失效:如果消费者未正确配置偏移量自动提交或者手动提交时存在问题,可能导致消息的重复消费或丢失。
  • 消费者崩溃或重启:如果消费者崩溃或重启时未成功提交偏移量,可能会重复消费未提交的消息。
  • 消费者组切换:当消费者组发生变化(例如消费者加入或离开),Kafka 会重新分配分区并重新处理偏移量。

解决方案

  1. 关闭自动提交并手动提交偏移量:通过 enable.auto.commit=false 配置关闭自动提交,并使用 commitSync()commitAsync() 手动提交偏移量,确保只有在消息处理完毕后才提交偏移量。
  2. 使用精确的偏移量管理:如果需要精确控制消息的消费进度,可以采用基于事务的消息处理方案。

示例:手动提交偏移量示例:

consumer.poll(100).forEach(record -> {// 处理消息consumer.commitSync(); // 提交当前偏移量
});

10.3 问题:消费者因 Rebalance(再均衡)导致的消息丢失或重复消费

现象:当消费者组的成员变动时(例如新消费者加入或旧消费者离开),Kafka 会触发消费者组的 再均衡(Rebalancing)。在此期间,消费者可能会丢失正在消费的消息,或者会重复消费已经消费过的消息。

可能原因

  • 消费者在再均衡期间提交偏移量:消费者在执行消息处理时,如果触发了再均衡,可能会导致未处理完的消息丢失。
  • 再均衡过程中的阻塞:再均衡时,如果消费者未能及时完成偏移量提交或消息处理,可能会导致在恢复后从错误的偏移量开始消费。

解决方案

  1. 降低再均衡触发的频率:适当增加 session.timeout.msheartbeat.interval.ms 的时间,减少消费者失效和再均衡的频率。
  2. 使用精确的消息处理:在处理消息时,使用手动提交偏移量,确保每条消息的处理完成后才提交偏移量。
  3. 使用 Kafka 事务:为消费者启用事务,确保消费者可以在事务失败时回滚。

示例
避免在 poll()commit() 之间做复杂的逻辑处理:

consumer.poll(100).forEach(record -> {try {// 处理消息consumer.commitSync(); // 提交偏移量} catch (Exception e) {// 处理异常,确保偏移量不丢失}
});

10.4 问题:消费者读取不到数据(延迟高或空消费)

现象:消费者调用 poll() 时返回为空,或者消息的消费延迟较高。

可能原因

  • auto.offset.reset 配置错误:如果 auto.offset.reset 设置为 latest,且消费者之前没有偏移量记录,那么消费者将从最新的消息开始消费,导致丢失旧消息。
  • 消息未被生产:如果生产者发送的消息较少或没有消息到达,消费者可能会在短时间内读取不到数据。
  • 消费进度问题:如果消费者的偏移量已经超过了当前的消息,可能导致消费者读取不到消息。

解决方案

  1. 检查 auto.offset.reset 配置:确保 auto.offset.reset 设置为 earliest,以便消费者能够消费所有消息,包括未消费的旧消息。
  2. 检查生产者消息流:确保生产者持续发送消息,避免因生产者停止发送而导致消费者读取不到消息。
  3. 调节 fetch.max.wait.msfetch.min.bytes 参数:通过调节这些参数来减少拉取消息时的延迟。

示例

auto.offset.reset=earliest

10.5 问题:消费者无法连接到 Kafka 集群

现象:消费者启动时,无法连接到 Kafka 集群,抛出连接异常。

可能原因

  • bootstrap.servers 配置错误:指定的 Kafka 集群地址或端口错误。
  • 网络问题:消费者所在的机器与 Kafka 代理之间存在网络连接问题。
  • Kafka 代理不可用:Kafka 集群的某些代理(Broker)不可用,导致消费者无法建立连接。

解决方案

  1. 检查 bootstrap.servers 配置:确保消费者配置了正确的 Kafka 集群地址,格式为 host:port
  2. 检查网络连接:确保消费者的机器与 Kafka 代理之间的网络是通畅的,没有防火墙或端口限制。
  3. 检查 Kafka 代理的状态:确认 Kafka 代理处于正常运行状态,可以使用 Kafka 提供的 kafka-broker-api-versions.sh 等工具检查。

示例

bootstrap.servers=localhost:9092,localhost:9093

10.6 问题:消费者消费消息时延迟过高

现象:消费者的消息处理延迟过高,消息消费的时间大大超过了预期。

可能原因

  • 消费处理过程慢:消费者处理单条消息的时间过长,导致无法及时消费下一条消息。
  • 消费者配置不当:例如,max.poll.records 设置得太大,导致每次拉取的消息数量过多,增加了消息处理的延迟。
  • 资源瓶颈:消费者所在的机器 CPU、内存等资源不足,影响消息消费的速度。

解决方案

  1. 优化消费者的消息处理逻辑:减少每条消息的处理时间,可以通过多线程或异步处理等方式提高效率。
  2. 调整 max.poll.records 参数:减少每次拉取的消息数量,确保每次 poll() 操作的时间不会过长。
  3. 监控消费者的资源消耗:检查消费者所在的机器的资源使用情况,优化消费者的硬件配置。

示例

max.poll.records=10

十一、Kafka消费者性能调优

Kafka 消费者的性能调优是确保高效消费消息、减少延迟、提升吞吐量的关键步骤。通过合理配置消费者的参数,调整处理逻辑和资源配置,可以大大提高 Kafka 消费者的性能。

11.1 消费者配置参数调优

11.1.1 fetch.min.bytesfetch.max.wait.ms

  • fetch.min.bytes:指定消费者拉取数据时的最小字节数。消费者只有在获得至少该数量的数据时才会返回数据。适当增加此值可以减少网络请求的次数,但可能会增加拉取延迟。

  • fetch.max.wait.ms:指定消费者等待拉取数据的最大时间。如果未满足 fetch.min.bytes 条件,消费者将在此时间后返回,即使数据量不足。

调优建议

  • 增大 fetch.min.bytes:通过增大此值,可以减少请求次数,提高吞吐量,但会导致延迟增大。
  • 适当增加 fetch.max.wait.ms:避免频繁的拉取请求,提高网络利用率。

示例

fetch.min.bytes=50000  # 增大每次拉取数据的最小字节数
fetch.max.wait.ms=500  # 设置拉取超时时间

11.1.2 max.poll.records

  • max.poll.records:该参数指定每次调用 poll() 方法时,消费者一次最多拉取的消息数量。增加此值可以一次性拉取更多的消息,减少请求次数,从而提高吞吐量,但同时也会增加每次消费的处理时间。

调优建议

  • 如果消费者处理速度较快,可以增大该值来提高吞吐量。
  • 如果消息处理逻辑复杂或处理时间较长,则应适当减小此值,避免每次拉取的消息太多,导致消费者阻塞。

示例

max.poll.records=1000  # 每次最多拉取 1000 条消息

11.1.3 session.timeout.msheartbeat.interval.ms

  • session.timeout.ms:该参数设置消费者心跳的最大等待时间。如果消费者在此时间内未发送心跳,Kafka 会认为该消费者失效,并启动再均衡(rebalance)过程。过低的 session.timeout.ms 会增加再均衡频率,影响性能。

  • heartbeat.interval.ms:消费者发送心跳的频率。适当调整可以确保消费者的稳定性,并减少不必要的网络负载。

调优建议

  • 增大 session.timeout.ms:减少消费者在重新加入消费者组时的再均衡频率,适合高吞吐量的消费者。
  • 调整 heartbeat.interval.ms:确保心跳发送频率合理,避免不必要的网络开销。

示例

session.timeout.ms=30000  # 增加消费者会话超时,减少再均衡频率
heartbeat.interval.ms=10000  # 设置合理的心跳发送频率

11.1.4 auto.offset.reset

  • auto.offset.reset:当消费者没有偏移量或偏移量超出范围时,该参数控制消费者的行为。auto.offset.reset 有两个选项:
    • earliest:从最早的消息开始消费。
    • latest:从最新的消息开始消费。

调优建议

  • earliest:适用于需要重新消费历史消息的场景,但会增加初次消费时的延迟。
  • latest:适用于仅消费新消息的场景,减少延迟。

示例

auto.offset.reset=earliest  # 从最早的消息开始消费

11.2 消费模式和消息处理逻辑优化

11.2.1 批量处理

批量处理可以大大提高 Kafka 消费者的性能,特别是在处理大量消息时。消费者可以将消息缓存到内存中,进行批量处理,从而减少处理次数和提升吞吐量。

调优建议

  • 增加每次拉取的消息数量:在处理完一批消息后,批量提交偏移量,减少提交次数。
  • 避免每条消息都单独提交偏移量:可以使用批量提交偏移量(例如每处理 100 条消息提交一次),减少提交的频率。

示例

List<ConsumerRecord<String, String>> records = new ArrayList<>();
while (true) {ConsumerRecords<String, String> newRecords = consumer.poll(100);for (ConsumerRecord<String, String> record : newRecords) {records.add(record);}if (records.size() >= BATCH_SIZE) {processBatch(records);  // 批量处理消息consumer.commitSync();  // 提交偏移量records.clear();  // 清空缓存}
}

11.2.2 异步处理

为了提高消费性能,可以将消息处理和偏移量提交操作异步化,使消费者不需要等待每个消息的处理完成,从而提高整体吞吐量。

调优建议

  • 使用线程池或者异步框架来处理消息。
  • 将消息处理和提交操作分离,避免因一个消息的处理阻塞其他消息的消费。

示例

ExecutorService executor = Executors.newFixedThreadPool(10);
while (true) {ConsumerRecords<String, String> records = consumer.poll(100);for (ConsumerRecord<String, String> record : records) {executor.submit(() -> processMessage(record));  // 异步处理消息}
}

11.3 资源和硬件优化

11.3.1 内存和 CPU

对于高吞吐量的 Kafka 消费者,内存和 CPU 的性能非常关键。在大量消息的消费过程中,合理的内存和 CPU 配置可以显著提高消费者的性能。

  • 增加消费者并发性:使用多线程或多个消费者实例来提高 CPU 核心的利用率。
  • 内存优化:确保消费者能够处理足够大的批量消息,避免因内存不足导致的频繁 GC。

调优建议

  • 在多核机器上,可以启动多个消费者线程来并行处理消息。
  • 增加 JVM 堆内存,避免频繁的垃圾回收。

示例

-Xms4g  # 设置 JVM 初始堆内存
-Xmx8g  # 设置最大堆内存

11.3.2 Kafka 集群优化

消费者的性能不仅受客户端配置的影响,还与 Kafka 集群的配置有关。Kafka 集群的吞吐量、延迟和可用性直接影响消费者的性能。

  • 增加分区数量:在 Kafka 主题中增加分区数量,可以让多个消费者并行消费消息,从而提高吞吐量。
  • 负载均衡:确保 Kafka 集群的各个分区能够均匀分布到各个消费者实例中,避免某些消费者过载。

调优建议

  • 适当增加分区数,根据消费者的数量和吞吐量需求来设置主题的分区数。
  • 通过调整生产者的分区策略,使得消息能够均匀地分布到各个分区中。

11.4 监控和故障排查

11.4.1 消费者监控

通过监控 Kafka 消费者的性能,可以及时发现瓶颈,并进行调优。以下是一些关键的监控指标:

  • consumer-lag:消费滞后,表示消费者未处理的消息数量。较大的 consumer-lag 可能意味着消费者处理速度不足,或系统负载过高。
  • records-consumed-rate:每秒消费的记录数,反映消费者的吞吐量。
  • fetch-latency:拉取延迟,表示消费者从 Kafka 代理拉取数据的时间。较高的延迟可能表示消费者配置不当或 Kafka 集群负载过高。

调优建议

  • 监控 consumer-lagfetch-latency 指标,及时调整消费者的配置和处理逻辑。
  • 使用工具如 Prometheus 或 JMX 获取 Kafka 消费者的性能数据。

示例

# 启用 JMX 监控
kafka.consumer.metrics.reporters=org.apache.kafka.common.metrics.JmxReporter

相关文章:

kafka消费者详细介绍(超级详细)

文章目录 一、Kafka 消费者与消费者组1.1 Kafka 消费者&#xff08;Consumer&#xff09;概述1.1.1 消费者工作流程1.1.2 消费者的关键配置 1.2 Kafka 消费者组&#xff08;Consumer Group&#xff09;概述1.2.1 消费者组的工作原理1.2.2 消费者组的优点1.2.3 消费者组的再均衡…...

《剪映5.9官方安装包》免费自动生成字幕

&#xff08;避免失效建议存自己网盘后下载&#xff09;剪映5.9官方Win.Mac 链接&#xff1a;https://pan.xunlei.com/s/VOHc-Fg2XRlD50MueEaOOeW1A1?pwdawtt# 官方唯一的免费版&#xff0c;Win和Mac都有&#xff0c;此版本官方已下架&#xff0c;觉得有用可转存收藏&#xf…...

CAS是什么?ABA会带来什么影响?怎么解决ABA问题?

前言 在高并发开发中&#xff0c;CAS&#xff08;比较并交换&#xff09;是一种常用的无锁操作&#xff0c;因其高效性而被广泛应用。然而&#xff0c;实际工作中常会遇到ABA问题&#xff0c;导致数据更新异常或逻辑错误。理解CAS的原理及ABA问题的解决方法&#xff0c;有助于…...

智能调度体系与自动驾驶技术优化运输配送效率的研究——兼论开源AI智能名片2+1链动模式S2B2C商城小程序的应用潜力

摘要&#xff1a;随着全球化和数字化进程的加速&#xff0c;消费者需求日益呈现出碎片化和个性化的趋势&#xff0c;这对物流运输行业提出了前所未有的挑战。传统的物流调度体系与调度方式已难以满足当前复杂多变的物流需求&#xff0c;因此&#xff0c;物流企业必须积极引入大…...

方豆子(递归)

方豆子 思路&#xff1a;很典的一道递归题&#xff0c;但当时没想到怎么递归/(ㄒoㄒ)/~~。赛后看了大佬的讲解知道要将这个图形看成由四个小正方形组成的大正方形&#xff0c;递归参数可以设置成&#xff08;r1,c1,r2,c2,good)表示正方形的左上角坐标和右下角坐标以及当前这个正…...

Go语言入门指南(二): 数据类型

文章创作不易&#xff0c;麻烦大家点赞关注转发一键三连。 在上一篇文章&#xff0c;我们已经完成了开发环境的搭建&#xff0c;成功创建了第一个“Hello, World”程序&#xff0c;并且对变量的声明和初始化有了初步的认识。在这篇文章中&#xff0c;我们将主要介绍Go语言的数据…...

Django ORM解决Oracle表多主键的问题

现状 以Django 3.2为例 Django ORM 设计为默认使用单一主键&#xff08;通常是自增的 id 字段&#xff09;&#xff0c;这一选择主要基于以下核心原因&#xff1a; 简化ORM设计与操作 统一访问方式外键关联简化 避免歧义冲突 主键语义明确防止隐式依赖 性能与数据库兼容 索引…...

学习数据结构(2)空间复杂度+顺序表

1.空间复杂度 &#xff08;1&#xff09;概念 空间复杂度也是一个数学表达式&#xff0c;表示一个算法在运行过程中根据算法的需要额外临时开辟的空间。 空间复杂度不是指程序占用了多少bytes的空间&#xff0c;因为常规情况每个对象大小差异不会很大&#xff0c;所以空间复杂…...

实验一---典型环节及其阶跃响应---自动控制原理实验课

一 实验目的 1.掌握典型环节阶跃响应分析的基本原理和一般方法。 2. 掌握MATLAB编程分析阶跃响应方法。 二 实验仪器 1. 计算机 2. MATLAB软件 三 实验内容及步骤 利用MATLAB中Simulink模块构建下述典型一阶系统的模拟电路并测量其在阶跃响应。 1.比例环节的模拟电路 提…...

从零推导线性回归:最小二乘法与梯度下降的数学原理

​ 欢迎来到我的主页&#xff1a;【Echo-Nie】 本篇文章收录于专栏【机器学习】 本文所有内容相关代码都可在以下仓库中找到&#xff1a; Github-MachineLearning 1 线性回归 1.1 什么是线性回归 线性回归是一种用来预测和分析数据之间关系的工具。它的核心思想是找到一条直…...

OpenSIPS-从安装部署开始认识一个组件

前期讲到了Kamailio&#xff0c;它是一个不错的开源SIP&#xff08;Session Initiation Protocol&#xff09;服务器&#xff0c;主要用于构建高效的VoIP&#xff08;Voice over IP&#xff09;平台以及即时通讯服务。但是在同根同源&#xff08;OpenSER&#xff09;的分支上&a…...

数据结构(树)

每一个节点包含&#xff1a;父节点地址 值 左子节点地址 右子节点地址 如果一个节点不含有&#xff1a;父节点地址或左子节点地址 右子节点地址就记为null 二叉树 度&#xff1a;每一个节点的子节点数量 二叉树中&#xff0c;任意节点的度<2 树的结构&#xff1a; 二叉查…...

[Dialog屏幕开发] 设置搜索帮助

阅读该篇文章之前&#xff0c;可先阅读下述资料 [Dialog屏幕开发] 屏幕绘制(使用向导创建Tabstrip Control标签条控件)https://blog.csdn.net/Hudas/article/details/145372195?spm1001.2014.3001.5501https://blog.csdn.net/Hudas/article/details/145372195?spm1001.2014.…...

C语言从入门到进阶

视频&#xff1a;https://www.bilibili.com/video/BV1Vm4y1r7jY?spm_id_from333.788.player.switch&vd_sourcec988f28ad9af37435316731758625407&p23 //枚举常量 enum Sex{MALE,FEMALE,SECRET };printf("%d\n", MALE);//0 printf("%d\n", FEMALE…...

Node.js下载安装及环境配置教程 (详细版)

Node.js&#xff1a;是一个基于 Chrome V8 引擎的 JavaScript 运行时&#xff0c;用于构建可扩展的网络应用程序。Node.js 使用事件驱动、非阻塞 I/O 模型&#xff0c;使其非常适合构建实时应用程序。 Node.js 提供了一种轻量、高效、可扩展的方式来构建网络应用程序&#xff0…...

Mac Electron 应用签名(signature)和公证(notarization)

在MacOS 10.14.5之后&#xff0c;如果应用没有在苹果官方平台进行公证notarization(我们可以理解为安装包需要审核&#xff0c;来判断是否存在病毒)&#xff0c;那么就不能被安装。当然现在很多人的解决方案都是使用sudo spctl --master-disable&#xff0c;取消验证模式&#…...

redis安装 windows版本

下载 github下载5.x版本redis 安装以及启动 解压文件&#xff0c;目标如下 进入cmd至安装路径 执行如下命令启动redis redis-server.exe redis.windows.conf 进入redis,另外启动cmd在当前目录执行进入redis 服务 redis-cli 测试命令 至此安装成功&#xff0c;但是这只是…...

关联传播和 Python 和 Scikit-learn 实现

文章目录 一、说明二、什么是 Affinity Propagation。2.1 先说Affinity 传播的工作原理2.2 更多细节2.3 传播两种类型的消息2.4 计算责任和可用性的分数2.4.1 责任2.4.2 可用性分解2.4.3 更新分数&#xff1a;集群是如何形成的2.4.4 估计集群本身的数量。 三、亲和力传播的一些…...

若依基本使用及改造记录

若依框架想必大家都了解得不少&#xff0c;不可否认这是一款及其简便易用的框架。 在某种情况下&#xff08;比如私活&#xff09;使用起来可谓是快得一匹。 在这里小兵结合自身实际使用情况&#xff0c;记录一下我对若依框架的使用和改造情况。 一、源码下载 前往码云进行…...

c语言网 1127 尼科彻斯定理

原题 题目描述 验证尼科彻斯定理&#xff0c;即&#xff1a;任何一个整数m的立方都可以写成m个连续奇数之和。 输入格式 任一正整数 输出格式 该数的立方分解为一串连续奇数的和 样例输入 13 样例输出 13*13*132197157159161163165167169171173175177179181 ​ #include<ios…...

能说说MyBatis的工作原理吗?

大家好&#xff0c;我是锋哥。今天分享关于【Redis为什么这么快?】面试题。希望对大家有帮助&#xff1b; 能说说MyBatis的工作原理吗&#xff1f; MyBatis 是一款流行的持久层框架&#xff0c;它通过简化数据库操作&#xff0c;帮助开发者更高效地与数据库进行交互。MyBatis…...

卡特兰数学习

1&#xff0c;概念 卡特兰数&#xff08;英语&#xff1a;Catalan number&#xff09;&#xff0c;又称卡塔兰数&#xff0c;明安图数。是组合数学中一种常出现于各种计数问题中的数列。它在不同的计数问题中频繁出现。 2&#xff0c;公式 卡特兰数的递推公式为&#xff1a;f(…...

【算法】多源 BFS

多源 BFS 1.矩阵距离2.刺杀大使 单源最短路问题 vs 多源最短路问题 当问题中只存在一个起点时&#xff0c;这时的最短路问题就是单源最短路问题。当问题中存在多个起点而不是单一起点时&#xff0c;这时的最短路问题就是多源最短路问题。 多源 BFS&#xff1a;多源最短路问题…...

解锁数字经济新动能:探寻 Web3 核心价值

随着科技的快速发展&#xff0c;我们正迈入一个全新的数字时代&#xff0c;Web3作为这一时代的核心构成之一&#xff0c;正在为全球数字经济带来革命性的变革。本文将探讨Web3的核心价值&#xff0c;并如何推动数字经济的新动能。 Web3是什么&#xff1f; Web3&#xff0c;通常…...

CAN总线数据采集与分析

CAN总线数据采集与分析 目录 CAN总线数据采集与分析1. 引言2. 数据采集2.1 数据采集简介2.2 数据采集实现 3. 数据分析3.1 数据分析简介3.2 数据分析实现 4. 数据可视化4.1 数据可视化简介4.2 数据可视化实现 5. 案例说明5.1 案例1&#xff1a;数据采集实现5.2 案例2&#xff1…...

appium自动化环境搭建

一、appium介绍 appium介绍 appium是一个开源工具、支持跨平台、用于自动化ios、安卓手机和windows桌面平台上面的原生、移动web和混合应用&#xff0c;支持多种编程语言(python&#xff0c;java&#xff0c;Ruby&#xff0c;Javascript、PHP等) 原生应用和混合应用&#xf…...

二叉树高频题目——下——不含树型dp

一&#xff0c;普通二叉树上寻找两个节点的最近的公共祖先 1&#xff0c;介绍 LCA&#xff08;Lowest Common Ancestor&#xff0c;最近公共祖先&#xff09;是二叉树中经常讨论的一个问题。给定二叉树中的两个节点&#xff0c;它的LCA是指这两个节点的最低&#xff08;最深&…...

Java并发学习:进程与线程的区别

进程的基本原理 一个进程是一个程序的一次启动和执行&#xff0c;是操作系统程序装入内存&#xff0c;给程序分配必要的系统资源&#xff0c;并且开始运行程序的指令。 同一个程序可以多次启动&#xff0c;对应多个进程&#xff0c;例如同一个浏览器打开多次。 一个进程由程…...

【ProxyBroker】用Python打破网络限制的利器

ProxyBroker 1. 什么是ProxyBroker2. ProxyBroker的功能3. ProxyBroker的优势4. ProxyBroker的使用方法5. ProxyBroker的应用场景6.结语项目地址&#xff1a; 1. 什么是ProxyBroker ProxyBroker是一个开源工具&#xff0c;它可以异步地从多个来源找到公共代理&#xff0c;并同…...

Gradle buildSrc模块详解:集中管理构建逻辑的利器

文章目录 buildSrc模块二 buildSrc的使命三 如何使用buildSrc1. 创建目录结构2. 配置buildSrc的构建脚本3. 编写共享逻辑4. 在模块中引用 四 典型使用场景1. 统一依赖版本管理2. 自定义Gradle任务 3. 封装通用插件4. 扩展Gradle API 五 注意事项六 与复合构建&#xff08;Compo…...

2025数学建模美赛|F题成品论文

国家安全政策与网络安全 摘要 随着互联网技术的迅猛发展&#xff0c;网络犯罪问题已成为全球网络安全中的重要研究课题&#xff0c;且网络犯罪的形式和影响日益复杂和严重。本文针对网络犯罪中的问题&#xff0c;基于多元回归分析和差异中的差异&#xff08;DiD&#xff09;思…...

【Numpy核心编程攻略:Python数据处理、分析详解与科学计算】1.10 文本数据炼金术:从CSV到结构化数组

1.10 《文本数据炼金术&#xff1a;从CSV到结构化数组》 目录 #mermaid-svg-TNkACjzvaSXnULaB {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-TNkACjzvaSXnULaB .error-icon{fill:#552222;}#mermaid-svg-TNkACjzva…...

「蓝桥杯题解」蜗牛(Java)

题目链接 这道题我感觉状态定义不太好想&#xff0c;需要一定的经验 import java.util.*; /*** 蜗牛* 状态定义&#xff1a;* dp[i][0]:到达(x[i],0)最小时间* dp[i][1]:到达 xi 上方的传送门最小时间*/public class Main {static Scanner in new Scanner(System.in);static f…...

基于springboot+vue的流浪动物救助系统的设计与实现

开发语言&#xff1a;Java框架&#xff1a;springbootJDK版本&#xff1a;JDK1.8服务器&#xff1a;tomcat7数据库&#xff1a;mysql 5.7&#xff08;一定要5.7版本&#xff09;数据库工具&#xff1a;Navicat11开发软件&#xff1a;eclipse/myeclipse/ideaMaven包&#xff1a;…...

51单片机开发:IO扩展(串转并)实验

实验目标&#xff1a;通过扩展口从下至上依次点亮点阵屏的行。 下图左边是74HC595 模块电路图&#xff0c;右边是点阵屏电图图。 SRCLK上升沿时&#xff0c;将SER输入的数据移送至内部的移位寄存器。 RCLK上升沿时&#xff0c;将数据从移位寄存器移动至存储寄存器&#xff0c…...

JAVA实战开源项目:购物商城网站(Vue+SpringBoot) 附源码

本文项目编号 T 032 &#xff0c;文末自助获取源码 \color{red}{T032&#xff0c;文末自助获取源码} T032&#xff0c;文末自助获取源码 目录 一、系统介绍二、演示录屏三、启动教程四、功能截图五、文案资料5.1 选题背景5.2 国内外研究现状5.3 可行性分析 六、核心代码6.1 查…...

C++学习——认识和与C的区别

目录 前言 一、什么是C 二、C关键字 三、与C语言不同的地方 3.1头文件 四、命名空间 4.1命名空间的概念写法 4.2命名空间的访问 4.3命名空间的嵌套 4.4命名空间在实际中的几种写法 五、输入输出 5.1cout 5.2endl 5.3cin 总结 前言 开启新的篇章&#xff0c;这里…...

Open FPV VTX开源之ardupilot双OSD配置摄像头

Open FPV VTX开源之ardupilot双OSD配置 1 源由2. 分析3. 配置4. 解决办法5. 参考资料 1 源由 鉴于笔者这台Mark4 Copter已经具备一定的历史&#xff0c;目前机载了两个FPV摄像头&#xff1a; 模拟摄像头数字摄像头(OpenIPC) 测试场景&#xff1a; 从稳定性的角度&#xff1…...

基于微信小程序高校课堂教学管理系统 课堂管理系统微信小程序(源码+文档)

目录 一.研究目的 二.需求分析 三.数据库设计 四.系统页面展示 五.免费源码获取 一.研究目的 困扰管理层的许多问题当中,高校课堂教学管理也是不敢忽视的一块。但是管理好高校课堂教学又面临很多麻烦需要解决,如何在工作琐碎,记录繁多的情况下将高校课堂教学的当前情况反…...

unity商店插件A* Pathfinding Project如何判断一个点是否在导航网格上?

需要使用NavGraph.IsPointOnNavmesh(Vector3 point) 如果点位于导航网的可步行部分&#xff0c;则为真。 如果一个点在可步行导航网表面之上或之下&#xff0c;在任何距离&#xff0c;如果它不在更近的不可步行节点之上 / 之下&#xff0c;则认为它在导航网上。 使用方法 Ast…...

三星手机人脸识别解锁需要点击一下电源键,能够不用点击直接解锁吗

三星手机的人脸识别解锁功能默认需要滑动或点击屏幕来解锁。这是为了增强安全性&#xff0c;防止误解锁的情况。如果希望在检测到人脸后直接进入主界面&#xff0c;可以通过以下设置调整&#xff1a; 打开设置&#xff1a; 进入三星手机的【设置】应用。 进入生物识别和安全&a…...

read+write实现:链表放到文件+文件数据放到链表 的功能

思路 一、 定义链表&#xff1a; 1 节点结构&#xff08;数据int型&#xff09; 2 链表操作&#xff08;创建节点、插入节点、释放链表、打印链表&#xff09;。 二、链表保存到文件 1打开文件 2遍历链表、写文件&#xff1a; 遍历链表,write()将节点数据写入文件。…...

猫怎么分公的母的?

各位铲屎官们&#xff0c;是不是刚领养了一只小猫咪&#xff0c;却分不清它是公是母&#xff1f;别急&#xff0c;今天就来给大家好好揭秘&#xff0c;如何轻松辨别猫咪的性别&#xff0c;让你不再为“它”是“他”还是“她”而烦恼&#xff01; 一、观察生殖器位置 最直接的方…...

为何SAP S4系统中要设置MRP区域?MD04中可否同时显示工厂级、库存地点级的数据?

【SAP系统PP模块研究】 一、物料主数据的MRP区域设置 SAP ECC系统中想要指定不影响MRP运算的库存地点,是针对库存地点设置MRP标识,路径为:SPRO->生产->物料需求计划->计划->定义每一个工厂的存储地点MRP,如下图所示: 另外,在给物料主数据MMSC扩充库存地点时…...

Redis for AI

Redis存储和索引语义上表示非结构化数据&#xff08;包括文本通道、图像、视频或音频&#xff09;的向量嵌入。将向量和关联的元数据存储在哈希或JSON文档中&#xff0c;用于索引和查询。 Redis包括一个高性能向量数据库&#xff0c;允许您对向量嵌入执行语义搜索。可以通过过…...

初阶2 类与对象

本章重点 上篇1.面向过程和面向对象初步认识2.类的引入---结构体3.类的定义3.1 语法3.2 组成3.3 定义类的两种方法&#xff1a; 4.类的访问限定符及封装4.1 访问限定符4.2封装---面向对象的三大特性之一 5.类的作用域6.类的实例化7.类对象模型7.1 如何计算类对象的大小 8.this指…...

kafka-部署安装

一. 简述&#xff1a; Kafka 是一个分布式流处理平台&#xff0c;常用于构建实时数据管道和流应用。 二. 安装部署&#xff1a; 1. 依赖&#xff1a; a). Java&#xff1a;Kafka 需要 Java 8 或更高版本。 b). zookeeper&#xff1a; #tar fxvz zookeeper-3.7.0.tar.gz #…...

深入探讨防抖函数中的 this 上下文

深入剖析防抖函数中的 this 上下文 最近我在研究防抖函数实现的时候&#xff0c;发现一个耗费脑子的问题&#xff0c;出现了令我困惑的问题。接下来&#xff0c;我将通过代码示例&#xff0c;深入探究这些现象背后的原理。 示例代码 function debounce(fn, delay) {let time…...

人工智能丨Midscene:让UI自动化测试变得更简单

在这个数字化时代&#xff0c;每一个细节的优化都可能成为产品脱颖而出的关键。而对于测试人员来说&#xff0c;确保产品界面的稳定性和用户体验的流畅性至关重要。今天&#xff0c;我要向大家介绍一款名为Midscene的工具&#xff0c;它利用自然语言处理&#xff08;NLP&#x…...

【数据结构】_链表经典算法OJ(力扣版)

目录 1. 移除链表元素 1.1 题目描述及链接 1.2 解题思路 1.3 程序 2. 反转链表 2.1 题目描述及链接 2.2 解题思路 2.3 程序 3. 链表的中间结点 3.1 题目描述及链接 3.2 解题思路 3.3 程序 1. 移除链表元素 1.1 题目描述及链接 原题链接&#xff1a;203. 移除链表…...