kafka消费堆积问题探索
背景
我们的商城项目用PHP写的,原本写日志方案用的是PHP的方案,但是,这个方案导致资源消耗一直降不下来,使用了20个CPU。后面考虑使用通过kafka的方案写日志,商城中把产生的日志丢到kafka中,在以go写的项目中消费kafka中的日志,并打印到控制台,最后,统一使用阿里sls抓取日志。我们kafka的分区有12个,go程序部署在k8s集群中,开启了弹性扩缩容,最多开启了8个pod进行消费,每秒产生的日志数量高峰在1500条左右,在这种情况下,依然产生了消息的堆积。
消费中执行的逻辑只有对象的映射和日志写控制台,所以,这种情况下产生了消息堆积,令我倍感困惑。
探索之路
第一步,确认一下每一步的执行时间。
func (s KafkaLogService) ReaderCreateLog(ctx context.Context, msg *customerkafka.CustomKafkaMsg) error {now := time.Now().UnixNano()var logEntry LogEntrydata, ok := msg.Data.(string)if !ok {global.GIN_LOG.Error(ctx, "消息数据类型错误", "data", msg.Data)return fmt.Errorf("消息数据类型错误: %v", msg.Data)}// 解析 JSON 数据if err := json.Unmarshal([]byte(data), &logEntry); err != nil {global.GIN_LOG.Error(ctx, "解析 JSON 数据失败:", "error", err, "message", data)return fmt.Errorf("解析消息失败: %w", err)}now2 := time.Now().UnixNano()fmt.Printf("*******************分隔符*******************Unmarshal logEntry 耗时:%d 纳秒\n", now2-now)var logMessage LogMessageif err := json.Unmarshal([]byte(logEntry.Message), &logMessage); err != nil {global.GIN_LOG.Error(ctx, "解析 JSON 数据失败:", "error", err, "message", logEntry.Message)return fmt.Errorf("解析消息失败: %w", err)}now3 := time.Now().UnixNano()fmt.Printf("*******************分隔符*******************Unmarshal LogMessage 耗时:%d 纳秒\n", now3-now2)// 日志等级判断switch logEntry.Level {case "ERROR":global.GIN_LOG.Error(ctx, "", zap.Any("data", logMessage), zap.String("project", logMessage.Project))case "WARN":global.GIN_LOG.Warn(ctx, "", zap.Any("data", logMessage), zap.String("project", logMessage.Project))case "INFO":global.GIN_LOG.Info(ctx, "", zap.Any("data", logMessage), zap.String("project", logMessage.Project))default:global.GIN_LOG.Warn(ctx, "", zap.Any("data", logMessage), zap.String("project", logMessage.Project))}now4 := time.Now().UnixNano()fmt.Printf("*******************分隔符*******************log 耗时:%d 纳秒\n", now4-now3)return nil
}
json的Unmarshal的耗时,倒是符合我的认知,在预期之中。
但是,打印日志尽然需要耗时1.5毫秒,这个有点超出我的意料之外。这个时间似乎有点夸张啊。
但是,即便如此,一个消费者每秒也可以消费670条左右的消息,在起了8个实例的情况下,也不应该造成kafka消息的阻塞。
继续我们的探索之路
下面这一段是我对于kakfa消费者的封装。大概的逻辑就是每一个ActionType起一个协程进行消费。在这篇《基于kafka-go写的生产者和消费者》文章中写过这个封装背后的设计逻辑,有兴趣的可以移步过去一探究竟。
// Start 方法启动消费者并开始读取消息,根据actionType调用不同的处理函数
func (c *ConsumerClient) Start(ctx context.Context, handlers map[string]ActionHandler) error {for {select {case <-ctx.Done():return nil // 上下文取消,直接返回default:msg, err := c.reader.ReadMessage(ctx)if err != nil {c.logger.Error(ctx, "Failed to read message from Kafka", "error", err)continue}c.logger.Info(ctx, fmt.Sprintf("Message on topic: %s value: %s partion:%d offset:%d", msg.Topic, string(msg.Value), msg.Partition, msg.Offset))var kafkaMsg CustomKafkaMsgif err := json.Unmarshal(msg.Value, &kafkaMsg); err != nil {c.logger.Error(ctx, "Failed to unmarshal Kafka message", "error", err)continue}channel := make(chan *CustomKafkaMsg)// 使用 sync.Map 来管理 workerworker, loaded := c.workerMap.LoadOrStore(kafkaMsg.ActionType, channel)if !loaded {c.wg.Add(1) // 增加 WaitGroup 计数if ch, ok := worker.(chan *CustomKafkaMsg); ok {go c.startWorker(ctx, kafkaMsg.ActionType, handlers, ch)}}// 发送消息到对应的通道,避免阻塞其他消息消费// 只有在 handlers 中存在对应的 actionType 时才发送消息到对应的通道if _, ok := handlers[kafkaMsg.ActionType]; ok {if ch, ok := worker.(chan *CustomKafkaMsg); ok {ch <- &kafkaMsg}}}}
}
由于我的这个写法,让我产生了一点担忧,虽然,我想的是每个ActionType只起一个协程进行消费,难道,实际情况并不是如我预期一样运行,而是,一条kafka消息就起了一个协程进行消费,如果是这种情况的话,那么,会导致大量的垃圾回收,程序的性能就会下降,那么,消息阻塞的问题也就可以解释了。
为了,验证我的这一想法,基于pprof工具看一下实际情况。
实际验证,排除我的担忧,符合我的预期,不是有一条kafka消息就开一个协程进行消费,而是,一个ActionType就只有一个协程进行消费。
模拟生产环境测试
上述的探索,依然不能够完美解释文章开头提到的现象,起了8个消费者,依然导致消息堆积的现象。为了进一步探究其背后的原因,我模拟生产环境的状态,每秒钟往kafka中丢了1000条消息,再观察,我发现,在这种情况下,有时json.Unmarshal也有比较长的耗时,会出现1.5毫秒的耗时,另外,而写日志需要5毫秒左右,如此,每秒只能消费140条消息,消息堆积的现象也就能够解释了。
结论
消息堆积的主要原因是日志打印操作耗时较长,最差时每秒只能消费140条消息。此外,有时JSON解析的时间也较长,这也是一个需要关注的问题。
接下来的目标是找出JSON解析耗时较长和日志打印慢的具体原因,并进行优化。通过解决这些问题,我们有望提高日志处理的效率,从而解决消息堆积的问题。
相关文章:
kafka消费堆积问题探索
背景 我们的商城项目用PHP写的,原本写日志方案用的是PHP的方案,但是,这个方案导致资源消耗一直降不下来,使用了20个CPU。后面考虑使用通过kafka的方案写日志,商城中把产生的日志丢到kafka中,在以go写的项目…...
一文掌握Docker
目录 1.快速入门 1.1.部署MySQL 1.2.命令解读 2.Docker基础 2.1.常见命令 2.1.1.命令介绍 2.1.2.演示 2.1.3.命令别名 2.2.数据卷 2.2.1.什么是数据卷 2.2.2.数据卷命令 2.2.3.挂载本地目录或文件 2.3.镜像 2.3.1.镜像结构 2.3.2.Dockerfile构建镜像 2.3.3.构建…...
慧集通(DataLinkX)iPaaS集成平台-系统管理之UI库管理、流程模板
UI库管理 UI库管理分为平台级和自建两种,其中平台级就是慧集通平台自己内置的一些ui库所有客户均可调用,自建则是平台支持使用者自己根据规则自己新增对应的UI库。具体界面如下: 自建UI库新增界面: 注:平台级UI库不支…...
【学习笔记】Macbook管理多个不同的Python版本
在MacBook上管理不同项目的不同Python版本,可以使用多种方法。以下是一些常见的方法: 1. 使用 pyenv pyenv 是一个非常流行的工具,可以让你轻松安装和切换多个Python版本。以下是安装和使用 pyenv 的步骤: 安装 pyenv 安装依赖…...
前端如何设计一个回溯用户操作的方案
同一个项目,为什么我本地无法复现,只有客户的设备才复现? 如何获取用户的操作路径呢? 两种方案:埋点和rrweb 埋点就很简单了,将所有可能操作的节点都进行预埋数据;但埋点简单并不省心ÿ…...
WPF、控件模板(ControlTemplate)和数据模板(DataTemplate)
前言 在 WPF 中,控件种类丰富且功能非常完善。一个显著的优点是 WPF 提供了强大的自定义能力和灵活的用户界面表现,能够满足各种复杂的应用需求。其中,ControlTemplate 和 DataTemplate 是两个非常重要的概念,分别用于自定义控件…...
MAC上安装Octave
1. 当前最新版Octave是9.3版本,需要把mac os系统升级到14版本(本人之前的版本是10版本) https://wiki.octave.org/Octave_for_macOS octave的历史版本参考此文档:Octave for macOS (outdated) - Octavehttps://wiki.octave.org/Oc…...
RabbitMQ(三)
RabbitMQ中的各模式及其用法 工作队列模式一、生产者代码1、封装工具类2、编写代码3、发送消息效果 二、消费者代码1、编写代码2、运行效果 发布订阅模式一、生产者代码二、消费者代码1、消费者1号2、消费者2号 三、运行效果四、小结 路由模式一、生产者代码二、消费者代码1、消…...
一体机cell服务器更换内存步骤
一体机cell服务器更换内存步骤: #1、确认grdidisk状态 cellcli -e list griddisk attribute name,asmmodestatus,asmdeactivationoutcome #2、offline griddisk cellcli -e alter griddisk all inactive #3、确认全部offline后进行关机操作 shutdown -h now #4、开…...
年后找工作需要注意的事项
大家好!我是 [数擎 AI],一位热爱探索新技术的前端开发者,在这里分享前端和 Web3D、AI 技术的干货与实战经验。如果你对技术有热情,欢迎关注我的文章,我们一起成长、进步! 开发领域:前端开发 | A…...
【网络 MAC 学习专栏 -- 如何理解 PHY 的 Link Up】
请阅读【嵌入式开发学习必备专栏 Cache | MMU | AMBA BUS | CoreSight | Trace32 | CoreLink | ARM GCC | CSH】 文章目录 OverviewClause 22/Clause 45Clause 22Clause 45 PHY Link 状态的软件实现 转自: 开心果 Need Car 2022年10月20日 09:50 上海 Overview PHY…...
鸿蒙UI开发——文本级联选择器
1、概 述 ArkUI提供了一个文本选择器(showTextPickerDialog),可以方便的实现文本级联选择,例如:省->市->区,示意如下: 下面针对文本选择器做简单介绍。 2、接口介绍 定义文本滑动选择器…...
后盾人JS -- JS运算符与流程控制
嘻嘻 赋值运算符与算术运算符 没什么好说的,等号谁都会用 比较运算符注意事项 如果一个是字符一个是数字也是可以比较的() 是判断值和类型是否相等 <!DOCTYPE html> <html lang"en"><head><meta charset…...
Hive SQL必刷练习题:留存率问题
首次登录算作当天新增,第二天也登录了算作一日留存。可以理解为,在10月1号登陆了。在10月2号也登陆了,那这个人就可以算是在1号留存 今日留存率 (今日登录且明天也登录的用户数) / 今日登录的总用户数 * 100% 解决思…...
笔记本电脑 选购 回收 特权模式使用 指南
笔记本电脑 factor 无线网卡:有些笔记本无法检测到特定频段的信息,会导致连不上校园网 sudo iwlist wlp2s0 scan | grep Frequency > net.txt cat net.txt>表示用终端输出覆盖后续文件,>>表示添加到后续文件的末尾 一种更简…...
基于PyQt - 6的医疗多模态大模型医疗研究系统中的创新构建与应用(上 .文章部分)
一、引言 1.1 研究背景与意义 在当今数智化时代,医疗行业正经历着深刻的变革,对智能化、高效化的需求日益迫切。传统的医疗模式在面对海量的医疗数据、复杂的诊断流程以及个性化的治疗需求时,逐渐显露出局限性。随着人工智能技术的飞速发展,多模态大模型作为一种前沿技术…...
下载文件,浏览器阻止不安全下载
背景: 在项目开发中,遇到需要下载文件的情况,文件类型可能是图片、excell表、pdf、zip等文件类型,但浏览器会阻止不安全的下载链接。 效果展示: 下载文件的两种方式: 一、根据接口的相对url,拼…...
1.15学习
web ctfhub-网站源码 打开环境,查看源代码无任何作用,但是其提醒就在表面暗示我们用dirsearch进行目录扫描,登录kali的root端,利用终端输入dirsearch -u 网址的命令扫描该网址目录,扫描成功后获得信息,在…...
shell练习2
需求:判断192.168.1.0/24网络中,当前在线的ip有哪些,并编写脚本打印出来。 #!/bin/bashnmap -sn 192.168.1.0/24 | grep Nmap scan report for | awk {print $5} 注意:当运行 bash ip.sh 时出现 nmap: command not found 的错误…...
MySQL学习笔记5【SQL优化/视图/存储过程/触发器】
MySQL学习笔记 SQL优化 1. 插入数据优化 普通插入: 采用批量插入: 每次插入不建议超过1000条记录,这样可以减少事务开销,提高性能。示例: INSERT INTO tb_user (name, age) VALUES (Alice, 25), (Bob, 30), ...;手动提…...
C++单例模式的设计
单例模式(Singleton Pattern)是一种设计模式,用于确保一个类只有一个实例,并提供一个全局访问点来访问该实例。在C中,单例模式通常用于管理全局资源或共享状态。 以下是C中实现单例模式的几种常见方式: 懒…...
【Linux系统编程】—— 自动化构建工具Makefile指南
文章目录 背景基本使用推导过程适度扩展语法 背景 Makefile 是衡量开发者是否具备完成大型工程能力的一个重要标志。在一个工程中,源文件的数量可能极多,这些文件会按照类型、功能或模块分布在多个目录中。Makefile 通过定义一系列规则,指定…...
【SpringBoot应用篇】SpringBoot+MDC+自定义Filter操作traceId实现日志链路追踪
【SpringBoot应用篇】SpringBootMDC自定义Filter操作traceId实现日志链路追踪 解决的问题解决方案MDC具体逻辑ymllogback-spring.xmlTraceIdUtil操作工具类TraceIdFilter自定义过滤器GlobalExceptionHandler全局异常处理类TraceIdAspect切面UserController测试验证 多线程处理M…...
少一点If/Else - 状态模式(State Pattern)
状态模式(State Pattern) 状态模式(State Pattern)状态模式(State Pattern)概述状态模式(State Pattern)结构图状态模式(State Pattern)涉及的角色 talk is c…...
【SVN】版本发布快捷操作
摘要:因为每次发版都需要制作一份相同的文件夹,而大部分的包都不需要变更,但是文件又非常大,记录自己的操作经验。 首先在SVN Repository Browser 界面把上一次的版本复制一份,复制的时候重命名为新的版本号 右击要复…...
nacos环境搭建以及SpringCloudAlibaba脚手架启动环境映射开发程序
1:下载nacos 地址:https://github.com/alibaba/nacos/tags 2:选择server的zip包下载 3:启动mysql服务,新建数据库:nacos_yh 4:解压下载的nacos_server 进入conf目录 5:mysql运行sql脚本变得到下面的表 6&a…...
【笔记整理】记录参加骁龙AIPC开发者技术沙龙的笔记
AIoT 首先了解了一个概念叫AIoT,我的理解就是AI IoT 5G,通过AI的发展使得边缘计算、数据整合和处理变得快捷方便,不仅限于传统的云端数据处理,在边缘的IoT设备上也可以进行智能化打造,通过5G的通信能力扩展可以实现…...
Kotlin 协程基础十 —— 协作、互斥锁与共享变量
Kotlin 协程基础系列: Kotlin 协程基础一 —— 总体知识概述 Kotlin 协程基础二 —— 结构化并发(一) Kotlin 协程基础三 —— 结构化并发(二) Kotlin 协程基础四 —— CoroutineScope 与 CoroutineContext Kotlin 协程…...
DAMA CDGA 备考笔记(二)
1. 考点分布 2. 第二章 数据处理伦理知识点总结 伦理是建立在是非观念上的行为准则。伦理准则通常侧重于公平、尊重、责任、诚信、质量、可靠性、透明度和信任等方面。数据伦理是一项社会责任问题不是法律问题。 度量指标:培训员工人数、合规/不合规事件、企业高管…...
【Lua学习之旅】之单行/多行注释
Lua的注释 单行注释多行注释 单行注释 lua中的单行注释采用两个短横线"--" --这是lua单行注释多行注释 写法一: --[[ 这个lua的多行注释, 很多资料说多行注释不可以嵌套, 根据我的测试,这种写法的多行注释在lua54版…...
【线性代数】行列式的概念
d e t ( A ) ∑ i 1 , i 2 , ⋯ , i n ( − 1 ) σ ( i 1 , ⋯ , i n ) a 1 , i 1 a 2 , i 2 , ⋯ , a n , i n det(A) \sum_{i_1,i_2,\cdots,i_n } (-1)^{\sigma(i_1,\cdots,i_n)} a_{1,i_1}a_{2,i_2},\cdots, a_{n,i_n} det(A)i1,i2,⋯,in∑(−1)σ(i1,⋯,in)a1…...
react中hooks之useEffect 用法总结
1. 什么是函数的副作用(Side Effects) 副作用是指在组件渲染过程中,除了返回 JSX 之外的其他操作,例如: 数据获取(API 调用)订阅数据源手动修改 DOM设置定时器存储数据日志记录 纯函数是特定的…...
小型、中型无人机执照学习和考试区别详解
小型、中型无人机执照的学习和考试在多个方面存在区别。以下是对两者的详细对比: 一、定义与适用范围 1. 小型无人机: 通常指起飞重量在7kg至25kg之间的无人机。 适用于多种应用场景,包括商业飞行、航拍、农业植保等。 必须持有民航局无人…...
【Go】Go Gin框架初识(一)
1. 什么是Gin框架 Gin框架:是一个由 Golang 语言开发的 web 框架,能够极大提高开发 web 应用的效率! 1.1 什么是web框架 web框架体系图(前后端不分离)如下图所示: 从上图中我们可以发现一个Web框架最重要…...
计算机网络的五层协议
计算机网络的五层协议 计算机网络的五层协议模型包括物理层、数据链路层、网络层、传输层和应用层,每一层都有其特定的功能和相关的协议。1 物理层:负责传输原始的比特流,通过线路(有线或无线)将数据转换为…...
QT中,在子线程中更新UI,会出现哪些问题,如何避免这种情况发生。
在Qt中,直接从子线程更新UI(用户界面)通常会导致各种问题,主要是因为Qt的UI组件(如QWidget及其子类)并不是线程安全的。具体来说,可能会出现以下问题: 崩溃和未定义行为:…...
C++并发编程之多线程环境下使用无锁数据结构的重要准则
在多线程环境中使用无锁数据结构(Lock-Free Data Structures)能够显著提高程序的并发性能,因为它们避免了传统锁机制带来的竞争和阻塞问题。然而,无锁编程本身也带来了许多挑战,如内存管理、数据一致性和正确性等问题。…...
Vue篇-07
Vue UI组件库 一、移动端常用的UI组件库 1.1、Vant 1.2、Cube UI 1.3、Mint UI 二、PC端常用的UI组件库 2.1、Element UI Element - The worlds most popular Vue UI framework 安装: 按需引入: 135_尚硅谷Vue技术_element-ui按需引入_哔哩哔哩_b…...
Zookeeper 数据迁移实战:基础环境搭建与高效迁移方案全览
文章目录 一、Zookeeper数据迁移简介二、迁移zookeeper数据基础环境三、利用快照迁移zookeeper数据1、Node1最新的zk快照文件和日志文件2、将被迁移方node2的zookeeper的集群全部stop3、将源node1集群数据和日志拷贝到指定目录下4、验证优先启动拷贝的数据、日志的zookeeper节点…...
内联变量(inline variables):在多个文件中共享全局常量
在 C17 中,引入了 内联变量(inline variables) 的概念,可以用于在多个文件中共享全局常量。内联变量允许在头文件中定义变量,而不会导致链接错误(如重复定义)。这种方式非常适合用于定义跨多个文…...
WOA-CNN-LSTM-Attention、CNN-LSTM-Attention、WOA-CNN-LSTM、CNN-LSTM四模型对比多变量时序预测
WOA-CNN-LSTM-Attention、CNN-LSTM-Attention、WOA-CNN-LSTM、CNN-LSTM四模型对比多变量时序预测 目录 WOA-CNN-LSTM-Attention、CNN-LSTM-Attention、WOA-CNN-LSTM、CNN-LSTM四模型对比多变量时序预测预测效果基本介绍程序设计参考资料 预测效果 基本介绍 基于WOA-CNN-LSTM-A…...
【kubernetes】K8S节点状态的维护
1 节点状态 节点是K8S集群中的一类重要资源,节点的状态通常可以作为判断集群异常的重要手段。 为了展示节点在各方面的健康程度,在kubectl describe node k8s-master的输出结果中的Conditions部分可以查看k8s-master节点的一些状态数据: N…...
工业视觉2-相机选型
工业视觉2-相机选型 一、按芯片类型二、按传感器结构特征三、按扫描方式四、按分辨率大小五、按输出信号六、按输出色彩接口类型 这张图片对工业相机的分类方式进行了总结,具体如下: 一、按芯片类型 CCD相机:采用电荷耦合器件(CC…...
Oracle查询-in条件超过1000
目录 1.不分页 2.分页 oracle数据库中,in的查询条件超过1000的话,就会报错,应该怎样处理这样的情况呢? 1.不分页 把查询条件分成几个list,每个list有1000个数据,有几个list查询几次数据库就行了 2.分…...
使用rknn进行retinaface部署(C++)
文章目录 RetinaFace导出ONNX导出RKNN编译运行学生课堂开源数据集RetinaFace RetinaFace是一种基于深度学习的高性能人脸检测方法,由InsightFace团队提出。它的核心思想是在单阶段检测器(如RetinaNet)的基础上,结合多任务学习来实现精确的人脸检测和特征点定位。以下是Ret…...
微服务拆分
微服务拆分 接下来,我们就一起将黑马商城这个单体项目拆分为微服务项目,并解决其中出现的各种问题。 熟悉黑马商城 首先,我们需要熟悉黑马商城项目的基本结构: 大家可以直接启动该项目,测试效果。不过,…...
【matlab】matlab知识点及HTTP、TCP通信
1、矩阵运算 点乘:对于两个同维度的向量,点乘结果是这两个向量对应分量的乘积之和。 点除:是指对两个数组的对应元素进行除法运算。 点幂:表示元素对元素的幂运算。 >> A[1,2,3;4,5,6]; B[1,1,1;2,2,2]>> D1B.*AD…...
亿道三防丨三防笔记本是什么意思?和普通笔记本的优势在哪里?
三防笔记本是什么意思?和普通笔记本的优势在哪里? 在现代社会中,笔记本电脑已经成为人们工作和生活中不可或缺的一部分。然而,在一些特殊行业或环境中,普通笔记本电脑由于其脆弱性和对环境条件的敏感性,往…...
C++并发编程之并发可扩展性与阿姆达尔定律
在C并发编程中,可扩展性和阿姆达尔定律(Amdahl’s Law)是两个非常重要的概念,它们帮助我们理解和优化并发程序的性能。下面我们分别讨论这两个概念,并探讨它们在C并发编程中的应用。 可扩展性 可扩展性(S…...
java 迪米特法则,原理、思想、工作流程、实现细节、稳定性、优缺点、应用场景等
迪米特法则(Law of Demeter,LoD),也被称为“最少知识原则”,是一种指导面向对象设计的原则,旨在减少对象之间的耦合度。以下是对迪米特法则的详细解析。 1. 定义 迪米特法则指出:一个对象应该…...