当前位置: 首页 > news >正文

分布式微服务系统架构第88集:kafka集群

使用集 群最大的好处是可以跨服务器进行负载均衡,再则就是可以使用复制功能来避免因单点故 障造成的数据丢失。在维护 Kafka 或底层系统时,使用集群可以确保为客户端提供高可用 性。


需要多少个broker

一个 Kafka 集群需要多少个 broker 取决于以下几个因素。首先,需要多少磁盘空间来保 留数据,以及单个 broker 有多少空间可用。如果整个集群需要保留 10TB 的数据,每个 broker 可以存储 2TB,那么至少需要 5 个 broker。如果启用了数据复制,那么至少还需要 一倍的空间,不过这要取决于配置的复制系数是多少

也就是说,如 果启用了数据复制,那么这个集群至少需要 10 个 broker。

第二个要考虑的因素是集群处理请求的能力。这通常与网络接口处理客户端流量的能力有 关,特别是当有多个消费者存在或者在数据保留期间流量发生波动(比如高峰时段的流量 爆发)时。如果单个 broker 的网络接口在高峰时段可以达到 80% 的使用量,并且有两个 消费者,那么消费者就无法保持峰值,除非有两个 broker。如果集群启用了复制功能,则 要把这个额外的消费者考虑在内。因磁盘吞吐量低和系统内存不足造成的性能问题,也可 以通过扩展多个 broker 来解决。

broker配置

要把一个 broker 加入到集群里,只需要修改两个配置参数。首先,所有 broker 都必须配 置相同的 zookeeper.connect,该参数指定了用于保存元数据的 Zookeeper 群组和路径。其次,每个 broker 都必须为 broker.id 参数设置唯一的值。如果两个 broker 使用相同的 broker.id,那么第二个 broker 就无法启动。在运行集群时,还可以配置其他一些参数,特 别是那些用于控制数据复制的参数

操作系统调优

大部分 Linux 发行版默认的内核调优参数配置已经能够满足大多数应用程序的运行需求, 不过还是可以通过调整一些参数来进一步提升 Kafka 的性能。这些参数主要与虚拟内存、 网络子系统和用来存储日志片段的磁盘挂载点有关。这些参数一般配置在 /etc/sysctl.conf 文件里

虚拟内存

一般来说 Linux 的虚拟内存会根据系统的工作负荷进行自动调整。我们可以对交换分区 的处理方式和内存脏页进行调整,从而让 Kafka 更好地处理工作负载。

对于大多数依赖吞吐量的应用程序来说,要尽量避免内存交换。内存页和磁盘之间的交换 对 Kafka 各方面的性能都有重大影响。Kafka 大量地使用系统页面缓存,如果虚拟内存被 交换到磁盘,说明已经没有多余内存可以分配给页面缓存了。

一种避免内存交换的方法是不设置任何交换分区。内存交换不是必需的,不过它确实能够 在系统发生灾难性错误时提供一些帮助。进行内存交换可以防止操作系统由于内存不足而 突然终止进程。基于上述原因,建议把 vm.swappiness 参数的值设置得小一点,比如 1。该 参数指明了虚拟机的子系统将如何使用交换分区,而不是只把内存页从页面缓存里移除。要优先考虑减小页面缓存,而不是进行内存交换。

为什么不把 vm.swappiness 设为零

先前,人们建议尽量把 vm.swapiness 设为 0,它意味着“除非发生内存溢 出,否则不要进行内存交换”。直到 Linux 内核 3.5-rc1 版本发布,这个值的 意义才发生了变化。这个变化被移植到其他的发行版上,包括 Red Hat 企业 版内核 2.6.32-303。在发生变化之后,0 意味着“在任何情况下都不要发生交 换”。所以现在建议把这个值设为 1。

脏页会被冲刷到磁盘上,调整内核对脏页的处理方式可以让我们从中获益。Kafka 依赖 I/O 性 能为生产者提供快速的响应。这就是为什么日志片段一般要保存在快速磁盘上,不管是单个 快速磁盘(如 SSD)还是具有 NVRAM 缓存的磁盘子系统(如 RAID)。这样一来,在后台刷 新进程将脏页写入磁盘之前,可以减少脏页的数量,这个可以通过将 vm.dirty_background_ ratio 设为小于 10 的值来实现。该值指的是系统内存的百分比,大部分情况下设为 5 就可以 了。它不应该被设为 0,因为那样会促使内核频繁地刷新页面,从而降低内核为底层设备的 磁盘写入提供缓冲的能力。

通过设置 vm.dirty_ratio 参数可以增加被内核进程刷新到磁盘之前的脏页数量,可以将它 设为大于 20 的值(这也是系统内存的百分比)。这个值可设置的范围很广,60~80 是个比 较合理的区间。不过调整这个参数会带来一些风险,包括未刷新磁盘操作的数量和同步刷 新引起的长时间 I/O 等待。如果该参数设置了较高的值,建议启用 Kafka 的复制功能,避 免因系统崩溃造成数据丢失。

为了给这些参数设置合适的值,最好是在 Kafka 集群运行期间检查脏页的数量,不管是在 生存环境还是模拟环境。可以在 /proc/vmstat 文件里查看当前脏页数量。

磁盘

除了选择合适的磁盘硬件设备和使用 RAID 外,文件系统是影响性能的另一个重要因素。有很多种文件系统可供选择,不过对于本地文件系统来说,EXT4(第四代可扩展文件系 统)和 XFS 最为常见。

网络

默认情况下,系统内核没有针对快速的大流量网络传输进行优化,所以对于应用程序来 说,一般需要对 Linux 系统的网络栈进行调优,以实现对大流量的支持。实际上,调整 Kafka 的网络配置与调整其他大部分 Web 服务器和网络应用程序的网络配置是一样的。首先可以对分配给 socket 读写缓冲区的内存大小作出调整,这样可以显著提升网络的传 输性能。socket 读写缓冲区对应的参数分别是 net.core.wmem_default 和 net.core.rmem_ default,合理的值是 131 072(也就是 128KB)。读写缓冲区最大值对应的参数分别是 net.core.wmem_max 和 net.core.rmem_max,合理的值是 2 097 152(也就是 2MB)。要注 意,最大值并不意味着每个 socket 一定要有这么大的缓冲空间,只是说在必要的情况下 才会达到这个值。

