当前位置: 首页 > news >正文

【kafka系列】生产者

目录

发送流程

1. 流程逻辑分析

阶段一:主线程处理

阶段二:Sender 线程异步发送

核心设计思想

2. 流程

关键点总结

重要参数

一、核心必填参数

二、可靠性相关参数

三、性能优化参数

四、高级配置

五、安全性配置(可选)

六、错误处理与监控

典型配置示例

关键注意事项


发送流程

  • 序列化与分区:消息通过Partitioner选择目标分区(默认轮询或哈希),序列化后加入RecordAccumulator缓冲区。
  • 批次合并Sender线程将同一分区的消息合并为ProducerBatch,减少网络请求(源码见Sender.run()方法)。
  • 发送至Broker:通过NetworkClient异步发送,Broker的LogAppendTime处理写入请求。
  • ACK机制:根据acks配置(0/1/all)等待Broker确认,通过Metadata类更新分区元数据

1. 流程逻辑分析

Kafka 生产者发送消息的核心流程分为 主线程处理Sender 线程异步发送 两个阶段,具体步骤如下:


阶段一:主线程处理
  1. 创建 ProducerRecord
    • 用户调用 producer.send(ProducerRecord),指定 Topic、Key、Value 和可选的分区或时间戳。
  1. 选择分区(Partition)
    • 若未指定分区,根据以下规则选择:
      • 有 Key:对 Key 哈希取模(hash(key) % 分区数),确保相同 Key 的消息进入同一分区。
      • 无 Key:默认使用粘性分区策略(Sticky Partitioning,Kafka 2.4+),在批次填满或超时前发送到同一分区,提升性能。
  1. 序列化(Serialize)
    • 使用配置的 key.serializervalue.serializer 对 Key 和 Value 序列化(如 StringSerializerByteArraySerializer)。
  1. 追加到缓冲区(RecordAccumulator)
    • 将消息按 Topic-Partition 分组,存入 RecordAccumulator 的批次(Batch)中。
    • 批次策略
      • batch.size:批次大小阈值(默认 16KB),达到阈值立即发送。
      • linger.ms:批次等待时间(默认 0ms),超时后发送未满批次。

阶段二:Sender 线程异步发送
  1. Sender 线程拉取批次
    • Sender 线程定期检查缓冲区,将满足条件的批次(已满或超时)封装为 ProducerRequest
  1. 构建请求并发送到 Broker
    • 根据分区的 Leader 副本所在 Broker,将请求发送到对应的节点。
    • 关键配置
      • acks:控制消息持久化确认级别:
        • 0:不等待确认(可能丢失数据)。
        • 1:等待 Leader 确认(默认)。
        • all:等待所有 ISR 副本确认(最高可靠性)。
      • max.in.flight.requests.per.connection:控制单个 Broker 的未确认请求数(默认 5)。
  1. 处理 Broker 响应
    • 成功:触发用户设置的 Callback 回调,并释放批次内存。
    • 失败
      • 可重试错误(如网络抖动、Leader 切换):根据 retries(默认 0)和 retry.backoff.ms(默认 100ms)重试。
      • 不可重试错误(如消息过大):直接触发回调并抛出异常。

核心设计思想
  • 异步批处理:通过缓冲区合并小消息,减少网络 I/O 次数。
  • 零拷贝优化:使用 sendfile 系统调用提升网络传输效率。
  • 高可靠性:通过重试机制和 acks=all 确保消息不丢失。

2. 流程


关键点总结

  1. 分区选择:优先使用 Key 哈希或粘性分区策略,保证消息顺序性和吞吐量。
  2. 批次优化:通过 batch.sizelinger.ms 平衡延迟与吞吐。
  3. 可靠性保障:通过 acksretries 配置确保消息持久化。
  4. 异步处理:主线程与 Sender 线程解耦,避免阻塞用户逻辑。

重要参数

以下是 Kafka 生产者(Producer)在日常开发中的 常见配置参数 及其作用,按功能分类整理成表格:


一、核心必填参数

参数名

默认值

说明

bootstrap.servers

