kafka生产端之架构及工作原理
文章目录
- 整体架构
- 元数据更新
整体架构
消息在真正发往Kafka之前,有可能需要经历拦截器(Interceptor)、序列化器(Serializer)和分区器(Partitioner)等一系列的作用,那么在此之后又会发生什么呢?下面我们来看一下生产者客户端的整体架构,如图所示。
整个生产者客户端由两个线程协调运行,这两个线程分别为主线程和Sender线程(发送线程)。在主线程中由KafkaProducer创建消息,然后通过可能的拦截器、序列化器和分区器的作用之后缓存到消息累加器(RecordAccumulator,也称为消息收集器)中。Sender线程负责从RecordAccumulator中获取消息并将其发送到Kafka中。
RecordAccumulator主要用来缓存消息以便Sender线程可以批量发送,进而减少网络传输的资源消耗以提升性能。RecordAccumulator缓存的大小可以通过生产者客户端参数buffer.memory配置,默认值为33554432B,即32MB。如果生产者发送消息的速度超过发送到服务器的速度,则会导致生产者空间不足,这个时候KafkaProducer的send0方法调用要么被阻塞,要么抛出异常,这个取决于参数max.b1ock.ms的配置,此参数的默认值为60000,即60秒。
主线程中发送过来的消息都会被追加到RecordAccumulator的某个双端队列(Deque)中,在RecordAccumulator的内部为每个分区都维护了一个双端队列,队列中的内容就是ProducerBatch,即Deque。消息写入缓存时,追加到双端队列的尾部;Sender读取消息时,从双端队列的头部读取。注意ProducerBatch不是ProducerRecord,ProducerBatch中可以包含一至多个ProducerRecord。通俗地说,ProducerRecord是生产者中创建的消息,而ProducerBatch是指一个消息批次,ProducerRecord会被包含在ProducerBatch中,这样可以使字节的使用更加紧漆。与此同时,将较小的ProducerRecord拼漆成一个较大的ProducerBatch,也可以减少网络请求的次数以提升整体的吞吐量。ProducerBatch和消息的具体格式有关。如果生产者客户端需要向很多分区发送消息,则可以将buffer.memory参数适当调大以增加整体的吞吐量。
消息在网络上都是以字节(Byte)的形式传输的,在发送之前需要创建一块内存区域来保存对应的消息。在Kafka生产者客户端中,通过java.io.ByteBuffer实现消息内存的创建和释放。不过频繁的创建和释放是比较耗费资源的,在RecordAccumulator的内部还有一个BufferPool,它主要用来实现ByteBuffer的复用,以实现缓存的高效利用。不过BufferPool只针对特定大小的ByteBuffer进行管理,而其他大小的ByteBuffer不会缓存进BufferPool中,这个特定的大小由batch.size参数来指定,默认值为16384B,即16KB。我们可以适当地调大batch.size参数以便多缓存一些消息。
ProducerBatch的大小和batch.size参数也有着密切的关系。当一条消息(ProducerRecord)流入RecordAccumulator时,会先寻找与消息分区所对应的双端队列(如果没有则新建),再从这个双端队列的尾部获取一个ProducerBatch(如果没有则新建),查看ProducerBatch中是否还可以写入这个ProducerRecord,如果可以则写入,如果不可以则需要创建一个新的ProducerBatch。在新建ProducerBatch时评估这条消息的大小是否超过batch.size参数的大小,如果不超过,那么就以batch.size参数的大小来创建ProducerBatch,这样在使用完这段内存区域之后,可以通过BufferPool的管理来进行复用;如果超过,那么就以评估的大小来创建ProducerBatch,这段内存区域不会被复用。
Sender从RecordAccumulator中获取缓存的消息之后,会进一步将原本<分区,Deque>的保存形式转变成<Node,ListProducerBatch>的形式,其中Node表示Kafka集群的broker节点。对于网络连接来说,生产者客户端是与具体的broker节点建立的连接,也就是向具体的broker节点发送消息,而并不关心消息属于哪一个分区;而对于KafkaProducer的应用逻辑而言,我们只关注向哪个分区中发送哪些消息,所以在这单需要做一个应用逻辑层面到网络IO层面的转换。
在转换成<Node,ListProducerBatch>>的形式之后,Sender还会进一步封装成<Node,Request>的形式,这样就可以将Request请求发往各个Node了,这里的Request是指Kafka的各种协议请求,对于消息发送而言就是指具体的ProduceRequest。
请求在从Sender线程发往Kafka之前还会保存到InFlightRequests中,InFlightRequests保存对象的具体形式为Map<NodeId,Deque>,它的主要作用是缓存了已经发出去但还没有收到响应的请求(NodeId是一个String类型,表示节点的id编号)。与此同时,InFlightRequests还提供了许多管理类的方法,并且通过配置参数还可以限制每个连接(也就是客户端与Node之间的连接)最多缓存的请求数。这个配置参数为max.in.flight.requests.per.connection,默认值为5,即每个连接最多只能缓存5个未响应的请求,超过该数值之后就不能再向这个连接发送更多的请求了,除非有缓存的请求收到了响应(Response)。通过比较Deque的size与这个参数的大小来判断对应的Node中是否已经堆积了很多未响应的消息,如果真是如此,那么说明这个Node节点负载较大或网络连接有问题,再继续向其发送请求会增大请求超时的可能。
元数据更新
上面提及的InFlightRequests还可以获得leastLoadedNode,即所有Node中负载最小的那一个。这里的负载最小是通过每个Node在InFlightRequests中还未确认的请求决定的,未确认的请求越多则认为负载越大。对于图中的InFlightRequests来说,图中展示了三个节点Node0、Node1和Node2,很明显Node1的负载最小。也就是说,Node1为当前的leastLoadedNodec选择leastLoadedNode发送请求可以使它能够尽快发出,避免因网络拥塞等异常而影响整体的进度。leastLoadedNode的概念可以用于多个应用场合,比如元数据请求、消费者组播协议的交互。
我们只知道主题的名称,对于其他一些必要的信息却一无所知。KafkaProducer要将此消息追加到指定主题的某个分区所对应的leader副本之前,首先需要知道主题的分区数量,然后经过计算得出(或者直接指定)目标分区,之后KafkaProducer需要知道目标分区的leader副本所在的broker节点的地址、端口等信息才能建立连接,最终才能将消息发送到Kafka,在这一过程中所需要的信息都属于元数据信息。
在上面的讲解中我们了解了bootstrap.servers参数只需要配置部分broker节点的地址即可,不需要配置所有broker节点的地址,因为客户端可以自己发现其他broker节点的地址,这一过程也属于元数据相关的更新操作。与此同时,分区数量及leader副本的分布都会动态地变化,客户端也需要动态地捕捉这些变化。
元数据是指Kafka集群的元数据,这些元数据具体记录了集群中有哪些主题,这些主题有哪些分区,每个分区的leader副本分配在哪个节点上,follower副本分配在哪些节点上,哪些副本在AR、ISR等集合中,集群中有哪些节点,控制器节点又是哪一个等信息。
当客户端中没有需要使用的元数据信息时,比如没有指定的主题信息,或者超过metadata.max.age.ms时间没有更新元数据都会引起元数据的更新操作。客户端参数metadata.max.age.ms的默认值为300000,即5分钟。元数据的更新操作是在客户端内部进行的,对客户端的外部使用者不可见。当需要更新元数据时,会先挑选出leastLoadedNode,然后向这个Node发送MetadataRequest请求来获取具体的元数据信息。这个更新操作是由Sender线程发起的,在创建完MetadataRequest之后同样会存入InFlightRequests,之后的步骤就和发送消息时的类似。元数据虽然由Sender线程负责更新,但是主线程也需要读取这些信息,这里的数据同步通过synchronized和final关键字来保障。
相关文章:
kafka生产端之架构及工作原理
文章目录 整体架构元数据更新 整体架构 消息在真正发往Kafka之前,有可能需要经历拦截器(Interceptor)、序列化器(Serializer)和分区器(Partitioner)等一系列的作用,那么在此之后又会…...
DeepSeek结合Langchain的基本用法
DeepSeek结合Langchain的基本用法 DeepSeek 基于Openai接口规范的Prompt应答Deepseek结合LangchainDeepSeek 基于langchain的结构化返回 DeepSeek 基于Openai接口规范的Prompt应答 首先我们需要先基于pip 安装 pip install openai最开始我们先熟悉如何使用openai的接口规范&a…...
Python与java的区别
一开始接触Python的时候,哔哩视频铺天盖地,看了很多人主讲的,要找适合自己口味的,各种培训机构喜欢在各种平台引流打广告,看了很多家,要么就是一个视频几个小时,长篇大论不讲原理只讲应用&#…...
win10 llamafactory模型微调相关① || Ollama运行微调模型
目录 微调相关 1.微调结果评估 2.模型下载到本地 导出转换,Ollama运行 1.模型转换(非常好的教程!) 2.Ollama 加载GGUF模型文件 微调相关 1.微调结果评估 【06】LLaMA-Factory微调大模型——微调模型评估_llamafactory评估-C…...
全国路网矢量shp数据(分不同类型分省份)
科研练习数据 全国路网矢量shp数据(分不同类型分省份) 有需要的自取 数据格式:shp(线) 数据包含类型:城市主干道、城市次干道、城市快速路、城市支路、高速公路、内部道路、人行道、乡村道路、自行车道路…...
RocketMq之Broker注册流程详解
1.前言 前面我也是写过一些关于broker注册到NameServer里的代码分析,但是总感觉写的比较简单,今天这篇的话,算是重新梳理一篇broker注册到NameServer中的代码,感兴趣的可以看下我前面写的几篇博客: 1.NameServer的主…...
关于精度话题的杂谈
“ 浮点值的存储、运算都可能会带来精度损失,了解精度损失背后的机制原因方便我们更好的了解什么情况下会发生精度损失、什么情况下精度损失较大,以及思考怎么避免或减少精度损失。” 01 杂谈 之前在CSDN上写过《关于float浮点值二进制存储和运算精度损失…...
AD域控粗略了解
一、前提 转眼大四,目前已入职上饶一公司从事运维工程师,这与我之前干的开发有着很大的差异,也学习到了许多新的知识。今天就写下我对于运维工作中常用的功能——域控的理解。 二、为什么要有域控,即域控的作用 首先我们必须要…...
emlog最新跨站脚本漏洞(CNVD-2025-01607、CVE-2024-13140)
EMLOG是一款轻量级开源博客和CMS建站系统,速度快、省资源、易上手,适合各种规模的站点搭建,基于PHPMySQL开发。 国家信息安全漏洞共享平台于2025-01-16公布该程序存在跨站脚本漏洞。 漏洞编号:CNVD-2025-01607、CVE-2024-13140 …...
DeepSeek-r1和O1、O3mini谁更强?
DeepSeek-r1和O1、O3mini谁更强? 题目:编写一个 js 程序,显示一个球在旋转的六边形内弹跳。球应该受到重力和摩擦力的影响,并且必须逼真地从旋转的墙壁上弹起 DeepSeek-r1 <!DOCTYPE html> <html> <body> &l…...
代码随想录_二叉树
二叉树 二叉树的递归遍历 144.二叉树的前序遍历145.二叉树的后序遍历94.二叉树的中序遍历 // 前序遍历递归LC144_二叉树的前序遍历 class Solution {public List<Integer> preorderTraversal(TreeNode root) {List<Integer> result new ArrayList<Integer&g…...
时序数据库:Influxdb详解
文章目录 一、简介1、简介2、官网 二、部署1、安装2、配置(1)用户初始化 三、入门(Web UI)1、加载数据(1)上传数据文件(2)代码接入模板 2、管理存储桶(1)创建…...
内存泄漏及检测办法
什么情况下会产生内存泄漏? 内存泄漏如何检测? 使用 valgrind 对象计数 基本思路: 在对象的构造函数中增加计数:每次创建一个对象时,增加一个计数。在对象的析构函数中减少计数:每次销毁一个对象时&…...
BiGRU双向门控循环单元多变量多步预测,光伏功率预测(Matlab完整源码和数据)
代码地址:BiGRU双向门控循环单元多变量多步预测,光伏功率预测(Matlab完整源码和数据) BiGRU双向门控循环单元多变量多步预测,光伏功率预测 一、引言 1.1、研究背景和意义 随着全球对可再生能源需求的不断增长,光伏…...
Leetcode 3448. Count Substrings Divisible By Last Digit
Leetcode 3448. Count Substrings Divisible By Last Digit 1. 解题思路2. 代码实现 题目链接:3448. Count Substrings Divisible By Last Digit 1. 解题思路 这一题的话我们走的是一个累积数组的思路。 首先,我们使用一个cache数组记录下任意段数字…...
青少年编程与数学 02-009 Django 5 Web 编程 03课题、项目结构
青少年编程与数学 02-009 Django 5 Web 编程 03课题、项目结构 一、项目结构项目根目录应用目录其他目录 二、项目设置Django 插件设置项目配置环境变量设置项目目录标记版本控制 三、Django 插件安装 Django 插件配置 Django 插件使用 Django 插件功能 四、扩展插件开发效率插…...
【玩转 Postman 接口测试与开发2_018】第14章:利用 Postman 初探 API 安全测试
《API Testing and Development with Postman》最新第二版封面 文章目录 第十四章 API 安全测试1 OWASP API 安全清单1.1 相关背景1.2 OWASP API 安全清单1.3 认证与授权1.4 破防的对象级授权(Broken object-level authorization)1.5 破防的属性级授权&a…...
UA-Track:不确定性感知端到端3D多目标跟踪
论文地址:https://arxiv.org/pdf/2406.02147 主页:https://liautoad.github.io/ua-track-website/ 3D多目标跟踪(MOT)在自动驾驶感知中起着至关重要的作用。最近基于端到端查询的跟踪器可以同时检测和跟踪对象,这在3D …...
Windows下AMD显卡在本地运行大语言模型(deepseek-r1)
Windows下AMD显卡在本地运行大语言模型 本人电脑配置第一步先在官网确认自己的 AMD 显卡是否支持 ROCm下载Ollama安装程序模型下载位置更改下载 ROCmLibs先确认自己显卡的gfx型号下载解压 替换替换rocblas.dll替换library文件夹下的所有 重启Ollama下载模型运行效果 本人电脑配…...
萌新学 Python 之字符串及字符串相关函数
字符串:单引号、双引号、三个单引号、三个双引号 字符串属于不可变的数据类型,一旦被定义,内存地址不变 name 张三 # 字符串赋值给name后,内存地址存储张三,地址不变 username 张三 # 张三去内存中找…...
【鸿蒙开发】第二十四章 AI - Core Speech Kit(基础语音服务)
目录 1 简介 1.1 场景介绍 1.2 约束与限制 2 文本转语音 2.1 场景介绍 2.2 约束与限制 2.3 开发步骤 2.4 设置播报策略 2.4.1 设置单词播报方式 2.4.2 设置数字播报策略 2.4.3 插入静音停顿 2.4.4 指定汉字发音 2.5 开发实例 3 语音识别 3.1 场景介绍 3.2 约束…...
Java | RESTful 接口规范
关注:CodingTechWork 引言 作为一名程序员,制定清晰、一致且高效的 RESTful 接口规范对于团队的开发效率和项目的长期维护至关重要。本文将详细介绍 RESTful 接口的设计理念、请求方法分类、核心规范,以及正确和错误的示例,帮助团…...
shell脚本控制——处理信号
Linux利用信号与系统中的进程进行通信。你可以通过对脚本进行编程,使其在收到特定信号时执行某些命令,从而控制shell脚本的操作。 1.重温Linux信号 Linux系统和应用程序可以产生超过30个信号。下表列出了在shell脚本编程时会遇到的最常见的Linux系统信…...
OpenGL学习笔记(十二):初级光照:投光物/多光源(平行光、点光源、聚光)
文章目录 平行光点光源聚光多光源 现实世界中,我们有很多种类的光照,每种的表现都不同。将光投射(Cast)到物体的光源叫做投光物(Light Caster)。 平行光/定向光(Directional Light)点光源(Point Light)聚光(Spotlight) 平行光 当一个光源处于很远的地…...
redis底层数据结构——整数集合
文章目录 定义内部实现升级升级的好处提升灵活性节约内存 降级总结 定义 整数集合(intset)是集合键的底层实现之一,当一个集合只包含整数值元素,并且这个集合的元素数量不多时,Redis就会使用整数集合作为集合键的底层…...
w198基于Springboot的智能家居系统
🙊作者简介:多年一线开发工作经验,原创团队,分享技术代码帮助学生学习,独立完成自己的网站项目。 代码可以查看文章末尾⬇️联系方式获取,记得注明来意哦~🌹赠送计算机毕业设计600个选题excel文…...
【JavaScript】《JavaScript高级程序设计 (第4版) 》笔记-Chapter2-HTML 中的 JavaScript
二、HTML 中的 JavaScript 将 JavaScript 插入 HTML 的主要方法是使用<script>元素。 <script>元素有下列 8 个属性。 async:可选。表示应该立即开始下载脚本,但不能阻止其他页面动作,比如下载资源或等待其他脚本加载。只对外部…...
【5】阿里面试题整理
[1]. 介绍一下ZooKeeper ZooKeeper是一个开源的分布式协调服务,核心功能是通过树形数据模型(ZNode)和Watch机制,解决分布式系统的一致性问题。 它使用ZAB协议保障数据一致性,典型场景包括分布式锁、配置管理和服务注…...
系统思考—自我超越
“人们往往认为是个人的能力限制了他们,但事实上,是组织的结构和惯性思维限制了他们的潜力。”—彼得圣吉 最近和一家行业隐形冠军交流,他们已经是领域第一,老板却依然要求:核心团队都要自我超越,攻坚克难…...
大模型-ALIGN 详细介绍
ALIGN模型(A Large-scale ImaGe and Noisy-text embedding)是一种大规模图像和噪声文本嵌入模型,它通过对比学习的方式将图像和文本嵌入到同一个向量空间中,使得匹配的图像-文本对的嵌入向量接近,不匹配的则远离。这种…...
【C++高并发服务器WebServer】-15:poll、epoll详解及实现
本文目录 一、poll二、epoll2.1 相对poll和select的优点2.2 epoll的api2.3 epoll的demo实现2.5 epoll的工作模式 一、poll poll是对select的一个改进,我们先来看看select的缺点。 我们来看看poll的实现。 struct pollfd {int fd; /* 委托内核检测的文件描述符 */s…...
【算法】动态规划专题⑩ —— 混合背包问题 python
目录 前置知识进入正题总结 前置知识 【算法】动态规划专题⑤ —— 0-1背包问题 滚动数组优化 【算法】动态规划专题⑥ —— 完全背包问题 python 【算法】动态规划专题⑦ —— 多重背包问题 二进制分解优化 python 混合背包结合了三种不同类型的背包问题:0/1背包…...
Java高频面试之SE-20
hello啊,各位观众姥爷们!!!本baby今天又来了!哈哈哈哈哈嗝🐶 Java的泛型是什么? Java 泛型(Generics) 是 Java 5 引入的一项重要特性,用于增强代码的类型安…...
springboot 事务管理
在Spring Boot中,事务管理是通过Spring框架的事务管理模块来实现的。Spring提供了声明式事务管理和编程式事务管理两种方式。通常,我们使用声明式事务管理,因为它更简洁且易于维护。 1. 声明式事务管理 声明式事务管理是通过注解来实现的。…...
opentelemetry-collector 配置elasticsearch
一、修改otelcol-config.yaml receivers:otlp:protocols:grpc:endpoint: 0.0.0.0:4317http:endpoint: 0.0.0.0:4318 exporters:debug:verbosity: detailedotlp/jaeger: # Jaeger supports OTLP directlyendpoint: 192.168.31.161:4317tls:insecure: trueotlphttp/prometheus: …...
IDEA关联Tomcat,部署JavaWeb项目
将IDEA与Tomcat关联 创建JavaWeb项目 创建Demo项目 将Tomcat作为依赖引入到Demo中 添加 Web Application 编写前端和后端代码 配置Tomcat server,并运行...
位图与位运算的深度联系:从图像处理到高效数据结构的C++实现与优化
在学习优选算法课程的时候,博主学习位运算了解到位运算的这个概念,之前没有接触过,就查找了相关的资料,丰富一下自身,当作课外知识来了解一下。 位图(Bitmap): 在计算机科学中有两种…...
运维_Mac环境单体服务Docker部署实战手册
Docker部署 本小节,讲解如何将前端 后端项目,使用 Docker 容器,部署到 dev 开发环境下的一台 Mac 电脑上。 1 环境准备 需要安装如下环境: Docker:容器MySQL:数据库Redis:缓存Nginx&#x…...
DeepSeek-V3 论文解读:大语言模型领域的创新先锋与性能强者
论文链接:DeepSeek-V3 Technical Report 目录 一、引言二、模型架构:创新驱动性能提升(一)基本架构(Basic Architecture)(二)多令牌预测(Multi-Token Prediction…...
react使用if判断
1、第一种 function Dade(req:any){console.log(req)if(req.data.id 1){return <span>66666</span>}return <span style{{color:"red"}}>8888</span>}2、使用 {win.map((req,index) > ( <> <Dade data{req}/>{req.id 1 ?…...
opencv:基于暗通道先验(DCP)的内窥镜图像去雾
目录 项目大体情况 暗通道先验(Dark Channel Prior, DCP)原理 项目代码解析 该项目是由我和我导师与舟山某医院合作开发的一个基于暗通道先验(Dark Channel Prior,DCP)的内窥镜图像去雾方法。具体来说,…...
2025年物联网相关专业毕业论文选题参考,文末联系,选题相关资料提供
一、智能穿戴解决方案研究方向 序号解决方案论文选题论文研究方向1智能腰带健康监测基于SpringBoot和Vue的智能腰带健康监测数据可视化平台开发研究如何利用SpringBoot和Vue技术栈开发一个数据可视化平台,用于展示智能腰带健康监测采集的数据,如心率、血…...
npm无法加载文件 因为此系统禁止运行脚本
安装nodejs后遇到问题: 在项目里【node -v】可以打印出来,【npm -v】打印不出来,显示npm无法加载文件 因为此系统禁止运行脚本。 但是在winr,cmd里【node -v】,【npm -v】都也可打印出来。 解决方法: cmd里可以打印出…...
使用PyCharm创建项目以及如何注释代码
创建好项目后会出现如下图所示的画面,我们可以通过在项目文件夹上点击鼠标右键,选择“New”菜单下的“Python File”来创建一个 Python 文件,在给文件命名时建议使用英文字母和下划线的组合,创建好的 Python 文件会自动打开&#…...
Qt中QFile文件读写操作和QFileInfo文件信息读取方法(详细图文教程)
💪 图像算法工程师,专业从事且热爱图像处理,图像处理专栏更新如下👇: 📝《图像去噪》 📝《超分辨率重建》 📝《语义分割》 📝《风格迁移》 📝《目标检测》 &a…...
CF998A Balloons 构造
Balloons 算法:构造 rating : 1000 思路: 分情况讨论: 1. 当只有一个气球包时,肯定不行 2.当有两个气球包时,若两个气球包的气球个数相同则不行 3.其余的情况都是可以的,题目问给格里高利的气球包数…...
python基础入门:3.5实战:词频统计工具
Python词频统计终极指南:字典与排序的完美结合 import re from collections import defaultdictdef word_frequency_analysis(file_path, top_n10):"""完整的词频统计解决方案:param file_path: 文本文件路径:param top_n: 显示前N个高频词:return:…...
面试准备——Java理论高级【笔试,面试的核心重点】
集合框架 Java集合框架是面试中的重中之重,尤其是对List、Set、Map的实现类及其底层原理的考察。 1. List ArrayList: 底层是动态数组,支持随机访问(通过索引),时间复杂度为O(1)。插入和删除元素时&#…...
Docker 部署 verdaccio 搭建 npm 私服
一、镜像获取 # 获取 verdaccio 镜像 docker pull verdaccio/verdaccio 二、修改配置文件 cd /wwwroot/opt/docker/verdaccio/conf vim config.yaml config.yaml 配置文件如下,可以根据自己的需要进行修改 # # This is the default configuration file. It all…...
每日一题--数组中只出现一次的两个数字
数组中只出现一次的两个数字 题目描述数据范围提示 示例示例1示例2 题解解题思路位运算方法步骤: 代码实现代码解析时间与空间复杂度按位与操作获取最小位1的原理为什么选择最低有效的 1 位而不是其他位? 题目描述 一个整型数组里除了两个数字只出现一次…...