Kafka生产者——向Kafka写入数据


我们从创建一个 ProducerRecord 对象开始,ProducerRecord 对象需要包含目标主题和要发 送的内容。我们还可以指定键或分区。在发送 ProducerRecord 对象时,生产者要先把键和 值对象序列化成字节数组,这样它们才能够在网络上传输。

接下来,数据被传给分区器。如果之前在 ProducerRecord 对象里指定了分区,那么分区器 就不会再做任何事情,直接把指定的分区返回。如果没有指定分区,那么分区器会根据 ProducerRecord 对象的键来选择一个分区。选好分区以后,生产者就知道该往哪个主题和 分区发送这条记录了。紧接着,这条记录被添加到一个记录批次里,这个批次里的所有消 息会被发送到相同的主题和分区上。有一个独立的线程负责把这些记录批次发送到相应的 broker 上。

服务器在收到这些消息时会返回一个响应。如果消息成功写入 Kafka,就返回一个 RecordMetaData 对象,它包含了主题和分区信息,以及记录在分区里的偏移量。如果写入 失败,则会返回一个错误。生产者在收到错误之后会尝试重新发送消息,几次之后如果还 是失败,就返回错误信息。

创建Kafka生产者

bootstrap.servers

该属性指定 broker 的地址清单,地址的格式为 host:port。清单里不需要包含所有的 broker 地址,生产者会从给定的 broker 里查找到其他 broker 的信息。不过建议至少要 提供两个 broker 的信息,一旦其中一个宕机,生产者仍然能够连接到集群上。

key.serializer broker

希望接收到的消息的键和值都是字节数组。生产者接口允许使用参数化类型,因 此可以把 Java 对象作为键和值发送给 broker。这样的代码具有良好的可读性,不过生 产者需要知道如何把这些 Java 对象转换成字节数组。key.serializer 必须被设置为一 个实现了 org.apache.kafka.common.serialization.Serializer 接口的类,生产者会使 用这个类把键对象序列化成字节数组。Kafka 客户端默认提供了 ByteArraySerializer (这个只做很少的事情)、StringSerializer 和 IntegerSerializer,因此,如果你只 使用常见的几种 Java 对象类型,那么就没必要实现自己的序列化器。要注意,key. serializer 是必须设置的,就算你打算只发送值内容。

value.serializer

与 key.serializer 一样,value.serializer 指定的类会将值序列化。如果键和值都是字 符串,可以使用与 key.serializer 一样的序列化器。如果键是整数类型而值是字符串, 那么需要使用不同的序列化器。

如何创建一个新的生产者

private Properties kafkaProps = new Properties();

kafkaProps.put("bootstrap.servers", "broker1:9092,broker2:9092"); kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");

kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

producer = new KafkaProducer(kafkaProps);

我们把消息发送给服务器,但并不关心它是否正常到达。大多数情况下,消息会正常到 达,因为 Kafka 是高可用的,而且生产者会自动尝试重发。不过,使用这种方式有时候 也会丢失一些消息。

我们使用 send() 方法发送消息,它会返回一个 Future 对象,调用 get() 方法进行等待, 就可以知道消息是否发送成功。

我们调用 send() 方法,并指定一个回调函数,服务器在返回响应时调用该函数。

发送消息到Kafka

消息先是被放进缓冲区,然后使用单独的线程发送到服务器端。send() 方法会返 回一个包含 RecordMetadata 的 Future 对象,不过因为我们会忽略返回值,所以无法知 道消息是否发送成功。如果不关心发送结果,那么可以使用这种发送方式。比如,记录 Twitter 消息日志,或记录不太重要的应用程序日志。

我们可以忽略发送消息时可能发生的错误或在服务器端可能发生的错误,但在发送消 息之前,生产者还是有可能发生其他的异常。这些异常有可能是 SerializationException (说明序列化消息失败)、BufferExhaustedException 或 TimeoutException(说明缓冲区已 满),又或者是 InterruptException(说明发送线程被中断)。

在这里,producer.send() 方法先返回一个 Future 对象,然后调用 Future 对象的 get() 方法等待 Kafka 响应。如果服务器返回错误,get() 方法会抛出异常。如果没有发生错 误,我们会得到一个 RecordMetadata 对象,可以用它获取消息的偏移量。

如果在发送数据之前或者在发送过程中发生了任何错误,比如 broker 返回了一个不允 许重发消息的异常或者已经超过了重发的次数,那么就会抛出异常。我们只是简单地把 异常信息打印出来。

KafkaProducer 一般会发生两类错误。其中一类是可重试错误,这类错误可以通过重发消息 来解决。比如对于连接错误,可以通过再次建立连接来解决,“无主(no leader)”错误则可 以通过重新为分区选举首领来解决。KafkaProducer 可以被配置成自动重试,如果在多次重 试后仍无法解决问题,应用程序会收到一个重试异常。另一类错误无法通过重试解决,比如 “消息太大”异常。对于这类错误,KafkaProducer 不会进行任何重试,直接抛出异常。

异步发送消息

假设消息在应用程序和 Kafka 集群之间一个来回需要 10ms。如果在发送完每个消息后都 等待回应,那么发送 100 个消息需要 1 秒。但如果只发送消息而不等待响应,那么发送 100 个消息所需要的时间会少很多。大多数时候,我们并不需要等待响应——尽管 Kafka 会把目标主题、分区信息和消息的偏移量发送回来,但对于发送端的应用程序来说不是必 需的。不过在遇到消息发送失败时,我们需要抛出异常、记录错误日志,或者把消息写入 “错误消息”文件以便日后分析。

为了在异步发送消息的同时能够对异常情况进行处理,生产者提供了回调支持。

生产者的配置

1. acks acks

参数指定了必须要有多少个分区副本收到消息,生产者才会认为消息写入是成功的。这个参数对消息丢失的可能性有重要影响。该参数有如下选项。

• 如果 acks=0,生产者在成功写入消息之前不会等待任何来自服务器的响应。也就是说, 如果当中出现了问题,导致服务器没有收到消息,那么生产者就无从得知,消息也就丢 失了。不过,因为生产者不需要等待服务器的响应,所以它可以以网络能够支持的最大 速度发送消息,从而达到很高的吞吐量。

