当前位置: 首页 > news >正文

kafka消费的模式及消息积压处理方案

目录

1、kafka消费的流程

2、kafka的消费模式

2.1、点对点模式

2.2、发布-订阅模式

3、consumer消息积压

3.1、处理方案

3.2、积压量

4、消息过期失效

5、kafka注意事项

        Kafka消费积压(Consumer Lag)是指消费者处理消息的速度跟不上生产者发送消息的速度,导致消息在Kafka主题中堆积。

关于kakfa的架构图,如下所示:

更多关于kafka的介绍,参考:关于MQ之kafka的深入研究-CSDN博客https://blog.csdn.net/weixin_50055999/article/details/148535599?spm=1011.2415.3001.5331


1、kafka消费的流程

        之前的章节中,介绍了kafka消息由producer通过hash函数存放到broker节点后,每个broker节点由多个topic主题组成,可水平扩展。

        每个topic由多个partitin组成,partition里面的内容有顺序,跨partition无序。

对于点对点模式下:

        消费组内每个消费者可以消费多个partition、同时保留offset偏移位置,保证下次消费。

对于发布订阅模式

        不同消费组内的消费者可以消费同一个patition,两个消费组不受影响,各自保留彼此的offset的偏移位置。

如图所示:

在消费者消费过程的流程如下:

由上图可知:

1、每个topic里面包含多个partition。

2、每个partition里面的内容是按顺序分布的。

3、每个消费者可以消费多个partition。

4、而partition只能被一个消费者消费。

对于不同消费者组,可以共同消费同一个topic里面的消息。


2、kafka的消费模式

Kafka 的消费订阅模式取决于消费者组的配置方式,可以分为以下两种主要模式:

2.1、点对点模式

特点:一条消息只能被一个消费者消费

实现方式

  • 所有消费者属于同一个消费者组(相同的 group.id

  • Kafka 会在组内消费者之间自动平衡分区分配

// 消费者1和消费者2使用相同的group.id
Properties props = new Properties();
props.put("bootstrap.servers", "kafka:9092");
props.put("group.id", "my-consumer-group"); // 相同的组ID
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("test-topic"));

工作流程

  1. 假设主题有3个分区(P0, P1, P2)

  2. 如果有1个消费者,它将消费所有3个分区

  3. 如果增加第二个消费者,Kafka会重新平衡:

    • 消费者1可能获得P0和P1

    • 消费者2获得P2

  4. 消息在每个分区内有序,且只被分配给该分区的消费者消费

2.2、发布-订阅模式

特点一条消息可以被多个消费者(不同消费组)消费(本质还是点对点)

实现方式

  • 不同消费者组订阅同一个主题

  • 每个消费者组都会收到完整的消息流

// 组A的消费者
Properties propsA = new Properties();
propsA.put("group.id", "group-a"); // 不同组ID
// ...其他配置
KafkaConsumer<String, String> consumerA = new KafkaConsumer<>(propsA);// 组B的消费者
Properties propsB = new Properties();
propsB.put("group.id", "group-b"); // 不同组ID
// ...其他配置
KafkaConsumer<String, String> consumerB = new KafkaConsumer<>(propsB);

工作流程

  1. 生产者发送消息到主题

  2. 组A的所有消费者(作为一个组)会收到消息的一个副本

  3. 组B的所有消费者(作为另一个独立的组)也会收到消息的一个副本

  4. 在每个组内部,消息仍然遵循点对点模式(组内只有一个消费者收到)


3、consumer消息积压

        Kafka消息积压的问题,核心原因是生产太快、消费太慢,处理速度长期失衡,从而导致消息积压(Lag)的场景,积压到超过队列长度限制,就会出现还未被消费的数据产生丢失的场景。
       如果长时间不解决消息积压,可能会引发资源紧张服务延迟崩溃等问题。解决消息积压的关键是提高消费者的消费能力,并优化Kafka集群的整体处理效率。

3.1、处理方案

1. 如果是Kafka消费能力不足,则可以考虑增加 topic 的 partition 的个数(提高kafka的并行度)同时提升消费者组的消费者数量,消费数 = 分区数 (二者缺一不可)

2. 若是下游数据处理不及时,则提高每批次拉取的数量。批次拉取数量过少(拉取数据/处理时间 < 生产速度),使处理的数据小于生产的数据,也会造成数据积压。

方法:

1. 增大partion数量。
2. 消费者加了并发,服务, 扩大消费线程。
3. 增加消费组服务数量。
4. kafka单机升级成了集群。
5. 避免消费者消费消息时间过长,导致超时。
6. 使Kafka分区之间的数据均匀分布。

