Kafka 多线程开发消费者实例
目前,计算机的硬件条件已经大大改善,即使是在普通的笔记本电脑上,多核都已经是标配了,更不用说专业的服务器了。如果跑在强劲服务器机器上的应用程序依然是单线程架构,那实在是有点暴殄天物了。不过,Kafka Java Consumer 就是单线程的设计,你是不是感到很惊讶。所以,探究它的多线程消费方案,就显得非常必要了。
Kafka Java Consumer 设计原理
在开始探究之前,我先简单阐述下 Kafka Java Consumer 为什么采用单线程的设计。了解了这一点,对我们后面制定多线程方案大有裨益。
谈到 Java Consumer API,最重要的当属它的入口类 KafkaConsumer 了。我们说 KafkaConsumer 是单线程的设计,严格来说这是不准确的。因为,从 Kafka 0.10.1.0 版本开始,KafkaConsumer 就变为了双线程的设计,即用户主线程和心跳线程。
所谓用户主线程,就是你启动 Consumer 应用程序 main 方法的那个线程,而新引入的心跳线程(Heartbeat Thread)只负责定期给对应的 Broker 机器发送心跳请求,以标识消费者应用的存活性(liveness)。引入这个心跳线程还有一个目的,那就是期望它能将心跳频率与主线程调用 KafkaConsumer.poll 方法的频率分开,从而解耦真实的消息处理逻辑与消费者组成员存活性管理。
不过,虽然有心跳线程,但实际的消息获取逻辑依然是在用户主线程中完成的。因此,在消费消息的这个层面上,我们依然可以安全地认为 KafkaConsumer 是单线程的设计。
其实,在社区推出 Java Consumer API 之前,Kafka 中存在着一组统称为 Scala Consumer 的 API。这组 API,或者说这个 Consumer,也被称为老版本 Consumer,目前在新版的 Kafka 代码中已经被完全移除了。
我之所以重提旧事,是想告诉你,老版本 Consumer 是多线程的架构,每个 Consumer 实例在内部为所有订阅的主题分区创建对应的消息获取线程,也称 Fetcher 线程。老版本 Consumer 同时也是阻塞式的(blocking),Consumer 实例启动后,内部会创建很多阻塞式的消息获取迭代器。但在很多场景下,Consumer 端是有非阻塞需求的,比如在流处理应用中执行过滤(filter)、连接(join)、分组(group by)等操作时就不能是阻塞式的。基于这个原因,社区为新版本 Consumer 设计了单线程 + 轮询的机制。这种设计能够较好地实现非阻塞式的消息获取。
除此之外,单线程的设计能够简化 Consumer 端的设计。Consumer 获取到消息后,处理消息的逻辑是否采用多线程,完全由你决定。这样,你就拥有了把消息处理的多线程管理策略从 Consumer 端代码中剥离的权利。
另外,不论使用哪种编程语言,单线程的设计都比较容易实现。相反,并不是所有的编程语言都能够很好地支持多线程。从这一点上来说,单线程设计的 Consumer 更容易移植到其他语言上。毕竟,Kafka 社区想要打造上下游生态的话,肯定是希望出现越来越多的客户端的。
多线程方案
了解了单线程的设计原理之后,我们来具体分析一下 KafkaConsumer 这个类的使用方法,以及如何推演出对应的多线程方案。
首先,我们要明确的是,KafkaConsumer 类不是线程安全的 (thread-safe)。所有的网络 I/O 处理都是发生在用户主线程中,因此,你在使用过程中必须要确保线程安全。简单来说,就是你不能在多个线程中共享同一个 KafkaConsumer 实例,否则程序会抛出 ConcurrentModificationException 异常。
当然了,这也不是绝对的。KafkaConsumer 中有个方法是例外的,它就是wakeup(),你可以在其他线程中安全地调用KafkaConsumer.wakeup()来唤醒 Consumer。
鉴于 KafkaConsumer 不是线程安全的事实,我们能够制定两套多线程方案。
- 消费者程序启动多个线程,每个线程维护专属的 KafkaConsumer 实例,负责完整的消息获取、消息处理流程。如下图所示:
总体来说,这两种方案都会创建多个线程,这些线程都会参与到消息的消费过程中,但各自的思路是不一样的。
我们来打个比方。比如一个完整的消费者应用程序要做的事情是 1、2、3、4、5,那么方案 1 的思路是粗粒度化的工作划分,也就是说方案 1 会创建多个线程,每个线程完整地执行 1、2、3、4、5,以实现并行处理的目标,它不会进一步分割具体的子任务;而方案 2 则更细粒度化,它会将 1、2 分割出来,用单线程(也可以是多线程)来做,对于 3、4、5,则用另外的多个线程来做。
这两种方案孰优孰劣呢?应该说是各有千秋。我总结了一下这两种方案的优缺点,我们先来看看下面这张表格。
推荐阅读
结合案例深入理解DDD聚合与聚合根
技术架构:作为开发,你真的了解系统吗-CSDN博客
相关文章:
Kafka 多线程开发消费者实例
目前,计算机的硬件条件已经大大改善,即使是在普通的笔记本电脑上,多核都已经是标配了,更不用说专业的服务器了。如果跑在强劲服务器机器上的应用程序依然是单线程架构,那实在是有点暴殄天物了。不过,Kafka …...
org.apache.maven.surefire:surefire-junit-platform:jar:2.22.2 Maven打包失败
org.apache.maven.surefire:surefire-junit-platform:jar:2.22.2 解决办法 勾上这个,打包时跳过测试代码...
在Ubuntu中固定USB设备的串口号
获取设备信息 lsusb # 记录设备的Vendor ID和Product ID(例如:ID 0403:6001)# 获取详细属性(替换X和Y为实际设备号) udevadm info -a /dev/ttyUSBX 结果一般如下 创建udev规则文件 sudo gedit /etc/udev/rules.d/us…...
Java后端开发: 如何安装搭建Java开发环境《安装JDK》和 检测JDK版本
文章目录 一、JDK的安装1、 打开 Oracle 官方网址2、点击产品 二、检测JDK是否安装成功以及JDK版本的查看1. 打开命令行窗口检测是否安装成功查看 JDK 版本 一、JDK的安装 1、 打开 Oracle 官方网址 Oracle官网地址:https://www.oracle.com/cn/ 2、点击产品 打开下载的JDK文件…...
AudioFlinger与AudioPoliceManager初始化流程
AF/APF启动流程 在启动AudioSeriver服务的过程中会对启动AF/APF。main_audioserver.cpp有如下代码: AudioFlinger::instantiate();AudioPolicyService::instantiate();AF初始化流程 1.AudioFlinger::instantiate() 1.1 AudioFlinger构造函数 void AudioFlinger:…...
Keepalive+LVS+Nginx+NFS高可用架构
KeepaliveLVSNginxNFS高可用架构 1. NFS 业务服务器(192.168.98.138)2. Web服务集群(搭建RS服务器)开机自启动自动挂载配置nginx(为了区分Web1与Web2访问的文件内容) 3. LVS主机(Keepalivedlvs&…...
SpringBoot分布式项目订单管理实战:Mybatis最佳实践全解
一、架构设计与技术选型 典型分布式订单系统架构: [网关层] → [订单服务] ←→ [分布式缓存]↑ ↓ [用户服务] [支付服务]↓ ↓ [MySQL集群] ← [分库分表中间件]技术栈组合: Spring Boot 3.xMybatis-Plus 3.5.xShardingSpher…...
ctfshow WEB web8
首先确定注入点,输入以下payload使SQL恒成立 ?id-1/**/or/**/true 再输入一下payload 使SQL恒不成立 ?id-1/**/or/**/false 由于SQL恒不成立, 数据库查询不到任何数据, 从而导致页面空显示 由以上返回结果可知,该页面存在SQL注入,注入点…...
当AI代写作业成为常态:重构智能时代的教育范式
2023年春季,某重点高中教师发现全班35%的作文呈现相似AI生成特征;同年国际数学竞赛中,参赛选手使用AI解题引发伦理争议。当GPT-4在SAT考试中取得1520分,当Claude3能撰写专业学术论文,教育领域正面临百年未有之大变革。这场技术革命倒逼我们重新思考:在AI能完成知识性任务…...
基于杜鹃鸟鲶鱼优化(Cuckoo Catfish Optimizer,CCO)算法的多个无人机协同路径规划(可以自定义无人机数量及起始点),MATLAB代码
一、杜鹃鸟鲶鱼优化算法 杜鹃鸟鲶鱼优化(Cuckoo Catfish Optimizer,CCO)算法模拟了杜鹃鸟鲶鱼的搜索、捕食和寄生慈鲷行为。该算法的早期迭代侧重于执行多维包络搜索策略和压缩空间策略,并结合辅助搜索策略来有效限制慈鳔的逃逸空…...
Ubuntu 22.04.5 LTS 设置时间同步 ntp
提示:文章为操作记录,以备下次使用 文章目录 前言一、设置ntp1.1替换国内源1.2 更新源&安装1.3 验证 前言 设置时间同步,环境版本 # cat /etc/os-release PRETTY_NAME"Ubuntu 22.04.5 LTS" NAME"Ubuntu" VERSION_…...
搭建前端环境和后端环境
搭建前端环境 ①、安装vscode,并安装相应的插件工具 ②、安装node.js,可以选择当前版本,或者其他版本 ③、创建工作区 创建一个空文件夹,然后通过vscode工具打开,保存为后缀名为.code-workspace ④、从gitee…...
【VirtualBox 安装 Ubuntu 22.04】
网上教程良莠不齐,有一个CSDN的教程虽然很全面,但是截图冗余,看蒙了给我,这里记录一个整洁的教程链接。以备后患。 下载安装全流程 UP还在记录生活,看的我好羡慕,呜呜。 [VirtualBox网络配置超全详解]&am…...
好用的Markdown阅读编辑器Typora破解记录
Typora破解 一、下载Typora二、安装Typora三、破解Typora 😀 记录一下Typora破解记录,怕不常用忘记咯,感觉自己现在的脑子就像我的肠子一样,刚装进去就么得了。。。😔 Typroa算是用起来很舒服的Markdown阅读器了吧&am…...
系统与网络安全------Windows系统安全(1)
资料整理于网络资料、书本资料、AI,仅供个人学习参考。 用户账号基础 本地用户账号基础 用户账号概述 用户账号用来记录用户的用户名和口令、隶属的组等信息 每个用户账号包含唯一的登录名和对应的密码 不同的用户身份拥有不同的权限 操作系统根据SID识别不同…...
使用 Docker Compose 在单节点部署多容器
Docker Compose 是什么 Docker Compose 是一个用于运行多容器应用的工具, 通过一个docker-compose.yml文件, 配置应用的服务、网络和卷,然后使用简单的命令启动或停止所有服务 为什么需要 Docker Compose 当你有一个包含多个相互依赖的容器应用时,手动…...
CSS中的em,rem,vm,vh详解
一:em 和 rem 是两种相对单位,它们常用于 CSS 中来设置尺寸、字体大小、间距等,主要用于更灵活和响应式的布局设计。它们与像素(px)不同,不是固定的,而是相对于其他元素的尺寸来计算的。 1. em …...
MQTT之重复消息(5、TCP重连和MQTT重连)
目录 1. TCP 协议层的重传(原生机制) 2. 触发 TCP 重传的具体场景 3、TCP 重传的关键参数(了解) 第一、重传超时(RTO - Retransmission Timeout) 第二、重传次数 第三、累计时间 vs 本次 RTO 的区别 第四.常见问题解答 第…...
Ground Truth(真实标注数据):机器学习中的“真相”基准
Ground Truth:机器学习中的“真相”基准 文章目录 Ground Truth:机器学习中的“真相”基准引言什么是Ground Truth?Ground Truth的重要性1. 模型训练的基础2. 模型评估的标准3. 模型改进的指导 获取Ground Truth的方法1. 人工标注2. 众包标注…...
Android学习总结之通信篇
一、Binder跨进程通信的底层实现细节(挂科率35%) 高频问题:“Binder如何实现一次跨进程方法调用?” 候选人常见错误: 仅回答“通过Binder驱动传输数据”,缺乏对内存映射和线程调度的描述混淆Binde…...
自动化发布工具CI/CD实践Jenkins部署与配置教程
1. 前言背景 其实一直想把jenkins 的笔记整理下,介于公司这次升级jenkins2.0 ,根据自己部署的一些经验,我把它整理成笔记。 之前我们的jenkins1.0 时代 还一直停留在 free style 或者 maven 风格的项目,随着项目的日益增多&#x…...
kubernet在prometheus+alertmanager+grafana框架下新增部署loki模块
整个框架拓扑图 #mermaid-svg-OK7jgNZ2I7II8nJx {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-OK7jgNZ2I7II8nJx .error-icon{fill:#552222;}#mermaid-svg-OK7jgNZ2I7II8nJx .error-text{fill:#552222;stroke:#552…...
Ubuntu下UEFI安全启动安装Nvdia驱动
简介 众所周知,Ubuntu默认使用Nouveau开源驱动,其性能受限,因此我们需要安装Nvidia专用驱动。 安装专用驱动的一般方法非常简单,只需要sudo ubuntu-drivers devices && sudo ubuntu-drivers autoinstall即可,…...
Java多线程与高并发专题—— CyclicBarrier 和 CountDownLatch 有什么异同?
引入 上一篇我们了解CountDownLatch的原理和常见用法,在CountDownLatch的源码注释中,有提到: 另一种典型用法是将一个问题分解为 N 个部分,用一个Runnable描述每个部分,该Runnable执行相应部分的任务并对闭锁进行倒计…...
【深度学习】不管理论,入门从手写数字识别开始
1. 环境安装 学习深度学习,开发语言是Python。Python开发工具有很多。其中 anaconda vscode的Python开发环境很好用,建议使用这个组合。 编写手写数字识别测试代码,需要在使用Anaconda安装以下4个库: NumpyScipymatplotlibsci…...
Docker使用ubuntu
1. 更换源 sudo nano /etc/docker/daemon.json //daemon.conf查找最新可用的源1、2,手动查找会不断更新! 1.1 添加DNS sudo nano /etc/resolv.confnameserver 8.8.8.8 nameserver 8.8.4.4 2. 修改配置文件后重新加载 sudo systemctl daemon-relo…...
Windows 系统下多功能免费 PDF 编辑工具详解
IceCream PDF Editor是一款极为实用且操作简便的PDF文件编辑工具,它完美适配Windows操作系统。其用户界面设计得十分直观,哪怕是初次接触的用户也能快速上手。更为重要的是,该软件具备丰富多样的强大功能,能全方位满足各类PDF编辑…...
影响HTTP网络请求的因素
影响 HTTP 网络请求的因素 1. 带宽 2. 延迟 浏览器阻塞:浏览器会因为一些原因阻塞请求,浏览器对于同一个域名,同时只能有4个连接(这个根据浏览器内核不同可能会有所差异),超过浏览器最大连接数限制&…...
OpenGL —— 流媒体播放器 - ffmpeg解码rtsp流,opengl渲染yuv视频(附源码,glfw+glad)
🔔 OpenGL 相关技术、疑难杂症文章合集(掌握后可自封大侠 ⓿_⓿)(记得收藏,持续更新中…) 效果 说明 FFMpeg和OpenGL作为两大技术巨头,分别在视频解码和图形渲染领域发挥着举足轻重的作用。本文将综合两者实战视频播放器,大概技术流程为:ffmpeg拉取rtsp协议视频流,并…...
Java + LangChain 实战入门,开发大语言模型应用!
在 Baeldung 上看到了一篇介绍基于 Java LangChain 开发大语言模型应用的基础入门文章,写的非常不错,非常适合初学者。于是,我抽空翻译了一下。 原文地址:https://www.baeldung.com/java-langchain-basics翻译: Java…...
【目标检测】【深度学习】【Pytorch版本】YOLOV1模型算法详解
【目标检测】【深度学习】【Pytorch版本】YOLOV1模型算法详解 文章目录 【目标检测】【深度学习】【Pytorch版本】YOLOV1模型算法详解前言YOLOV1的模型结构YOLOV1模型的基本执行流程YOLOV1模型的网络参数YOLOV1模型的训练方式 YOLOV1的核心思想前向传播阶段网格单元(grid cell)…...
软考《信息系统运行管理员》- 6.2 信息系统硬件的安全运维
硬件安全运行的概念 硬件安全运行的含义是保护支撑信息系统业务活动的信息系统硬件资产免遭自然灾害、人 为因素及各种计算机犯罪行为导致的破坏。硬件安全通常包括环境安全、设备安全和介质安全。 硬件安全运行的影响因素 硬件安全运行的影响因素主要有: (1)自然…...
SQL 视图
SQL 视图 引言 SQL(结构化查询语言)视图是数据库管理系统中的一种重要概念。它提供了一个虚拟的表,该表由一个或多个基本表的数据组成,用户可以通过视图查询数据,而不需要直接操作基本表。本文将详细介绍SQL视图的定…...
Chrome 开发环境快速屏蔽 CORS 跨域限制!
Chrome 开发环境快速屏蔽 CORS 跨域限制【详细教程】 ❓ 为什么需要临时屏蔽 CORS? 在前后端开发过程中,我们经常会遇到 跨域请求被浏览器拦截 的问题。例如,你在 http://localhost:3000 调用 https://api.example.com 时,可能会…...
数值稳定性 + 模型初始化和激活函数
数值稳定性 神经网络的梯度 考虑如下有 d 层的神经网络 h t f t ( h t − 1 ) and y ℓ ∘ f d ∘ … ∘ f 1 ( x ) \mathbf{h}^t f_t(\mathbf{h}^{t-1}) \quad \text{and} \quad y \ell \circ f_d \circ \ldots \circ f_1(\mathbf{x}) htft(ht−1)andyℓ∘fd∘…∘…...
【消息队列】几个mq组件的对比: redis stream/rabbitmq/rocketmq/kafka
1. 消息队列 几个组件: Redis Stream:适用于对性能要求高、可靠性要求不高的场景Rocket MQ:可靠性高,性能优秀,但官方对 go 不太友好,sdk 缺少很多功能支持Rabbit MQ:性能适中,使用…...
TCP协议与wireshark抓包分析
一、tcp协议格式 1. 源端口号 : 发送方使用的端口号 2. 目的端口号 : 接收方使用的端口号 3. 序号: 数据包编号 , tcp 协议为每个数据都设置编号,用于确认是否接收到相应的包 4. 确认序列号 : 使用 tcp 协议接收到数据包,…...
深度学习四大核心架构:神经网络(NN)、卷积神经网络(CNN)、循环神经网络(RNN)与Transformer全概述
目录 📂 深度学习四大核心架构 🌰 知识点概述 🧠 核心区别对比表 ⚡ 生活化案例理解 🔑 选型指南 📂 深度学习四大核心架构 第一篇: 神经网络基础(NN) 🌰 知识点概述…...
springcloud 整合 Redis_Redisson
springcloud 整合 Redis 、Redisson Redis-x64-5.0.14.1 版本 https://blog.csdn.net/wojiubugaosuni12/article/details/134452665 https://www.123pan.com/s/8EpMjv-MTjBv.html spring cloud 整合 redis Redis 5.0.14 Springcloud 2021.0.5 Redis启动 redis-server.exe 点击…...
Windows模仿Mac大小写切换, 中英文切换
CapsLock 功能优化脚本部署指南 部署步骤 第一步:安装 AutoHotkey v2 访问 AutoHotkey v2 官网下载并安装最新版本安装时勾选 "Add Compile Script to context menus" 第二步:部署脚本 直接运行 (调试推荐) 新建文本文件,粘贴…...
基于Spring Boot的木里风景文化管理平台的设计与实现(LW+源码+讲解)
专注于大学生项目实战开发,讲解,毕业答疑辅导,欢迎高校老师/同行前辈交流合作✌。 技术范围:SpringBoot、Vue、SSM、HLMT、小程序、Jsp、PHP、Nodejs、Python、爬虫、数据可视化、安卓app、大数据、物联网、机器学习等设计与开发。 主要内容:…...
【银河麒麟系统常识】命令:uname -m(查看系统架构)
命令: uname -m 功能 常用的 Linux/Unix 终端命令,用于显示当前系统的硬件架构; 返回 返回系统的CPU架构类型,用于判断软件兼容性; 输出结果架构说明常见设备x86_64Intel/AMD 64位 CPU主流 PC、服务器aarch64ARM 64位 …...
网络原理-TCP/IP
网络原理学习笔记:TCP/IP 核心概念 本文是我在学习网络原理时整理的笔记,主要涵盖传输层、网络层和数据链路层的核心协议和概念,特别是 TCP, UDP, IP, 和以太网。 一、传输层 (Transport Layer) 传输层负责提供端到端(进程到进…...
【银河麒麟高级服务器操作系统 】虚拟机运行数据库存储异常现象分析及处理全流程
更多银河麒麟操作系统产品及技术讨论,欢迎加入银河麒麟操作系统官方论坛 https://forum.kylinos.cn 了解更多银河麒麟操作系统全新产品,请点击访问 麒麟软件产品专区:https://product.kylinos.cn 开发者专区:https://developer…...
AI-Sphere-Butler之如何使用腾讯云ASR语音识别服务
环境: AI-Sphere-Butler WSL2 英伟达4070ti 12G Win10 Ubuntu22.04 腾讯云ASR 问题描述: AI-Sphere-Butler之如何使用腾讯云ASR语音识别服务,本地硬件配置不高的情况,建议使用云服务商的ASR 解决方案: 1.登…...
Dify 0.15.3版本 本地部署指南
目录 背景 一、单机部署机器配置最低要求 二、系统Python环境安装 安装需要的python依赖 使用pyenv官方安装脚本 安装poetry 三、中间件部署 PostgreSQL本地部署 添加PG官方仓库 安装pg 16 检查pg版本 修改密码为dify默认 创建数据库dify 安装pg vector插件 修改…...
全书测试:《C++性能优化指南》
以下20道多选题和10道设计题, 用于本书的测试。 以下哪些是C性能优化的核心策略?(多选) A) 优先优化所有代码段 B) 使用更高效的算法 C) 减少内存分配次数 D) 将所有循环展开 关于字符串优化,正确的措施包括ÿ…...
Oracle数据库数据编程SQL<递归函数详解>
递归函数是一种在函数体内直接或间接调用自身的函数。这种函数通过将复杂问题分解为更小的相同问题来解决特定类型的编程任务。 目录 一、递归函数基本概念 1. 递归定义 2. 递归工作原理 二、递归函数示例 1. 经典阶乘函数 2. 斐波那契数列 3. 计算数字位数 三、递归查…...
Burp Suite从入门到实战之配置启动
目录 1.Burp Suite配置启动 1.1安装Burp Suite jar包 1.2JDK,JDK包含JRE(Java运行时环境) 1.2.1配置JDK11环境变量配置 1.2.2系统变量里添加JAVA_HOME编辑 1.2.3找到Path变量进行编辑添加bin 1.2.4命令行查看是否配置成功 1.3激活j…...
【力扣hot100题】(016)缺失的第一个正数
题目里这么多条条框框……先不按条条框框做了两下。 第一个思路:你不仁我不义,先排序后遍历(时间不符题意) class Solution { public:int firstMissingPositive(vector<int>& nums) {sort(nums.begin(),nums.end());i…...