• 如果 acks=1,只要集群的首领节点收到消息,生产者就会收到一个来自服务器的成功 响应。如果消息无法到达首领节点(比如首领节点崩溃,新的首领还没有被选举出来), 生产者会收到一个错误响应,为了避免数据丢失,生产者会重发消息。不过,如果一个 没有收到消息的节点成为新首领,消息还是会丢失。这个时候的吞吐量取决于使用的是 Kafka生产者——向Kafka写入数据 | 37 同步发送还是异步发送。如果让发送客户端等待服务器的响应(通过调用 Future 对象 的 get() 方法),显然会增加延迟(在网络上传输一个来回的延迟)。如果客户端使用回 调,延迟问题就可以得到缓解,不过吞吐量还是会受发送中消息数量的限制(比如,生 产者在收到服务器响应之前可以发送多少个消息)。

• 如果 acks=all,只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自 服务器的成功响应。这种模式是最安全的,它可以保证不止一个服务器收到消息,就算 有服务器发生崩溃,整个集群仍然可以运行不过,它的 延迟比 acks=1 时更高,因为我们要等待不只一个服务器节点接收消息。

2. buffer.memory

该参数用来设置生产者内存缓冲区的大小,生产者用它缓冲要发送到服务器的消息。如果 应用程序发送消息的速度超过发送到服务器的速度,会导致生产者空间不足。这个时候, send() 方法调用要么被阻塞,要么抛出异常,取决于如何设置 block.on.buffer.full 参数

3. compression.type

默认情况下,消息发送时不会被压缩。该参数可以设置为 snappy、gzip 或 lz4,它指定了 消息被发送给 broker 之前使用哪一种压缩算法进行压缩。snappy 压缩算法由 Google 发明, 它占用较少的 CPU,却能提供较好的性能和相当可观的压缩比,如果比较关注性能和网 络带宽,可以使用这种算法。gzip 压缩算法一般会占用较多的 CPU,但会提供更高的压缩 比,所以如果网络带宽比较有限,可以使用这种算法。使用压缩可以降低网络传输开销和 存储开销,而这往往是向 Kafka 发送消息的瓶颈所在。

4. retries

生产者从服务器收到的错误有可能是临时性的错误(比如分区找不到首领)。在这种情况 下,retries 参数的值决定了生产者可以重发消息的次数,如果达到这个次数,生产者会 放弃重试并返回错误。默认情况下,生产者会在每次重试之间等待 100ms,不过可以通过 retry.backoff.ms 参数来改变这个时间间隔。建议在设置重试次数和重试时间间隔之前, 先测试一下恢复一个崩溃节点需要多少时间(比如所有分区选举出首领需要多长时间), 让总的重试时间比 Kafka 集群从崩溃中恢复的时间长,否则生产者会过早地放弃重试。不 过有些错误不是临时性错误,没办法通过重试来解决(比如“消息太大”错误)。一般情 况下,因为生产者会自动进行重试,所以就没必要在代码逻辑里处理那些可重试的错误。你只需要处理那些不可重试的错误或重试次数超出上限的情况。

5. batch.size

当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指 定了一个批次可以使用的内存大小,按照字节数计算(而不是消息个数)。当批次被填满, 批次里的所有消息会被发送出去。不过生产者并不一定都会等到批次被填满才发送,半满 的批次,甚至只包含一个消息的批次也有可能被发送。所以就算把批次大小设置得很大, 也不会造成延迟,只是会占用更多的内存而已。但如果设置得太小,因为生产者需要更频 繁地发送消息,会增加一些额外的开销。

6. linger.ms

该参数指定了生产者在发送批次之前等待更多消息加入批次的时间。KafkaProducer 会在 批次填满或 linger.ms 达到上限时把批次发送出去。默认情况下,只要有可用的线程,生 产者就会把消息发送出去,就算批次里只有一个消息。把 linger.ms 设置成比 0 大的数, 让生产者在发送批次之前等待一会儿,使更多的消息加入到这个批次。虽然这样会增加延 迟,但也会提升吞吐量(因为一次性发送更多的消息,每个消息的开销就变小了)。

7. client.id

该参数可以是任意的字符串,服务器会用它来识别消息的来源,还可以用在日志和配额指 标里。

8. max.in.flight.requests.per.connection

该参数指定了生产者在收到服务器响应之前可以发送多少个消息。它的值越高,就会占用 越多的内存,不过也会提升吞吐量。把它设为 1 可以保证消息是按照发送的顺序写入服务 器的,即使发生了重试。

9. timeout.ms、request.timeout.ms 和 metadata.fetch.timeout.ms

request.timeout.ms 指定了生产者在发送数据时等待服务器返回响应的时间,metadata. fetch.timeout.ms 指定了生产者在获取元数据(比如目标分区的首领是谁)时等待服务器 返回响应的时间。如果等待响应超时,那么生产者要么重试发送数据,要么返回一个错误 (抛出异常或执行回调)。timeout.ms 指定了 broker 等待同步副本返回消息确认的时间,与 asks 的配置相匹配——如果在指定时间内没有收到同步副本的确认,那么 broker 就会返回 一个错误。

10. max.block.ms

该参数指定了在调用 send() 方法或使用 partitionsFor() 方法获取元数据时生产者的阻塞 时间。当生产者的发送缓冲区已满,或者没有可用的元数据时,这些方法就会阻塞。在阻 塞时间达到 max.block.ms 时,生产者会抛出超时异常。

11. max.request.size

该参数用于控制生产者发送的请求大小。它可以指能发送的单个消息的最大值,也可以指 单个请求里所有消息总的大小。例如,假设这个值为 1MB,那么可以发送的单个最大消 息为 1MB,或者生产者可以在单个请求里发送一个批次,该批次包含了 1000 个消息,每 个消息大小为 1KB。另外,broker 对可接收的消息最大值也有自己的限制(message.max. bytes),所以两边的配置最好可以匹配,避免生产者发送的消息被 broker 拒绝。

12. receive.buffer.bytes 和 send.buffer.bytes

这两个参数分别指定了 TCP socket 接收和发送数据包的缓冲区大小。如果它们被设为 -1, 就使用操作系统的默认值。如果生产者或消费者与 broker 处于不同的数据中心,那么可以 适当增大这些值,因为跨数据中心的网络一般都有比较高的延迟和比较低的带宽。

顺序保证