3.2、积压量

  • 生产量:Kafka Topic 在一个时间周期内各partition offset 起止时间差值之和。
  • 消费量:Kafka Topic 在一个时间周期内某个消费者的消费量。
  • 积压量:Kafka Topic 的某个Consumer Group残留在消息中间件未被及时消费的消息量。

4、消息过期失效

        产生消息堆积,消费不及时,kafka数据有过期时间,一些数据就丢失了,主要是消费不及时。

当出现这种现象的时候,可参考以下经验,进行规避:

1. 消费kafka消息时,应该尽量减少每次消费时间,可通过减少调用三方接口、读库等操作,
   从而减少消息堆积的可能性。
2. 如果消息来不及消费,可以先存在数据库中,然后逐条消费(可以保存消费记录,方便定位问题)。
3. 每次接受kafka消息时,先打印出日志,包括消息产生的时间戳。
4. kafka消息保留时间(修改kafka配置文件, 默认一周)
5. 任务启动从上次提交offset处开始消费处理


5、kafka注意事项

1. 由于Kafka消息key设置,在Kafka producer处,给key加随机后缀,使其均衡。
 
2. 数据量很大,合理的增加Kafka分区数是关键。
   Kafka分区数是Kafka并行度调优的最小单元,如果Kafka分区数设置的太少,
   会影响Kafka consumer消费的吞吐量. 如果利用的是Spark流和Kafka direct approach方式,
   也可以对KafkaRDD进行repartition重分区,增加并行度处理.


参考文章:

1、Kafka如何处理大量积压消息_kafka消息堆积过多了怎么办-CSDN博客https://blog.csdn.net/AlbenXie/article/details/128300018?ops_request_misc=%257B%2522request%255Fid%2522%253A%2522dcefb6fbf11572c5ef4526b40c68a37c%2522%252C%2522scm%2522%253A%252220140713.130102334..%2522%257D&request_id=dcefb6fbf11572c5ef4526b40c68a37c&biz_id=0&utm_medium=distribute.pc_search_result.none-task-blog-2~all~top_click~default-1-128300018-null-null.142^v102^pc_search_result_base1&utm_term=kafka%E6%B6%88%E6%81%AF%E7%A7%AF%E5%8E%8B%E6%80%8E%E4%B9%88%E5%A4%84%E7%90%86&spm=1018.2226.3001.4187

相关文章:

kafka消费的模式及消息积压处理方案

目录 1、kafka消费的流程 2、kafka的消费模式 2.1、点对点模式 2.2、发布-订阅模式 3、consumer消息积压 3.1、处理方案 3.2、积压量 4、消息过期失效 5、kafka注意事项 Kafka消费积压(Consumer Lag)是指消费者处理消息的速度跟不上生产者发送消息的速度&#xff0c;导致消息在…...

基于多模态文档解析与RAG的行业知识库构建技术指南

1. 技术背景 随着企业非结构化数据&#xff08;扫描件、PDF、图像等&#xff09;占比超过80%&#xff0c;传统关键词检索已无法满足精准问答需求。本文提出融合**计算机视觉&#xff08;CV&#xff09;与大语言模型&#xff08;LLM&#xff09;**的解决方案&#xff0c;关键技…...

UVa1408/LA4018 Flight Control

UVa1408/LA4018 Flight Control 题目链接题意分析AC 代码 题目链接 本题是2007年icpc亚洲区域赛成都赛区的F题 题意 有一个N行M列的数组(1 ≤ N ≤ 50, 1 ≤ M ≤ 9)记录机场各个航班的飞行传感数据&#xff0c;其每个元素都是整数。如果某元素小于等于0&#xff0c;则其一定不…...

【STM32 HAL库】使用HAL库操作FLASH

操作顺序 先解锁Flash&#xff0c;再擦除片区&#xff0c;再写入&#xff0c;写完了别忘了加锁。 HAL_FLASH_Unlock(); HAL_FLASHEx_Erase(); HAL_FLASH_Program(); HAL_FLASH_Lock();擦除操作 首先有个问题&#xff0c;我们为什么要擦除&#xff0c;不能直接覆写吗&#xf…...

【学习笔记】2.2 Encoder-Decoder

参考资料&#xff1a;https://github.com/datawhalechina/happy-llm 在 Transformer 中&#xff0c;使用注意力机制的是其两个核心组件——Encoder&#xff08;编码器&#xff09;和 Decoder&#xff08;解码器&#xff09;。 2.2.1 Seq2Seq 模型 Seq2Seq&#xff08;序列到…...

批量创建tmux tmux批量