Kafka 集群地址列表(逗号分隔,如 host1:9092,host2:9092

)。

key.serializer

Key 的序列化类(如 org.apache.kafka.common.serialization.StringSerializer

)。

value.serializer

Value 的序列化类(同上)。


二、可靠性相关参数

参数名

默认值

说明

acks

1

消息持久化确认机制:

0:不等待确认(可能丢失数据)。 1:等待 Leader 确认(默认)。all:等待所有 ISR 副本确认(最高可靠性)。

retries

0

发送失败后的重试次数(建议设为 Integer.MAX_VALUE

配合 delivery.timeout.ms

)。

enable.idempotence

false

是否启用幂等性(true时保证消息不重复,需配合 acks=all

retries>0)。

max.in.flight.requests.per.connection

5

单个 Broker 的未确认请求数。若启用幂等性,建议设为 1

以保证顺序。


三、性能优化参数

参数名

默认值

说明

linger.ms

0

消息在缓冲区等待时间(毫秒),增大可提升吞吐量(但增加延迟)。

batch.size

16384

(16KB)

单个批次的大小阈值,达到阈值后立即发送。

buffer.memory

33554432

(32MB)

生产者缓冲区的总内存大小。

compression.type

none

消息压缩算法(gzip

snappy

lz4

zstd

),减少网络带宽占用。


四、高级配置

参数名

默认值

说明

request.timeout.ms

30000

(30秒)

生产者等待 Broker 响应的超时时间。

max.block.ms

60000

(60秒)

生产者缓冲区满或元数据不可用时的阻塞时间(超时抛异常)。

partitioner.class

默认轮询/哈希策略

自定义分区策略(实现 Partitioner

接口)。


五、安全性配置(可选)

参数名

默认值

说明

security.protocol

PLAINTEXT

安全协议(如 SSL

SASL_SSL

)。

ssl.keystore.location

SSL 证书路径(客户端认证时需配置)。

sasl.mechanism

SASL 认证机制(如 PLAIN

SCRAM-SHA-256

)。


六、错误处理与监控

参数名

默认值

说明

interceptor.classes

生产者拦截器(实现 ProducerInterceptor

接口),用于监控或修改消息。

metrics.sample.window.ms

30000

(30秒)

性能指标采样窗口时间。


典型配置示例