Kafka 可以保证同一个分区里的消息是有序的。也就是说,如果生产者按照 一定的顺序发送消息,broker 就会按照这个顺序把它们写入分区,消费者也 会按照同样的顺序读取它们。在某些情况下,顺序是非常重要的。例如,往 一个账户存入 100 元再取出来,这个与先取钱再存钱是截然不同的!

不过, 有些场景对顺序不是很敏感。如果把 retries 设为非零整数,同时把 max.in.flight.requests.per.connection 设为比 1 大的数,那么,如果第一个批次消息写入失败,而第二个批次写入 成功,broker 会重试写入第一个批次。

如果此时第一个批次也写入成功,那 么两个批次的顺序就反过来了。一般来说,如果某些场景要求消息是有序的,那么消息是否写入成功也是 很关键的,所以不建议把 retries 设为 0。可以把 max.in.flight.requests. per.connection 设为 1,这样在生产者尝试发送第一批消息时,就不会有其 他的消息发送给 broker。不过这样会严重影响生产者的吞吐量,所以只有在 对消息的顺序有严格要求的情况下才能这么做。

序列化框架 Protobuf

Protobuf, Customer对象被序列化成:表示customerID的4字节整数 表示customerName长度的4字节整数(如果customerName为空,则长度为0) 表示customerName的N个字节

分区

ProducerRecord 对象包含了目标主题、键和值。Kafka 的消息是一个个 键值对,ProducerRecord 对象可以只包含目标主题和值,键可以设置为默认的 null,不 过大多数应用程序会用到键。键有两个用途:可以作为消息的附加信息,也可以用来 决定消息该被写到主题的哪个分区。拥有相同键的消息将被写到同一个分区。也就是 说,如果一个进程只从一个主题的分区读取数据 ,那么具有相 同键的所有记录都会被该进程读取。要创建一个包含键值的记录,只需像下面这样创建 ProducerRecord 对象:

如果键值为 null,并且使用了默认的分区器,那么记录将被随机地发送到主题内各个可用 的分区上。分区器使用轮询(Round Robin)算法将消息均衡地分布到各个分区上。

如果键值为 null,并且使用了默认的分区器,那么记录将被随机地发送到主题内各个可用 的分区上。分区器使用轮询(Round Robin)算法将消息均衡地分布到各个分区上。

如果键不为空,并且使用了默认的分区器,那么 Kafka 会对键进行散列(使用 Kafka 自己 的散列算法,即使升级 Java 版本,散列值也不会发生变化),然后根据散列值把消息映射 到特定的分区上。这里的关键之处在于,同一个键总是被映射到同一个分区上,所以在进 行映射时,我们会使用主题所有的分区,而不仅仅是可用的分区。这也意味着,如果写入 数据的分区是不可用的,那么就会发生错误。但这种情况很少发生。

只有在不改变主题分区数量的情况下,键与分区之间的映射才能保持不变。举个例子,在 分区数量保持不变的情况下,可以保证用户 045189 的记录总是被写到分区 34。在从分 区读取数据时,可以进行各种优化。不过,一旦主题增加了新的分区,这些就无法保证 了——旧数据仍然留在分区 34,但新的记录可能被写到其他分区上。如果要使用键来映射 分区,那么最好在创建主题的时候就把分区规划好

Kafka消费者——从Kafka读取数据

消费者和消费者群组

假设我们有一个应用程序需要从一个 Kafka 主题读取消息并验证这些消息,然后再把它们 保存起来。应用程序需要创建一个消费者对象,订阅主题并开始接收消息,然后验证消息 并保存结果。过了一阵子,生产者往主题写入消息的速度超过了应用程序验证数据的速 度,这个时候该怎么办?如果只使用单个消费者处理消息,应用程序会远跟不上消息生成 的速度。显然,此时很有必要对消费者进行横向伸缩。就像多个生产者可以向相同的主题 写入消息一样,我们也可以使用多个消费者从同一个主题读取消息,对消息进行分流。

Kafka 消费者从属于消费者群组。一个群组里的消费者订阅的是同一个主题,每个消费者 接收主题一部分分区的消息。


如果我们往群组里添加更多的消费者,超过主题的分区数量,那么有一部分消费者就会被 闲置,不会接收到任何消息

往群组里增加消费者是横向伸缩消费能力的主要方式。Kafka 消费者经常会做一些高延迟 的操作,比如把数据写到数据库或 HDFS,或者使用数据进行比较耗时的计算。在这些情 况下,单个消费者无法跟上数据生成的速度,所以可以增加更多的消费者,让它们分担负 载,每个消费者只处理部分分区的消息,这就是横向伸缩的主要手段。我们有必要为主题 创建大量的分区,在负载增长时可以加入更多的消费者。不过要注意,不要让消费者的数 量超过主题分区的数量,多余的消费者只会被闲置。

简而言之,为每一个需要获取一个或多个主题全部消息的应用程序创建一个消费者群组, 然后往群组里添加消费者来伸缩读取能力和处理能力,群组里的每个消费者只处理一部分 消息。


消费者群组和分区再均衡

管理员添加了新的分区,会发生分区重分配

分区的所有权从一个消费者转移到另一个消费者,这样的行为被称为再均衡。再均衡非常 重要,它为消费者群组带来了高可用性和伸缩性

在再均衡期间,消费者无法读取消 息,造成整个群组一小段时间的不可用。另外,当分区被重新分配给另一个消费者时,消 费者当前的读取状态会丢失,它有可能还需要去刷新缓存,在它重新恢复状态之前会拖慢 应用程序。

消费者通过向被指派为群组协调器的 broker(不同的群组可以有不同的协调器)发送心跳 来维持它们和群组的从属关系以及它们对分区的所有权关系。只要消费者以正常的时间 间隔发送心跳,就被认为是活跃的,说明它还在读取分区里的消息。消费者会在轮询消息 (为了获取消息)或提交偏移量时发送心跳。如果消费者停止发送心跳的时间足够长,会 话就会过期,群组协调器认为它已经死亡,就会触发一次再均衡。

如果一个消费者发生崩溃,并停止读取消息,群组协调器会等待几秒钟,确认它死亡了才 会触发再均衡。在这几秒钟时间里,死掉的消费者不会读取分区里的消息。在清理消费者 时,消费者会通知协调器它将要离开群组,协调器会立即触发一次再均衡,尽量降低处理 停顿。

分配分区是怎样的一个过程