目录 获取空进程 tmux 获取空进程tmux并关闭 批量创建tmux 批量创建tmux 设置CUDA_VISIBLE_DEVICES: python 读取CUDA_VISIBLE_DEVICES 获取空进程 tmux for session in $(tmux ls -F #S); dopid=$(tmux list-panes -t "$session" -F "#{pane_pid}"…...

7.索引库操作

mapping映射属性 mapping是对索引库中文档的约束 常见的mapping属性包括&#xff1a; type 字段数据类型&#xff0c;常见的简单类型有&#xff1a; 字符串&#xff1a;text(可分词的文本) keyword(精确值&#xff0c;例如&#xff1a;品牌、国家&#xff0c;ip地址) 数值&…...

Transformer-BiGRU、Transformer、CNN-BiGRU、BiGRU、CNN五模型多变量时序预测

Transformer-BiGRU、Transformer、CNN-BiGRU、BiGRU、CNN五模型多变量时序预测 目录 Transformer-BiGRU、Transformer、CNN-BiGRU、BiGRU、CNN五模型多变量时序预测预测效果基本介绍程序设计参考资料 预测效果 基本介绍 Transformer-BiGRU、Transformer、CNN-BiGRU、BiGRU、CN…...

ROS2 笔记汇总(2) 通信接口

在 ROS 系统中&#xff0c;通信接口&#xff08;Interface&#xff09; 是节点之间传递信息的标准“语言协议”&#xff0c;确保了不同功能节点之间可以正确理解和使用彼此传送的数据内容。我们可以将其理解为“数据结构格式定义”&#xff0c;贯穿于话题&#xff08;Topic&…...

更新已打包好的 Spring Boot JAR 文件中的 class 文件

# 1. 解压原始 JAR unzip -q original-app.jar -d temp # 2. 替换 class 文件 cp ~/projects/new-classes/*.class temp/BOOT-INF/classes/com/example/ # 3. 保留原始清单 cp temp/META-INF/MANIFEST.MF . # 4. 重新打包 jar -cf0m new-app.jar MANIFEST.MF -C temp/ . # …...

matlab实现求解兰伯特问题

求解兰伯特问题的matlab代码&#xff0c;非常好用 solve_lambertLYP.m , 1899 StumpffC.m , 136 StumpffdF.m , 294 StumpffF.m , 151 StumpffS.m , 167 Stumpffy.m , 96 text2.m , 104...

英福康INFICON VGC501, VGC502, VGC503 单通道、双通道和三通道测量装置

英福康INFICON VGC501, VGC502, VGC503 单通道、双通道和三通道测量装置...

解决IDEA插件使用Lombok找不到符号问题

https://juejin.cn/post/7013998800842784782 -Djps.track.ap.dependenciesfalse...

ULVAC DC-10-4P 400V input 10kW DC Pulse power supply 爱发科直流电源

ULVAC DC-10-4P 400V input 10kW DC Pulse power supply 爱发科直流电源...

pip安装python第三方库报错

ERROR: Could not install packages due to an OSError: [WinError 32] 另一个程序正在使用此文件&#xff0c;进程无法访问。: C:\\Users\\Lenovo\\AppData\\Local\\Temp\\pip-unpack-9i5hs6ml\\tensorflow-2.10.1-cp310-cp310-win_amd64.whl Consider using the --user optio…...

湖南大学CS-2024期末考试解析

【前言】 这是一张引流贴&#xff0c;标准答案跳转至23级同学的博客。 但需要指出&#xff0c;本人没来得及校准答案。 感谢23级同学做出的开源贡献。 【参考答案】 湖南大学CS-2024期末考试解析-CSDN博客...

SpringBoot-Thymeleaf

大佬写的真好&#xff1a;Thymeleaf一篇就够了-阿里云开发者社区...

Docker镜像之windows系统

https://github.com/dockur/windows 在 Docker 容器中运行 Windows 功能 ISO 下载器KVM 加速基于网页的查看器 使用方法 启动容器并通过浏览器连接到端口 8006。整个安装过程将全自动完成&#xff0c;无需手动干预。当桌面界面出现时&#xff0c;表示 Windows 安装已完成&a…...

学到新的日志方法mp

使用mp技术的时候可以在类上加上注解Slf4j 就可以使用日志 不需要在定义变量log,注意日志只能在方法内使用&#xff0c;不能在方法外进行使用...

PythonWeb项目开发脚手架

项目技术选型 1、FastAPI python web开发框架 2、SQLAlchemy ORM框架 (MySQL) 3、Dynaconf 配置管理 4、JWT Passlib(hash加密) 码云地址&#xff1a; pyhappy: Python Web 项目开发脚手架...

相机--相机标定

教程 相机标定分类 相机标定分为内参标定和外参标定。 内参标定 目的 作用 原理 外参标定...

IoTGateway项目生成Api并通过swagger和Postman调用

IoTGateway项目生成Api并通过swagger和Postman调用-CSDN博客...

vscode code runner 使用python虚拟环境

转载如下&#xff1a; z​​​​​​​VS Code插件Code Runner使用python虚拟环境_coderunner python-CSDN博客...

IEE754标准,double和int转换,在线计算器

1.在线计算器 在线进制转换-IEE754浮点数16进制转换 2.标准解释 西门子PLC接收的ModbusRTU数据帧中IEEE754 格式4字节数据转为浮点数转换程序_西门子modbus读取32位浮点数-CSDN博客 浮点数表示&#xff08;IEEE 754&#xff09;_浮点数举个例子-CSDN博客 IEEE754 浮点数&a…...

语音转文字工具

平时工作和学习比较忙&#xff0c;可能没时间听讲座&#xff0c;只能看回放&#xff0c;回访也很长&#xff0c;这时&#xff0c;我们可以借助语言转文字&#xff0c;通过阅读文字快速了解讲座的重点&#xff0c;今天给大家分享一个本人经常用的语言转文字工具&#xff0c;改工…...

微前端之micro-app数据通信

在这之前如果还没接触过微前端,可以找一些视频、资料先去了解一下,就不在这里赘述了。 现在常见的微前端框架包括: single-spa micro-app qiankun EMP 无界 目前了解到的基本上是这些哈,大家感兴趣可以自行去了解一下,看下它们之间的区别。 因为我目前使用的是mic…...

【代码坏味道】无用物Dispensables

&#x1f4ac; Comments&#xff08;注释过多&#xff09; &#x1f9fe; 症状 方法中充满了解释性注释。 &#x1f9e0; 问题原因 作者意识到代码不易懂&#xff0c;靠注释来“掩盖”结构不清的问题。 &#x1f6e0;️ 应对方法 用好名字代替注释&#xff1a;好名字胜过…...

C++ 观察者模式:设计与实现详解

一、引言 在现代软件开发中,组件间的交互与通信是系统设计的核心挑战之一。观察者模式(Observer Pattern)作为一种行为设计模式,提供了一种优雅的解决方案,用于实现对象间的一对多依赖关系。本文将深入探讨 C++ 中观察者模式的设计理念、实现方式及其应用场景。 二、观察…...

C++ 17 正则表达式

正则表达式不是C语言的一部分&#xff0c;这里仅做简单的介绍。 将这项技术引进&#xff0c;在 』的讨论 正则表达式描述了一种字符串匹配的模式。一般使用正则表达式主要是实现下面三个需求&#xff1a; 1,检查一个串是否包含某种形式的子串&#xff1b; 2,将匹配的子串替换&a…...

[Windows] 剪映 视频编辑处理

附链接&#xff1a;夸克网盘分享&#xff08;点击蓝色字体自行保存下载&#xff09;...

docker安装和镜像源替换

这个博主的方法很好&#xff1a;Docs...

MAC软件游戏打开提示已损坏

打开「终端.app」&#xff0c;输入以下命令并回车&#xff0c;输入开机密码回车 sudo spctl --master-disable 按照上述步骤操作完成后&#xff0c;打开「系统偏好设置」-「安全与隐私」-「通用」&#xff0c;确保已经修改为「任何来源」。 打开「终端.app」&#xff0c;输入…...

数据库概念

1. 数据库核心组成&#xff1a;包括数据集合(DB)、管理系统(DBMS)和完整系统(DBS)&#xff0c;具有共享性、独立性、低冗余、一致性、完整性和安全性等特点。 2. 关系型数据库基础&#xff1a;采用二维表结构存储数据&#xff0c;核心概念包括表、行(元组)、列(属性)、主键(唯…...

每日Prompt:指尖做画

提示词 微缩景观&#xff0c;微距摄影&#xff0c;俯瞰角度&#xff0c;特写&#xff0c;硕大食指手指甲&#xff0c;一个小小的人正在做画&#xff0c;小人右手拿画笔&#xff0c;小人左手拿调色盘&#xff0c;在指甲上作画&#xff0c;画的是中国古代山水画&#xff0c;背景…...

线程池的详细知识(含有工厂模式)

前言 下午学习了线程池的知识。重点探究了ThreadPoolExecutor里面的各种参数的含义。我详细了解了这部分的知识。其中有一个参数涉及工厂模式&#xff0c;我将这一部分知识分享给大家~ 线程池的详细介绍(含工厂模式) 结语 分享到此结束啦。byebye~...

【c语言输入不大于26的整数,输出全部大写字母输入3输出ABC】2022-1-30

缘由c语言简单运用&#xff0c;越简单越好-编程语言-CSDN问答 int x 0, n 0; scanf_s("%d", &n); //std::cin >> n;while (x<n)printf_s("%c", (char)(A x)), x; // std::cout << (char)(A x), x;...

91.评论日记

2025年5月30日20:27:06 AI画减速器图纸&#xff1f; 呜呜为什么读到机械博士毕业了才有啊 | 新迪数字2025新品发布会 | AI工业软件 | 三维CAD | 国产自主_哔哩哔哩_bilibili...

redis持久化策略

RDB 是通过生成数据快照来实现持久化的&#xff0c;相当于给内存中的数据拍一张"照片"保存到磁盘上。AOF 记录所有写操作命令&#xff0c;以Redis协议格式追加到文件末尾。 RDB 在满足特定条件时触发内存快照&#xff0c;生成新的RDB文件替换旧文件 AOF 先写入内…...

gitLab 切换中文模式

点击【头像】--选择settings 选择【language】,选择中文&#xff0c;点击【保存】即可。...

VScode ios 模拟器安装cocoapods

使用 Homebrew 安装&#xff08;推荐&#xff09; 如果你有 Homebrew&#xff0c;直接用它安装更稳定&#xff1a; brew install cocoapods...

什么是Docker容器?

什么是Docker&#xff1f;看这一篇干货文章就够了&#xff01; - 知乎 直接上链接&#xff08;感谢小灰老师&#xff01;&#xff09; 后续可能会补充菜狗的自我见解&#xff08;太菜了&#xff0c;要慢慢学&#xff01;&#xff09;...

相机--RGBD相机

教程 分类原理和标定 RGBD相机RGB相机深度&#xff1b;...

阻塞队列的学习以及模拟实现一个阻塞队列

前言 今天上午学习了阻塞队列。之前在数据结构的时候&#xff0c;学过队列。把队列放在多线程中&#xff0c;对队列会有新的体会。我自己也实现了一个阻塞队列结合生产消费模型&#xff0c;希望对于大家有帮助~ 阻塞队列的相关知识 结语 本次的分享就结束啦。端午安康~...

wireshark分析国标rtp ps流

1.将抓到的tcp或者udp视频流使用decode as 转为rtp包 2.电话->RTP->RTP播放器 选择Export 里面的Payload 就可以导出原始PS流...

ai如何绘制mg人物眉毛

ai如何绘制mg人物眉毛 解决方法: 1、使用椭圆工具&#xff0c;画个扁扁的圆&#xff0c;长度和眉毛长度一致 2、切换到直接选择工具&#xff0c;选择椭圆底部的锚点&#xff0c;按住键盘上键往上移动&#xff0c;画出眉毛弧度 如果想更细致一点&#xff0c;比如绘制眉峰可参…...

系统安装出现的问题 老毛桃

有的电脑这样&#xff0c;不一定能进入u盘启动&#xff0c;需要再 save Exid栏目里&#xff0c;点击那个use disk2.0...

CTA-861-G-2017中文pdf版

CTA-861-G标准&#xff08;2016年11月发布&#xff09;规范未压缩高速数字接口的DTV配置&#xff0c;涵盖视频格式、色彩编码、辅助信息传输等&#xff0c;适用于DVI、HDMI等接口&#xff0c;还涉及EDID数据结构及HDR元数据等内容。...

ai如何绘制mg人物的睫毛

ai如何绘制mg人物的睫毛 解决方法: 1、先ctrlc&#xff0c;再ctrlf原地复制粘贴眼眶图层&#xff0c;复制两次&#xff08;图层1在图层2的上一层&#xff09;&#xff0c;填充颜色改成睫毛颜色&#xff0c;黑色 2、切换到选择工具&#xff0c;选中图层1&#xff0c;点击一次键…...

eNSP企业综合网络设计拓扑图

1.拓扑图 2.拓扑配置 此拓扑还有一些瑕疵&#xff0c;仅做参考和技术提升使用。 想要配置的可以关注下载 大型网络综合实验拓扑图&#xff08;eNSP&#xff09;资源-CSDN文库...

ST-GCN

1.bash 安装git 在目录下右键使用git bash打开 需要安装wgetbash download_model.sh&#xff0c;下载.sh文件 wget: command not found&#xff0c;Windows系统使用git命令 下载预训练权重_sh文件下载-CSDN博客 bash tools/get_models.sh 生成了三个.pt文件...