Properties props = new Properties();
props.put("bootstrap.servers", "kafka1:9092,kafka2:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("acks", "all");
props.put("retries", 10);
props.put("linger.ms", 20);
props.put("batch.size", 32768);
props.put("compression.type", "snappy");
props.put("enable.idempotence", "true");

关键注意事项

  1. 可靠性 vs 性能
    • acks=allenable.idempotence=true 提高可靠性,但可能降低吞吐量。
    • 增大 batch.sizelinger.ms 可提升吞吐量,但增加延迟。
  1. 幂等性限制
    • 需 Kafka 0.11+ 版本支持,且 max.in.flight.requests=1(或 Kafka 2.0+ 允许 5)。
  1. 监控与调优
    • 通过 metrics 和拦截器监控生产者性能,动态调整参数

相关文章:

【kafka系列】生产者

目录 发送流程 1. 流程逻辑分析 阶段一:主线程处理 阶段二:Sender 线程异步发送 核心设计思想 2. 流程 关键点总结 重要参数 一、核心必填参数 二、可靠性相关参数 三、性能优化参数 四、高级配置 五、安全性配置(可选&#xff0…...

Kafka日志数据深度解析:从基础查看到高级操作全攻略

#作者:孙德新 文章目录 查看log日志文件(kafka-dump-log.sh)1、查看Log文件基本数据信息2、index文件健康性检查(--index-sanity-check)3、转储文件(--max-message-size)4、偏移量解码(--offsets-decoder)5、日志数据解析(--transaction-log-decoder)6、查询Log文件…...

单例模式、构造函数、左值右值

拷贝构造函数 简单的说就是——用一个对象构造另外一个对象 class Myclass {public:int d0;Myclass(int d_){d d_}; //常用的构造函数Myclass(Myclass c) //拷贝构造函数{d c.d;} }; //对比 class Myclass {public:int d0;Myclass(int d_){d d_}; //常用的构造函数Myclass…...

DeepSeek+即梦 做AI视频

DeepSeek做AI视频 制作流程第一步:DeepSeek 生成视频脚本和分镜 第二步:生成分镜图片绘画提示词第三步:生成分镜图片第四步:使用可灵 AI 工具,将生成的图片转成视频。第五步:剪映成短视频 DeepSeek 真的强&…...

「软件设计模式」建造者模式(Builder)

深入解析建造者模式:用C打造灵活对象构建流水线 引言:当对象构建遇上排列组合 在开发复杂业务系统时,你是否经常面对这样的类:它有20个成员变量,其中5个是必填项,15个是可选项。当用户需要创建豪华套餐A&…...

Android设备 网络安全检测

八、网络与安全机制 6.1 网络框架对比 volley: 功能 基于HttpUrlConnection;封装了UIL图片加载框架,支持图片加载;网络请求的排序、优先级处理缓存;多级别取消请求;Activity和生命周期的联动(Activity结束生命周期同时取消所有网络请求 …...

安心联车辆管理系统的硬件架构详解

安心联车辆管理系统的硬件架构可分为车载设备和后端平台设备两大部分,以下是详细的硬件组成及功能说明: 一、车载设备 定位与通信模块 北斗/GPS双模定位模块:支持厘米级定位精度,兼容JT/T808、JT/T809等交通部标准协议&#xff0c…...

适用于iOS的应用商店优化(ASO)清单

面对App Store的激烈竞争,您想优化您的应用使其在竞争中脱颖而出,但又不知道应该从哪里开始。我们已经为您准备好了!我们整理了一份适用于iOS的应用商店优化(ASO)检查清单,用以帮助您入门并提高您在App Sto…...

linux概念详解

用户守护进程 用户空间守护进程是一些在后台运行的长期服务程序,提供系统级服务。 下面举一些例子。 网络服务: 如sshd(SSH服务)、httpd(HTTP服务)。 sshd:sshd 守护进程会在后台运行&#x…...

嵌入式开发应该具备哪些编程思维?

目录 1、资源限制思维 2、实时性思维 3、硬件抽象思维 4、中断驱动思维 5、功耗优化思维 6、可靠性和容错思维 7、并发和同步思维 8、故障排除与调试思维 9、状态机思维 嵌入式开发与一般的软件开发不同,嵌入式系统通常受到资源(内存、处理器、…...

MongoDB索引介绍

索引简述 索引是什么 索引在数据库技术体系中占据了非常重要的位置,其主要表现为一种目录式的数据结构,用来实现快速的数据查询。通常在实现上,索引是对数据库表(集合)中的某些字段进行抽取、排列之后,形成的一种非常易于遍历读取…...

编程速递-庆祝Delphi诞生30周年!

庆祝Delphi 30周年纪念是一个特别的时刻。 回到1995年,也就是30年前,在微软Windows和互联网时代的曙光初现之时,Borland Delphi的创建者们无法想象,当时使用Borland Delphi构建的应用程序至今仍在运行——为全世界数十亿人服务。…...

YOLOv11-ultralytics-8.3.67部分代码阅读笔记-tuner.py

tuner.py ultralytics\utils\tuner.py 目录 tuner.py 1.所需的库和模块 2.def run_ray_tune(model, space: dict None, grace_period: int 10, gpu_per_trial: int None, max_samples: int 10, **train_args,): 1.所需的库和模块 # Ultralytics 🚀 AGPL-…...

一文说清楚什么是Token以及项目中使用Token延伸的问题

首先可以参考我的往期文章,我这里说清楚了Cookie,Seesion,Token以及JWT是什么 其实Token你就可以理解成这是一个认证令牌就好了 详细分清Session,Cookie和Token之间的区别,以及JWT是什么东西_还分不清 cookie、sessi…...

VueRouter 实例

分析下列代码 const router new VueRouter({mode:history,routes }) 1.const router new VueRouter({ ... })用来创建一个 Vue Router 实例,用于管理 Vue.js 应用的路由。2.mode: history: 作用:启用 HTML5 History 模式,去除…...

【算法工程】解决linux下Aspose.slides提示No usable version of libssl found以及强化推理模型的短板

1. 背景 构建ubuntu镜像,然后使用Aspose.slides解析PPTX文档,发现一直提示“No usable version of libssl found”。 2. 尝试 使用deepseek R1、kimi1.5、chatgpt o3,并且都带上联网能力,居然还是没有一个能够真正解决&#xf…...

解析浏览器中JavaScript与Native交互原理:以WebGPU为例

引言 随着Web应用复杂度的提升,开发者对浏览器访问本地硬件能力的需求日益增长。然而,浏览器必须在开放性与安全性之间找到平衡——既不能放任JavaScript(JS)随意操作系统资源,又要为高性能计算、图形渲染等场景提供支…...

小火车理论

格助词...

深度学习框架探秘|Keras 应用案例解析以及 Keras vs TensorFlow vs PyTorch

引言 上一篇文章《深度学习框架探秘|Keras:深度学习的魔法钥匙》 我们初步学习了 Keras,包括它是什么、具备哪些优势(简洁易用的 API、强大的兼容性、广泛的应用领域),以及基本使用方法。本文,…...

【01 背包】

01 背包解题思路: 有n件物品和一个最多能背重量为w 的背包。第i件物品的重量是weight[i],得到的价值是value[i] 。每件物品只能用一次,求解将哪些物品装入背包里物品价值总和最大。 这是标准的背包问题,每一件物品其实只有两个状…...

算法10-二分查找算法

一、二分查找算法概念 二分查找(Binary Search)是一种高效的查找算法,适用于在有序数组中快速查找目标值。它的核心思想是通过不断缩小查找范围,将时间复杂度从线性查找的 O(n) 优化到 O(log n)。 二、二分查找的流程图 以下是二…...

变相提高大模型上下文长度-RAG文档压缩-3.优化map-reduce(reranker过滤+社区聚类)

我遇到的业务问题实际上是RAG需要处理很多同一对象的日常报告,不像常识类问题,它的相关Document更多而且更分散,日常报告代表数据库里有很多它的内容,而且对象可能只在段落中的几句话提及到。top-k数量受限于大模型长度&#xff0…...

算法11-分治算法

一、分治算法概念 分治算法(Divide and Conquer)是一种重要的算法设计思想,通过将问题分解为多个子问题,分别解决后再合并结果,从而解决原问题。分治算法的核心思想是“分而治之”,通常包含三个步骤&#…...

Golang internals

To be continued... time.Time golang的时区和神奇的time.Parse context.Context Go Context的踩坑经历 sync.Pool sync.Pool workflow in Go 1.12 new shared pools in Go 1.13 什么是cpu cache理解 Go 1.13 中 sync.Pool 的设计与实现Go: Understand the Design of Sync.Pool…...

Flask中获取请求参数的一些方式总结

在 Flask 中,可以从 request 对象中获取各种类型的参数。以下是全面整理的获取参数的方式及示例代码。 1. 获取 URL 查询参数(Query String Parameters) URL 中的查询参数通过 ?keyvalue&key2value2 的形式传递,使用 reques…...

vscode/cursor 写注释时候出现框框解决办法

一、问题描述 用vscode/cursor写注释出现如图的框框,看着十分难受,用pycharm就没有 二、解决办法 以下两种,哪个好用改那个 (1)Unicode Highlight:Ambiguous Characters Unicode Highlight:Ambiguous Characters &a…...

11-跳跃游戏

给你一个非负整数数组 nums ,你最初位于数组的 第一个下标 。数组中的每个元素代表你在该位置可以跳跃的最大长度。 判断你是否能够到达最后一个下标,如果可以,返回 true ;否则,返回 false 。 贪心算法思路分析 在遍…...

TestHubo基础教程-创建项目

TestHubo是一款国产开源一站式测试工具,涵盖功能测试、接口测试、性能测试,以及 Web 和 App 测试,可以满足不同类型项目的测试需求。本文将介绍如何快速创建第一个项目,以快速入门上手。 1、创建项目 在 TestHubo 中,…...

GHOST重装后DEF盘丢失的全面解析与数据恢复实战指南

GHOST作为一款经典的系统备份与还原工具,因其高效便捷的特性被广泛应用于系统重装和数据恢复场景。然而,许多用户在使用GHOST重装系统后,发现DEF盘(即D盘、E盘、F盘等非系统盘)突然丢失,导致重要数据无法访…...

soular基础教程-使用指南

soular是TikLab DevOps工具链的统一帐号中心,今天来介绍如何使用 soular 配置你的组织、工作台,快速入门上手。  1. 账号管理 可以对账号信息进行多方面管理,包括分配不同的部门、用户组等,从而确保账号权限和职责…...

刷题记录(回顾)HOT100 二叉树-10: ​199. 二叉树的右视图

题目:199. 二叉树的右视图 难度:中等 给你一个二叉树的根节点 root ,判断其是否是一个有效的二叉搜索树。 有效 二叉搜索树定义如下: 节点的左 子树 只包含 小于 当前节点的数。节点的右子树只包含 大于 当前节点的数。所有左…...

【Java学习】类和对象

目录 一、选择取块解 二、类变量 三、似复刻变量 四、类变量的指向对象 五、变量的解引用访问 1.new 类变量(参) 2.this(参) 3.类变量/似复刻变量. 六、代码块 七、复制变量的赋值顺序 八、访问限定符 1.private 2.default 九、导类 一、选择取块解 解引用都有可以…...

安卓基础(Adapter)

想象一下,你有一堆玩具(数据),这些玩具很特别,每个玩具都是不同的,可能有汽车、飞机、积木等。现在,你想把这些玩具摆放到一个展示柜(显示的界面)里,给大家看…...

mybatis-lombok工具包介绍

Lombok是一个实用的]ava类库,能通过注解的形式自动生成构造器、getter/setter、equals、hashcode、toString等方法,并可以自动化生成日志变量,简化java开发、提高效率。 使用前要加入Lombok依赖...

React - 高阶函数-函数柯里化

在 JavaScript 和 React 中,高阶函数是指能够接收其它函数作为参数,或者返回一个函数的函数。柯里化是一种将函数的多个参数转化为一系列嵌套函数的技术,通常用于简化函数的使用和提高其可组合性。 使用前: import React,{Compo…...

数据守护者:备份文件的重要性及自动化备份实践

在信息化社会,数据已成为企业运营和个人生活的重要组成部分。无论是企业的核心业务数据,还是个人的珍贵照片、重要文档,数据的丢失或损坏都可能带来无法估量的损失。因此,备份文件的重要性愈发凸显,它不仅是数据安全的…...

【kafka系列】消费者重平衡

目录 流程 1. 消费者组重平衡(Rebalance)的流程逻辑分析 阶段一:触发重平衡 阶段二:消费者组协调 阶段三:重平衡完成 关键设计思想 2. Mermaid 流程代码 关键点总结 重平衡的影响 1. 重平衡期间的消费行为 2…...

光谱相机在天文学领域的应用

天体成分分析 恒星成分研究:恒星的光谱包含了其大气中各种元素的吸收和发射线特征。通过光谱相机精确测量这些谱线,天文学家能确定恒星大气中氢、氦、碳、氮、氧等元素的含量。如对太阳的光谱分析发现,太阳大气中氢元素占比约 71%&#xff0…...

Java 基于 SpringBoot+Vue 的家政服务管理平台设计与实现

博主介绍:✌程序员徐师兄、8年大厂程序员经历。全网粉丝12w、csdn博客专家、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和毕业项目实战*✌ 🍅文末获取源码联系🍅 👇🏻 精彩专栏推荐订阅&#x1f447…...

ABC393E/F简要题解

ABC393E 给定数组 A A A,求包含元素 A i A_i Ai​的大小为 k k k的子集中最大的最大公约数。 题解: 首先思考对于整个数组所有包含 k k k个元素的子集中最大的GCD是多少,可以怎么求。 我们发现,如果一个数 x x x,数组中如果存在至少 k k …...

什么是Mustache

Mustache 是一种轻量级模板引擎,用于将变量插入到模板中生成最终的文本输出。它的设计简单且易于使用,适用于多种编程语言,包括 JavaScript、Python、Ruby、Java 等。 Mustache 的模板语法使用双大括号 {{}} 包裹变量或表达式,用…...

GGUF格式的DeepSeek-R1-Distill-Qwen-1.5B模型的字段解析

在将GGUF文件转换为PyTorch格式之前,先要读取文件并了解模型中都有什么字段,会遇到了各种参数不匹配的问题。现在,我们先读取GGUF文件的元数据字段,并希望将这些字段中的内存映射(mmap)数据转换为字符串显示…...

Java和SQL测试、性能监控中常用工具

下面我会详细列举一些在Java和SQL测试、调试、性能监控中常用的工具,并结合项目中提到的各个技术点说明如何选择合适的工具和方法。 一、Java项目常用的测试、调试与性能监控工具 单元测试与集成测试: JUnit/TestNG: 用于编写单元测试和集成测…...

CAS单点登录(第7版)13.票务

如有疑问,请看视频:CAS单点登录(第7版) 票务 概述 票务 有两个核心的可配置工单组件: TicketRegistry - 提供持久票证存储。ExpirationPolicy - 提供票证过期语义的策略框架。 工单注册 部署环境和技术专业知识…...

大语言模型入门

大语言模型入门 1 大语言模型步骤1.1 pre-training 预训练1.1.1 从网上爬数据1.1.2 tokenization1.1.2.1 tokenization using byte pair encoding 1.3 预训练1.3.1 context1.3.2 training1.3.3 输出 1.2 post-training1.2.1 token 1.2 SFT监督微调1.3 人类反馈强化学习1.3.1 人…...

从ARM官方获取自己想要的gcc交叉编译工具链接(Arm GNU Toolchain),并在Ubuntu系统中进行配置

前言 本文是博文 https://blog.csdn.net/wenhao_ir/article/details/145547974 的分支博文。 在本博文中我们完成gcc交叉编译工具gcc-arm-9.2-2019.12-x86_64-arm-none-linux-gnueabihf.tar.xz的下载、配置、测试。 下载自己想要的gcc交叉编译工具的源码 目标文件的名字及说…...

LDR6500:重塑充电与数据传输的新篇章

在当今快速发展的数字时代,电子设备对充电速度、数据传输效率和兼容性提出了更高要求。LDR6500,作为一款专为USB Type-C Bridge设备设计的USB-C DRP(Dual Role Port,双角色端口)接口USB PD(Power Delivery&…...

Matlab 机器人 雅可比矩阵

工业机器人运动学与Matlab正逆解算法学习笔记(用心总结一文全会)(四)——雅可比矩阵_staubli机器人正逆向运动学实例验证matlab-CSDN博客 matlab求雅可比矩阵_六轴机械臂 矢量积法求解雅可比矩阵-CSDN博客 (63 封私信 / 80 条消息…...

网络安全防护:开源WAF雷池SafeLine本地部署与配置全流程

文章目录 前言1.关于SafeLine2.安装Docker3.本地部署SafeLine4.使用SafeLine5.cpolar内网穿透工具安装6.创建远程连接公网地址7.固定Uptime Kuma公网地址 前言 对于建站新手来说,无论你选择创建的是个人博客、企业官网还是各类应用平台来推广自己的内容或是产品&am…...

vue框架生命周期详细解析

Vue.js 的生命周期钩子函数是理解 Vue 组件行为的关键。每个 Vue 实例在创建、更新和销毁过程中都会经历一系列的生命周期阶段,每个阶段都有对应的钩子函数,开发者可以在这些钩子函数中执行特定的操作。 Vue 生命周期概述 Vue 的生命周期可以分为以下几…...