当消费者要加入群组时,它会向群组协调器发送一个 JoinGroup 请求。第一 个加入群组的消费者将成为“群主”。群主从协调器那里获得群组的成员列 表(列表中包含了所有最近发送过心跳的消费者,它们被认为是活跃的), 并负责给每一个消费者分配分区。它使用一个实现了 PartitionAssignor 接 口的类来决定哪些分区应该被分配给哪个消费者。

分配 完毕之后,群主把分配情况列表发送给群组协调器,协调器再把这些信息发 送给所有消费者。每个消费者只能看到自己的分配信息,只有群主知道群组 里所有消费者的分配信息。这个过程会在每次再均衡时重复发生。

订阅主题

轮询就会处理所有的细节,包括群组协调、分区再均衡、发送心跳和获取数据

消费者实际上是一个长期运行的应用程序,它通过持续轮询向 Kafka 请求数据。

消费者的配置

1. fetch.min.bytes

该属性指定了消费者从服务器获取记录的最小字节数。broker 在收到消费者的数据请求时, 如果可用的数据量小于 fetch.min.bytes 指定的大小,那么它会等到有足够的可用数据时 才把它返回给消费者。这样可以降低消费者和 broker 的工作负载,因为它们在主题不是很 活跃的时候(或者一天里的低谷时段)就不需要来来回回地处理消息。如果没有很多可用 数据,但消费者的 CPU 使用率却很高,那么就需要把该属性的值设得比默认值大。如果 消费者的数量比较多,把该属性的值设置得大一点可以降低 broker 的工作负载。

2. fetch.max.wait.ms

我们通过 fetch.min.bytes 告诉 Kafka,等到有足够的数据时才把它返回给消费者。而 feth. max.wait.ms 则用于指定 broker 的等待时间,默认是 500ms。如果没有足够的数据流入 Kafka,消费者获取最小数据量的要求就得不到满足,最终导致 500ms 的延迟。如果要降低 潜在的延迟(为了满足 SLA),可以把该参数值设置得小一些。如果 fetch.max.wait.ms 被设 为 100ms,并且 fetch.min.bytes 被设为 1MB,那么 Kafka 在收到消费者的请求后,要么返 回 1MB 数据,要么在 100ms 后返回所有可用的数据,就看哪个条件先得到满足。

3. max.partition.fetch.bytes

该属性指定了服务器从每个分区里返回给消费者的最大字节数。它的默认值是 1MB

它的默认值是 1MB,也 就是说,KafkaConsumer.poll() 方法从每个分区里返回的记录最多不超过 max.partition. fetch.bytes 指定的字节。如果一个主题有 20 个分区和 5 个消费者,那么每个消费者需要 至少 4MB 的可用内存来接收记录。在为消费者分配内存时,可以给它们多分配一些,因 为如果群组里有消费者发生崩溃,剩下的消费者需要处理更多的分区。max.partition. fetch.bytes 的值必须比 broker 能够接收的最大消息的字节数 大,否则消费者可能无法读取这些消息,导致消费者一直挂起重试。

4. session.timeout.ms

该属性指定了消费者在被认为死亡之前可以与服务器断开连接的时间,默认是 3s。如 果消费者没有在 session.timeout.ms 指定的时间内发送心跳给群组协调器,就被认为 已经死亡,协调器就会触发再均衡,把它的分区分配给群组里的其他消费者。该属性与 heartbeat.interval.ms 紧密相关。heartbeat.interval.ms 指定了 poll() 方法向协调器 发送心跳的频率,session.timeout.ms 则指定了消费者可以多久不发送心跳。所以,一 般需要同时修改这两个属性,heartbeat.interval.ms 必须比 session.timeout.ms 小,一 般是 session.timeout.ms 的三分之一。如果 session.timeout.ms 是 3s,那么 heartbeat. interval.ms 应该是 1s。把 session.timeout.ms 值设得比默认值小,可以更快地检测和恢 复崩溃的节点,不过长时间的轮询或垃圾收集可能导致非预期的再均衡。把该属性的值设 置得大一些,可以减少意外的再均衡,不过检测节点崩溃需要更长的时间。

5. auto.offset.reset

该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下(因消费者长 时间失效,包含偏移量的记录已经过时并被删除)该作何处理。它的默认值是 latest,意 思是说,在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之 后生成的记录)。另一个值是 earliest,意思是说,在偏移量无效的情况下,消费者将从 起始位置读取分区的记录。

6. enable.auto.commit

该属性指定了消费者是否自动提交偏移 量,默认值是 true。为了尽量避免出现重复数据和数据丢失,可以把它设为 false,由自 己控制何时提交偏移量。如果把它设为 true,还可以通过配置 auto.commit.interval.ms 属性来控制提交的频率。

7. partition.assignment.strategy

分区会被分配给群组里的消费者。PartitionAssignor 根据给定的消费者和主 题,决定哪些分区应该被分配给哪个消费者。Kafka 有两个默认的分配策略。

Range

该策略会把主题的若干个连续的分区分配给消费者。假设消费者 C1 和消费者 C2 同时 订阅了主题 T1 和主题 T2,并且每个主题有 3 个分区。那么消费者 C1 有可能分配到这 两个主题的分区 0 和分区 1,而消费者 C2 分配到这两个主题的分区 2。因为每个主题 拥有奇数个分区,而分配是在主题内独立完成的,第一个消费者最后分配到比第二个消 费者更多的分区。只要使用了 Range 策略,而且分区数量无法被消费者数量整除,就会 出现这种情况。

8. client.id

该属性可以是任意字符串,broker 用它来标识从客户端发送过来的消息,通常被用在日志、 度量指标和配额里。

9. max.poll.records

该属性用于控制单次调用 call() 方法能够返回的记录数量,可以帮你控制在轮询里需要处 理的数据量。

10. receive.buffer.bytes 和 send.buffer.bytes

socket 在读写数据时用到的 TCP 缓冲区也可以设置大小。如果它们被设为 -1,就使用操 作系统的默认值。如果生产者或消费者与 broker 处于不同的数据中心内,可以适当增大这 些值,因为跨数据中心的网络一般都有比较高的延迟和比较低的带宽。

加群联系作者vx:xiaoda0423

仓库地址:https://webvueblog.github.io/JavaPlusDoc/

相关文章:

分布式微服务系统架构第88集:kafka集群

使用集 群最大的好处是可以跨服务器进行负载均衡,再则就是可以使用复制功能来避免因单点故 障造成的数据丢失。在维护 Kafka 或底层系统时,使用集群可以确保为客户端提供高可用 性。 需要多少个broker 一个 Kafka 集群需要多少个 broker 取决于以下几个因…...

【信息系统项目管理师-选择真题】2008下半年综合知识答案和详解

更多内容请见: 备考信息系统项目管理师-专栏介绍和目录 文章目录 【第1题】【第2~3题】【第4题】【第5题】【第6题】【第7题】【第8题】【第9题】【第10~12题】【第13题】【第14~15题】【第16题】【第17题】【第18题】【第19题】【第20题】【第21题】【第22题】【第23题】【第…...

Ubuntu 20.04安装Protocol Buffers 2.5.0

个人博客地址:Ubuntu 20.04安装Protocol Buffers 2.5.0 | 一张假钞的真实世界 安装过程 Protocol Buffers 2.5.0源码下载:https://github.com/protocolbuffers/protobuf/tree/v2.5.0。下载并解压。 将autogen.sh文件中以下内容: curl htt…...

MySQL知识点总结(十四)

mysqldump和mysqlpump实用程序在功能上有哪些相同和不同的地方? 二者都能用来执行逻辑备份,将所有数据库,特定数据库或特定表转储到文本文件,可移植,独立于存储引擎,是很好的复制/移动策略,适合…...

人工智能在教育中的创新应用:打造未来的智慧课堂

人工智能在教育中的创新应用:打造未来的智慧课堂 在快速发展的科技时代,人工智能(AI)正悄无声息地改变着教育的面貌。从个性化学习到智能课堂管理,AI技术为教育带来了前所未有的创新与效率提升。今天,我想从实际应用的角度,聊聊人工智能如何帮助我们构建更智慧的教育生…...

最初公共前缀

hello 大家好!今天开写一个新章节,每一天一道算法题。让我们一起来学习算法思维吧! function longestCommonPrefix(strs) {// 如果输入的字符串数组为空,直接返回空字符串if (strs.length 0) {return "";}// 假设数组中…...

每日一题-判断是否是平衡二叉树

判断是否是平衡二叉树 题目描述数据范围题解解题思路递归算法代码实现代码解析时间和空间复杂度分析示例示例 1示例 2 总结 ) 题目描述 输入一棵节点数为 n 的二叉树,判断该二叉树是否是平衡二叉树。平衡二叉树定义为: 它是一棵空树。或者它的左右子树…...

Go反射指南

概念: 官方对此有个非常简明的介绍,两句话耐人寻味: 反射提供一种让程序检查自身结构的能力反射是困惑的源泉 第1条,再精确点的描述是“反射是一种检查interface变量的底层类型和值的机制”。 第2条,很有喜感的自嘲…...

【除夕】特别篇

除夕,是一个辞旧迎新的时刻。我们挥别了过去一年的风雨兼程,迎来了新一年的无限可能。在过去的一年里,我们或许经历了挑战,或许收获了成长。从年初到今天,我们一定克服了种种困难与挑战,这足以说明我们每个…...

Java内存区域详解

Java内存区域详解——章节结构 Java 内存模型是 JVM 的重要组成部分,深入理解内存区域的划分和用途是掌握 JVM 调优和诊断问题的关键。我们将通过以下章节逐步学习: 目录 概述:Java 内存区域与线程的关系程序计数器Java 虚拟机栈本地方法栈…...

DataWhale组队学习 fun-transformer task5

1. 词向量:单词的“身份证” 首先,我们定义了四个单词的词向量,每个向量维度为3。你可以把这些词向量想象成每个单词的“身份证”。每个身份证上有3个特征,用来描述这个单词的“性格”或“特点”。 word_1 np.array([1, 0, 0])…...

实现网站内容快速被搜索引擎收录的方法

本文转自:百万收录网 原文链接:https://www.baiwanshoulu.com/6.html 实现网站内容快速被搜索引擎收录,是网站运营和推广的重要目标之一。以下是一些有效的方法,可以帮助网站内容更快地被搜索引擎发现和收录: 一、确…...

什么是循环神经网络?

一、概念 循环神经网络(Recurrent Neural Network, RNN)是一类用于处理序列数据的神经网络。与传统的前馈神经网络不同,RNN具有循环连接,可以利用序列数据的时间依赖性。正因如此,RNN在自然语言处理、时间序列预测、语…...

python 判断复杂包含

目录 python 判断复杂包含 a和b都是拍好序的: python 判断复杂包含 a[10,13,15] b[[9,11],[11,13],[13,16]] b的子项是区间,返回b中子区间包含a其中元素的子项 if __name__ __main__:a [10, 11, 15]b [[9, 11], [11, 13], [13, 16]]# 筛选出包含…...

基于SpringBoot的阳光幼儿园管理系统

作者:计算机学姐 开发技术:SpringBoot、SSM、Vue、MySQL、JSP、ElementUI、Python、小程序等,“文末源码”。 专栏推荐:前后端分离项目源码、SpringBoot项目源码、Vue项目源码、SSM项目源码、微信小程序源码 精品专栏:…...

【PySide6快速入门】QDialog对话框的使用

文章目录 PySide6快速入门:QDialog对话框的使用前言QDialog的基本用法创建和显示对话框 QDialog的常用函数1. exec()2. accept()3. reject()4. setWindowTitle()5. setModal()6. setFixedSize()7. resize()8. reject()9. setLayout()10. open() 总结 PySide6快速入门…...

LiteFlow Spring boot使用方式

文章目录 概述LiteFlow框架的优势规则调用逻辑规则组件定义组件内数据获取通过 DefaultContext自定义上下文 通过 组件规则定义数据通过预先传入数据 liteflow 使用 概述 在每个公司的系统中,总有一些拥有复杂业务逻辑的系统,这些系统承载着核心业务逻…...

【ESP32】ESP-IDF开发 | WiFi开发 | TCP传输控制协议 + TCP服务器和客户端例程

1. 简介 TCP(Transmission Control Protocol),全称传输控制协议。它的特点有以下几点:面向连接,每一个TCP连接只能是点对点的(一对一);提供可靠交付服务;提供全双工通信&…...

用WinForm如何制作简易计算器

首先我们要自己搭好页面 using System; using System.Collections.Generic; using System.ComponentModel; using System.Data; using System.Drawing; using System.Linq; using System.Text; using System.Threading.Tasks; using System.Windows.Forms;namespace _7_简易计算…...

指针的介绍2前

1.数组名的理解 #define _CRT_SECURE_NO_WARNINGS 1 #include <stdio.h>int main() {int arr[] { 1,2,3,4,5,6,7,8,9 };printf("&arr[0] %p\n", &arr[0]);printf("arr %p\n", arr);return 0; } 观察得到&#xff0c;数组名就是数组首…...

EasyExcel写入和读取多个sheet

最近在工作中&#xff0c;作者频频接触到Excel处理&#xff0c;因此也对EasyExcel进行了一定的研究和学习&#xff0c;也曾困扰过如何处理多个sheet&#xff0c;因此此处分享给大家&#xff0c;希望能有所帮助 目录 1.依赖 2. Excel类 3.处理Excel读取和写入多个sheet 4. 执…...

MATLAB中fetchOutputs函数用法

目录 语法 说明 示例 在后台运行函数 fetchOutputs函数的功能是从在后台运行的函数中检索结果。 语法 [Y1,...,Ym] fetchOutputs(F) [Y1,...,Ym] fetchOutputs(F,UniformOutputfalse) 说明 [Y1, ..., Ym] fetchOutputs(F) 从 Future 数组 F 中检索出 m 个结果。 F 中…...

nosql mysql的区别

NoSQL 和 MySQL 是两种不同类型的数据库管理系统&#xff0c;它们在设计理念、数据模型、可扩展性和应用场景等方面有着本质的区别。 NoSQL 数据库 特点: 灵活的数据模型: NoSQL 数据库通常没有固定的表结构&#xff0c;可以很容易地存储不同结构的文档或键值对。水平扩展: …...

获取加工视图下所有元素

UF_SETUP_ask_program_root //程序顺序 视图 UF_SETUP_ask_mct_root //机床视图 UF_SETUP_ask_mthd_root //加工方法视图 UF_SETUP_ask_geom_root //几何视图 UF_initialize();//初始化 UF_UI_ONT_refresh();//刷新加工导航器 UF_UI_open_listing_window(); tag_t …...

go到底是什么意思:对go的猜测或断言

go这个单词&#xff0c;简单地讲&#xff0c;表示“走或去”的意思&#xff1a; go v.去&#xff1b;走 认真想想&#xff0c;go是一个非常神秘的单词&#xff0c;g-和o-这两个字母&#xff0c;为什么就会表达“去&#xff1b;走”的意思呢&#xff1f;它的字面义或本质&…...

AIP-133 标准方法:Create

编号133原文链接AIP-133: Standard methods: Create状态批准创建日期2019-01-23更新日期2019-01-23 在REST API中&#xff0c;通常向集合URI&#xff08;如 /v1/publishers/{publisher}/books &#xff09;发出POST请求&#xff0c;在集合中创建新资源。 面向资源设计&#x…...

大一计算机的自学总结:位运算的应用及位图

前言 不仅异或运算有很多骚操作&#xff0c;位运算本身也有很多骚操作。&#xff08;尤其后几个题&#xff0c;太逆天了&#xff09; 一、2 的幂 class Solution { public:bool isPowerOfTwo(int n) {return n>0&&n(n&-n);} }; 根据二进制表示数的原理&#…...

2025年01月28日Github流行趋势

项目名称&#xff1a;maybe 项目地址url&#xff1a;https://github.com/maybe-finance/maybe项目语言&#xff1a;Ruby历史star数&#xff1a;37540今日star数&#xff1a;1004项目维护者&#xff1a;zachgoll, apps/dependabot, tmyracle, Shpigford, crnsh项目简介&#xff…...

java基础-容器

一、集合基础 1、集合 Collection接口下&#xff0c;主要用于存放单一元素Map接口下&#xff0c;用于存放键值对 2、常见集合的比较 List 存储的元素是有序的、可重复的。Set: 存储的元素不可重复的。Queue: 按特定的排队规则来确定先后顺序&#xff0c;存储的元素是有序的、…...

PythonFlask框架

文章目录 处理 Get 请求处理 POST 请求应用 app.route(/tpost, methods[POST]) def testp():json_data request.get_json()if json_data:username json_data.get(username)age json_data.get(age)return jsonify({username: username测试,age: age})从 flask 中导入了 Flask…...

Android 启动流程

一 Bootloader 在嵌入式系统中&#xff0c;Bootloader的引导过程与传统的PC环境有所不同&#xff0c;主要是因为嵌入式系统的硬件配置和应用场景更加多样化。以下是嵌入式系统中Bootloader被引导的一般流程&#xff1a; 1. 硬件复位 当嵌入式设备上电或复位时&#xff0c;处…...

【信息系统项目管理师-选择真题】2009下半年综合知识答案和详解

更多内容请见: 备考信息系统项目管理师-专栏介绍和目录 文章目录 【第1~2题】【第3题】【第4题】【第5题】【第6题】【第7题】【第8题】【第9题】【第10题】【第11题】【第12题】【第13题】【第14题】【第15题】【第16题】【第17题】【第18题】【第19题】【第20题】【第21题】…...

TL494方案开关电源方案

TL494是德州仪器公司生产的一款固定频率脉宽调制&#xff08;PWM&#xff09;控制芯片&#xff0c;广泛应用于开关电源等电路中&#xff0c;以下是其相关方案介绍&#xff1a; 基本特性 双端输出&#xff1a;可提供推挽或单端两种输出模式。推挽模式下能驱动两个功率开关管交…...

RocketMQ事务消息是如何实现的?

大家好&#xff0c;我是锋哥。今天分享关于【RocketMQ事务消息是如何实现的&#xff1f;】面试题。希望对大家有帮助&#xff1b; RocketMQ事务消息是如何实现的&#xff1f; 1000道 互联网大厂Java工程师 精选面试题-Java资源分享网 RocketMQ 事务消息的实现是通过 分布式事…...

16届蓝桥杯寒假刷题营】第2期DAY5IOI赛

3.小蓝小彬的代码挑战 - 蓝桥云课 问题描述 在蓝桥杯大赛中&#xff0c;小蓝和小彤是一对好朋友。他们在比赛中遇到了一个有趣的挑战。这个挑战是给定一个由大写字母组成的代码&#xff0c;他们需要找出这串代码中有多少个子序列LQB。小蓝和小彬都很聪明&#xff0c;他们想到…...

【云安全】云原生-K8S-搭建/安装/部署

一、准备3台虚拟机 务必保证3台是同样的操作系统&#xff01; 1、我这里原有1台centos7&#xff0c;为了节省资源和效率&#xff0c;打算通过“创建链接克隆”2台出来 2、克隆之前&#xff0c;先看一下是否存在k8s相关组件&#xff0c;或者docker相关组件 3、卸载原有的docker …...

[A-29]ARMv8/v9-GIC-中断子系统的安全架构设计(Security/FIQ/IRQ)

ver0.1 前言 打开这篇文章的时候,我们已经为每一个中断信号规划一条路径,在外设和PE-Core之间建立了消息通道,外设有紧急的情况下可以给SOC中的大哥打报告了。下面就把接力棒就交到了CPU手里了,但是PE-Core要交给那个Exception Level以及Security下运行的软件处理呢?本文…...

12 款开源OCR发 PDF 识别框架

2024 年 12 款开源文档解析框架的选型对比评测&#xff1a;PDF解析、OCR识别功能解读、应用场景分析及优缺点比较 这是该系列的第二篇文章&#xff0c;聚焦于智能文档处理&#xff08;特别是 PDF 解析&#xff09;。无论是在模型预训练的数据收集阶段&#xff0c;还是基于 RAG…...

使用 lock4j-redis-template-spring-boot-starter 实现 Redis 分布式锁

在分布式系统中&#xff0c;多个服务实例可能同时访问和修改共享资源&#xff0c;从而导致数据不一致的问题。为了解决这个问题&#xff0c;分布式锁成为了关键技术之一。本文将介绍如何使用 lock4j-redis-template-spring-boot-starter 来实现 Redis 分布式锁&#xff0c;从而…...

css-background-color(transparent)

1.前言 在 CSS 中&#xff0c;background-color 属性用于设置元素的背景颜色。除了基本的颜色值&#xff08;如 red、blue 等&#xff09;和十六进制颜色值&#xff08;如 #FF0000、#0000FF 等&#xff09;&#xff0c;还有一些特殊的属性值可以用来设置背景颜色。 2.backgrou…...

MySQL 9.2.0 的功能

MySQL 9.2.0 的功能 MySQL 9.2.0 的功能新增、弃用和删除内容如下&#xff1a; 新增功能 权限新增12&#xff1a;引入了CREATE_SPATIAL_REFERENCE_SYSTEM权限&#xff0c;拥有该权限的用户可执行CREATE SPATIAL REFERENCE SYSTEM、CREATE OR REPLACE SPATIAL REFERENCE SYSTEM…...

编程题-最长的回文子串(中等)

题目&#xff1a; 给你一个字符串 s&#xff0c;找到 s 中最长的回文子串。 示例 1&#xff1a; 输入&#xff1a;s "babad" 输出&#xff1a;"bab" 解释&#xff1a;"aba" 同样是符合题意的答案。示例 2&#xff1a; 输入&#xff1a;s &…...

打破传统束缚:领略 Web3 独特魅力

在互联网发展的历程中&#xff0c;我们见证了Web1和Web2的变迁。Web1是静态信息的展示平台&#xff0c;Web2则引领了社交互动和内容创作的繁荣&#xff0c;而如今&#xff0c;Web3作为新时代的互联网架构&#xff0c;正逐渐展现出其独特的魅力&#xff0c;带领我们走向一个更加…...

linux系统中的 scp的使用方法

SCP&#xff08;Secure Copy Protocol&#xff09;是一种通过加密的方式在本地主机和远程主机之间安全地传输文件的协议。 它是基于SSH协议的扩展&#xff0c;允许用户在不同主机之间进行文件复制和传输&#xff0c;是Linux和Unix系统中常用的工具之一。 在嵌入式Linux软件的…...

RAG技术:通过向量检索增强模型理解与生成能力

网罗开发 &#xff08;小红书、快手、视频号同名&#xff09; 大家好&#xff0c;我是 展菲&#xff0c;目前在上市企业从事人工智能项目研发管理工作&#xff0c;平时热衷于分享各种编程领域的软硬技能知识以及前沿技术&#xff0c;包括iOS、前端、Harmony OS、Java、Python等…...

【现代深度学习技术】深度学习计算 | 参数管理

【作者主页】Francek Chen 【专栏介绍】 ⌈ ⌈ ⌈PyTorch深度学习 ⌋ ⌋ ⌋ 深度学习 (DL, Deep Learning) 特指基于深层神经网络模型和方法的机器学习。它是在统计机器学习、人工神经网络等算法模型基础上&#xff0c;结合当代大数据和大算力的发展而发展出来的。深度学习最重…...

【Linux基础指令】第三期

近期更新的基础指令链接&#xff1a; 【Linux基础指令】第一期-CSDN博客 【Linux基础指令】第二期-CSDN博客 本期博客的主题依旧是 "基础指令" &#xff1b;话不多说&#xff0c;正文开始。 一、Linux的指令 1.zip / unzip 功能&#xff1a;打包压缩 命令格式&…...

WPS数据分析000005

目录 一、数据录入技巧 二、一维表 三、填充柄 向下自动填充 自动填充选项 日期填充 星期自定义 自定义序列 1-10000序列 四、智能填充 五、数据有效性 出错警告 输入信息 下拉列表 六、记录单 七、导入数据 ​编辑 八、查找录入 会员功能 Xlookup函数 VL…...

UiAutomator的详细介绍

UIAutomator作为一种高效的测试框架&#xff0c;通过自动化手段显著提升了用户界面&#xff08;UI&#xff09;测试的效率与准确性。它不仅支持自动生成功能测试用例&#xff0c;还允许开发者在不同设备上执行这些测试&#xff0c;确保了应用程序的一致性和稳定性。 以下是对 …...

在虚拟机里运行frida-server以实现对虚拟机目标软件的监测和修改参数(一)

frida-server下载路径 我这里选择frida-server-16.6.6-android-x86_64 以root身份启动adb 或 直接在android studio中打开 adb root 如果使用android studio打开的话&#xff0c;最好选择google api的虚拟机&#xff0c;默认以root模式开启 跳转到下载的frida-server文件位…...