RocketMQ消息拉取模式详解
RocketMQ提供了两种消息拉取模式,Pull模式(主动拉取)和 Push模式(长轮询)。
一、消息拉取模式分类
1. Pull模式(主动拉取)
- 特点:消费者主动向Broker发送请求拉取消息
- 实现类:
DefaultMQPullConsumer
- 优点:
1、消费速率完全由消费者控制
2、适合需要精确控制消费节奏的场景
- 缺点:
1、需要自行管理消费位点(offset)
2、实现相对复杂
2. Push模式(长轮询)
- 特点:底层基于Pull模式封装,对用户表现为推送体验
- 实现类:
DefaultMQPushConsumer
- 优点:
1、使用简单,无需管理offset
2、实时性好(基于长轮询)
- 缺点:消费速率由RocketMQ控制
二、Pull模式详细实现
1. 核心代码示例
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;import java.util.HashMap;
import java.util.Map;
import java.util.Set;public class PullConsumerDemo {private static final Map<MessageQueue, Long> OFFSET_TABLE = new HashMap<>();public static void main(String[] args) throws Exception {// 1. 创建消费者实例DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("pull_consumer_group");consumer.setNamesrvAddr("127.0.0.1:9876");consumer.start();// 2. 获取Topic下的所有队列Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("TestTopic");// 3. 遍历队列拉取消息for (MessageQueue mq : mqs) {System.out.println("开始从队列 " + mq + " 拉取消息");// 4. 循环拉取消息while (true) {try {// 获取当前队列的消费位点long offset = consumer.fetchConsumeOffset(mq, false);if (offset < 0) offset = 0;// 拉取消息(每次最多32条)PullResult pullResult = consumer.pull(mq, "*", offset, 32);// 处理拉取结果switch (pullResult.getPullStatus()) {case FOUND: // 找到消息for (MessageExt msg : pullResult.getMsgFoundList()) {System.out.println("收到消息: " + new String(msg.getBody()));}// 更新消费位点long nextOffset = pullResult.getNextBeginOffset();consumer.updateConsumeOffset(mq, nextOffset);break;case NO_NEW_MSG: // 没有新消息System.out.println("没有新消息");Thread.sleep(1000); // 暂停1秒break;case NO_MATCHED_MSG: // 没有匹配消息case OFFSET_ILLEGAL: // 位点非法break;}} catch (Exception e) {e.printStackTrace();}}}}
}
2. Pull模式关键点
- 位点管理:需要自行调用
fetchConsumeOffset
和updateConsumeOffset
管理消费进度 - 队列遍历:需要获取Topic下所有队列并逐个处理
- 状态处理:需要处理
FOUND
、NO_NEW_MSG
等不同拉取状态 - 流控:通过调整拉取间隔和批量大小控制消费速率
三、Push模式详细实现
1. 核心代码示例
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;import java.util.List;public class PushConsumerDemo {public static void main(String[] args) throws Exception {// 1. 创建消费者实例DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("push_consumer_group");consumer.setNamesrvAddr("127.0.0.1:9876");// 2. 订阅Topic和Tagconsumer.subscribe("TestTopic", "*");// 3. 注册消息监听器consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {// 4. 处理消息for (MessageExt msg : msgs) {System.out.println("收到消息: " + new String(msg.getBody()));}// 返回消费状态return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});// 5. 启动消费者consumer.start();System.out.println("消费者已启动");}
}
2. Push模式关键点
- 长轮询机制:Broker在没有消息时会Hold住请求,新消息到达立即返回
- 负载均衡:消费者组内自动分配队列
- 流控参数:
// 设置每次拉取最大消息数(默认32)
consumer.setPullBatchSize(32);
// 设置每次消费最大消息数(默认1)
consumer.setConsumeMessageBatchMaxSize(10);
- 消费模式:
集群模式(默认):组内消费者分摊消息
广播模式:每个消费者收到所有消息
四、两种模式对比
特性 | Pull模式 | Push模式 |
实现复杂度 | 高,需自行管理offset | 低,自动管理 |
实时性 | 依赖拉取频率 | 高(长轮询) |
消费控制 | 完全自主控制 | 由RocketMQ控制 |
适用场景 | 特殊消费逻辑 | 常规消费场景 |
资源占用 | 低(按需拉取) | 中(长连接) |
五、最佳实践建议
- 常规场景:优先使用Push模式,简单高效
- 特殊需求:需要精确控制消费节奏时使用Pull模式
- 性能调优:
1、调整pullBatchSize
和consumeMessageBatchMaxSize
2、合理设置消费者线程数
- 容错处理:
1、实现消息重试逻辑
2、处理消费超时情况
六、底层原理
RocketMQ的Push模式实际上是Pull模式的封装,其核心流程:
- RebalanceService:负责队列分配
- PullMessageService:后台线程执行拉取
- ProcessQueue:缓存拉取到的消息
- ConsumeMessageService:消费消息并回调监听器
这种设计既保持了Pull模式的灵活性,又提供了Push模式的易用性。
相关文章:
RocketMQ消息拉取模式详解
RocketMQ提供了两种消息拉取模式,Pull模式(主动拉取)和 Push模式(长轮询)。 一、消息拉取模式分类 1. Pull模式(主动拉取) 特点:消费者主动向Broker发送请求拉取消息实现类&#…...
C++23 容器从其他兼容范围的可构造性与可赋值性 (P1206R7)
文章目录 背景与动机提案内容与实现细节提案 P1206R7实现细节编译器支持 对开发者的影响提高灵活性简化代码向后兼容性 总结 C23标准引入了对容器构造和赋值的新特性,这些特性使得容器能够更灵活地从其他兼容范围初始化,并支持从范围赋值。这些改进由提案…...
深入解析 HTTP 中的 GET 请求与 POST 请求
在互联网的世界里,数据的传输与交互无时无刻不在发生。HTTP(超文本传输协议)作为 Web 应用的基石,承载着浏览器与服务器之间的通信重任。而 GET 请求和 POST 请求,作为 HTTP 协议中最为常用的两种请求方法,…...
华三(H3C)IRF堆叠心跳的LACP MAD、BFD MAD和ARP MAD差异
华三(H3C)IRF堆叠心跳的三种MAD(多主检测)机制——LACP MAD、BFD MAD和ARP MAD在实现原理、组网要求及适用场景上存在显著差异。以下是三者的对比分析: 一、核心区别对比 特性LACP MADBFD MADARP MAD检测原理扩展LAC…...
thread 的mutex优化
std::mutex mtx; int shared_data 0;void increment() {std::lock_guard<std::mutex> lock(mtx); // 自动加锁shared_data; // 临界区 } // 离开作用域时自动解锁std::lock_guard 在离开作用域时自动解锁的行为是基于 C 的 RAII (Resource Acquisition Is Initializa…...
深入解析前端 JSBridge:现代混合开发的通信基石与架构艺术
引言:被低估的通信革命 在移动互联网爆发式增长的十年间,Hybrid App(混合应用)始终占据着不可替代的地位。作为连接 Web 与 Native 的神经中枢,JSBridge 的设计质量直接决定了应用的性能上限与开发效率。本文将突破传…...
打破次元壁,VR 气象站开启气象学习新姿势
在教育领域,VR 气象站同样发挥着巨大的作用,为气象教学带来了全新的模式,打破了传统教学的次元壁,让学生们以全新的姿势学习气象知识。 在传统的气象教学中,学生们主要通过课本、图片和老师的讲解来学习气象知识。这…...
python八股文汇总(持续更新版)
python装饰器 一、装饰器是什么? 装饰器是Python中一种"化妆师",它能在不修改原函数代码的前提下,给函数动态添加新功能。 本质:一个接收函数作为参数,并返回新函数的工具。作用:像给手机贴膜…...
C#入门系列【基础类型大冒险】从0到1,解锁编程世界的“元素周期表”
C#入门系列【基础类型大冒险】从0到1,解锁编程世界的“元素周期表” 嘿,欢迎来到C#的奇妙世界!如果把编程比作建造一座大厦,那么基础类型就是我们手中的“砖块”和“水泥”。它们看似普通,却构成了所有复杂程序的基石…...
物流项目第四期(运费模板列表实现)
前三期: 物流项目第一期(登录业务)-CSDN博客 物流项目第二期(用户端登录与双token三验证)-CSDN博客 物流项目第三期(统一网关、工厂模式运用)-CSDN博客 模板列表 在后台系统中,…...
数据中心Overlay解决方案
文档围绕数据中心 Overlay 解决方案展开,指出数据中心向大集中、虚拟化、云业务演进,传统架构存在网络规划复杂、弹性不足、业务扩展受限等问题。Overlay 网络在物理网络上构建虚拟网络,实现名址分离、网络与物理解耦,支持业务灵活部署。方案采用VXLAN 技术(如 SDN 控制模…...
中级网络工程师知识点8
1.无线控制器:实现无线网络统一管理,无缝漫游 2.无线认证系统:实现用户使用用户名和密码认证登录,外来访客通过扫描二维码或者手机短信验证登录无线网络 3.POE交换机:实现无线AP的接入和供电 4.高密吸顶式AP&#x…...
【Linux笔记】——简单实习一个日志项目
🔥个人主页🔥:孤寂大仙V 🌈收录专栏🌈:Linux 🌹往期回顾🌹: 【Linux笔记】——线程同步信号量与环形队列生产者消费者模型的实现(PV操作) 🔖流水不争…...
BRIGHTONE : 520-On-Chain WOHOO Carnival
BRIGHTONE is launching the “520-On-Chain WooHoo Carnival,” and the very first blast of $WOOHOO goes live right on schedule—ushering in a new on-chain celebration of joy! At exactly 21:09 on May 20, the “520-On-Chain WooHoo Carnival” officially kicks…...
在Java项目中集成Deepseek大语言模型实践指南
1. 引言 随着人工智能技术的发展,大语言模型在各领域应用日益广泛。本文将详细介绍如何在Java项目中集成Deepseek大模型,实现智能文本生成、对话等功能。 2. 前期准备 准备Java Spring Boot项目环境确保Maven已配置注册Deepseek账号并获取API密钥 获取a…...
医疗影像中,DICOM点云、三角面片实体混合渲染(VR)
此文章,涉及到专业性比较强,所以,大部分的内容,基本上都是示例代码的形式出现。以下的技术路径,完全经过实践验证,并且效果很好,可以放心使用。 1 概述 在医学影像中,对DICOM的渲染…...
程序运行报错分析文档
zryhuawei:~/src/modules/Connect$ ./newbuild/OpConnectAidTool \WARNING: MYSQL_OPT_RECONNECT is deprecated and will be removed in a future version. replace into process_tracking (step_id,date,status,context_data,start_time,end_time,error_log) values(?,?,?…...
C++数据结构——红黑树
文章目录 一、背景二、关键操作1. 旋转2. 变色3. 查找4. 插入5. 删除 三、面试考点 一、背景 红黑树(Red-Black Tree)是一种自平衡的二叉搜索树(BST),通过颜色标记和旋转操作保证树的高度平衡,从而确保插入…...
【Java实战】线程池 并发 并行 生命周期(详细解释)
线程池: 一种复用线程的技术 不使用线程池的问题: 用户每提出一个需求,都要创建一个新的线程。 创建线程池的方法: JDK 5.0起提供了一个代表线程池的接口:ExecutorService。 方式一: 使用ExecutorServic…...
Qwen3多方位评测
一、Qwen3核心优势 结论,针对这些场景:上下文理解、任务编排、工具调用、数据要素抽取等环节,Qwen3-32B已接近DeepSeek-R1。 二、关键测试环节 1、上下文改写 Qwen3-32B对绝对时间语境理解优于Qwen2.5-72B。 其余改写方面,三…...
银行反欺诈理论、方法与实践总结(下):解决方案
一、金融反欺诈防控体系 反欺诈防控体系是金融机构应对欺诈风险的重要工具,它通常包括事前识别、事中决策和事后处置三个关键阶段。 事前识别阶段:此阶段涉及欺诈情报的收集和账户安全的保护,通过名单和画像的构建来识别潜在风险。例如&…...
自回归图像编辑 EditAR: Unified Conditional Generation with Autoregressive Models
Paperhttps://arxiv.org/pdf/2501.04699 Code (coming soon) 目录 方法 实验 EditAR是一个统一的自回归框架,用于各种条件图像生成任务——图像编辑、深度到图像、边缘到图像、分割到图像。 next-token预测的功效尚未被证明用于图像编辑。 EditAR主要构建在Ll…...
Java中的集合详解
下面是文章详细介绍了 Java 集合框架的基本思路、主要接口与实现、各类集合之间的区别与各自的适用场景,以及一些常见的使用技巧和最佳实践,供你参考。 Java中的集合详解 在 Java 开发中,集合(Collection)作为存储和操…...
前端mjs和js文件区别,mjs和cjs区别---.es.js和.mjs的区别
https://www.cnblogs.com/jocongmin/p/18432236 同一份配置如下,一般打包出来的结果时是一样的,只不过扩展名不一样 export default defineConfig({build: {rollupOptions: {output: [// 同一份配置,仅扩展名不同{ format: es, entryFileNames: [name].mjs },{ fo…...
【深度学习】Transformer 的应用
目录 一、自然语言处理领域 1、自然语言处理领域的应用 2、BART模型 3、BERTSum模型与自动文本摘要 4、SG-Net与机器阅读理解 5、SG-Net的应用 6、总结 二、计算机视觉领域 1、图像分类 (1)背景与挑战 (2)Transformer的…...
C#学习10——泛型
一、什么是泛型? 官方理解:允许开发者在定义类、接口、方法或委托时使用类型参数 个人理解: 类型模具(类似Object变色龙) 二、泛型有什么用? 通过参数化类型实现代码复用,提升类型安全性并…...
Spring Validation校验
使用 JSR 303 (Bean Validation) 校验接口参数 JSR 303,也称为Bean Validation规范,提供了一种在Java应用程序中执行验证的标准化方式。它允许你通过注解直接在领域或者DTO(数据传输对象)类上定义校验规则。 1. 添加依赖 首先需…...
精益数据分析(72/126):MVP的核心法则——消除阻碍与聚焦关键指标
精益数据分析(72/126):MVP的核心法则——消除阻碍与聚焦关键指标 在创业领域,许多失败案例源于对产品开发的认知偏差——过度追求功能完善或盲目跟风增长,却忽略了用户核心需求的最直接满足。今天,我们结合…...
从头实现react native expo本地生成APK
根据github上老外的经验制作了一个react native expo项目起始模版,准备放到资源下载里(已经免积分放置好),这个起始模版带有个人非常喜欢的tailwindcss,由于raact native使用sheetstyle这种风格的样式,不太喜欢.当然,我们使用react native paper组件库时,就要对组件库里的组件使…...
打卡第二十三天
仔细回顾一下之前21天的内容,没跟上进度的同学补一下进度。 作业: 自行学习参考如何使用kaggle平台,写下使用注意点,并对下述比赛提交代码。 使用Kaggle平台的注意点 Kaggle是一个数据科学竞赛平台,提供了丰富的数据…...
关于汇编语言与接口技术——单片机串行口的学习心得
学习目标: 1.了解AT89S51单片机片内串行口的基本工作原理 2.掌握与串行口有关的特殊功能寄存器以及四种工作方式 一、串行口内部结构 单片机串行口有两个独立的接收、发送缓冲器SBUF,属于特殊功能寄存器,可以同时发送、接收数据;…...
汇川PLC通过Profinet转ModbusTCP网关读取西门子PLC数据案例
Modbus TCP主站即Modbus TCP客户端,Modbus TCP主站最多支持同时与31个Modbus TCP从站 。(Modbus TCP服务器)进行通信。 第一步设置PLC IP地址; 默认PLC IP地址为192.168.1.88。根据需要判断是否需要修改。 第二步添加Modbus TCP…...
2025-05-20 模型下载--文本向量化--Faiss检索
模型下载 使用Python脚本进行下载 from huggingface_hub import snapshot_download # import os# os.environ["HF_ENDPOINT"] "https://hf-mirror.com" # 自定义下载目录(Windows 路径建议用 raw string 或 pathlib) download_di…...
idea本地debug断点小技巧
idea本地debug断点小技巧 简单的设置断点条件 断点后,右键这个断点,可以在 condition 中填写能得出布尔的表达式 a 1 你如果写如下,表示先给他赋值,然后断住 a 2; true 断点后设置某个变量的值 在 debug 区域可以设置变量…...
Mybatis面向接口编程
添加与Mapper接口的映射 <!--UserMapper.xml--> <?xml version"1.0" encoding"UTF-8" ?> <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd"> …...
谈谈对《加密算法》的理解
文章目录 一、什么是加密算法?二、常见的加密算法有哪些?2.1 对称加密2.2 非对称加密2.3 哈希算法 三、加密算法代码展示3.1 MD5加密3.2 秘钥加密3.3 AES加密解密 四、加密算法的使用场景 一、什么是加密算法? 加密算法是一种通过数学方法将…...
代码随想录算法训练营第60期第四十二天打卡
大家好,今天还是继续我们的动态规划里面的背包问题,前面我们主要接触的是0-1背包和完全背包,其实这两个背包问题主要就是看看每一件物品我们是否有多件,如果每一件物品我们只能取一次的话那这样我们就是0-1背包,如果每…...
Java并发进阶系列:深度讨论官方关于jdk1.8ConcurrentHashMap的computeIfAbsent源代码修复逻辑
在文章中《深度解析官方关于jdk1.8的resizeStamp的bug处理过程》,我们讨论关于CHM的核心设计——resizeStam需要修复的处理过程,本文再次基于openJDK的bugs讨论组提出的CHM源代码另外一个会造成死循环的bug,默认读者已经掌握CHM的核心源代码实…...
npm vs npx 终极指南:从原理到实战的深度对比 全面解析包管理器与包执行器的核心差异,助你精准选择工具
npm vs npx 终极指南:从原理到实战的深度对比 全面解析包管理器与包执行器的核心差异,助你精准选择工具 一、核心定位差异 #mermaid-svg-xM2GZt0lejj6hYk6 {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}…...
RK3588 IREE+Vulkan ResNet50推理测试
RK3588 IREE+Vulkan ResNet50推理测试 背景一.性能数据【暂不考虑该框架】二.操作步骤2.1 搭建NFS服务,解决IREE编译时,空间不足的问题2.2 编译、安装`IREE`2.2.1 挂载NFS2.2.2 安装依赖2.2.3 编译`IREE`2.2.4 获取驱动及设备信息2.2.5 下载推理图片2.2.6 生成`onnx`模型转换脚…...
Blaster - Multiplayer P77-P89: 武器瞄准机制
P78_ Blaster HUD And Player Controller P78_1 创建PC和HUD P78_2 Tick Component > SetHUDCrosshairs() P79_ Drawing the Crosshairs DrawHUD() Call Every Tick. #include "HUD/BlasterHUD.h"void ABlasterHUD::DrawHUD() {Super::DrawHUD();FVector2D View…...
【每天一个MCP】【记录向】:准备工作,创建github项目
记录一下: 新建一个仓库 各种填写项目信息 点击创建 👆不错,开张了~ 尝试一下这个桌面版的github 登录 果然方便 太高级了。~...
元宇宙中的虚拟经济:机遇与挑战
随着元宇宙概念的兴起,虚拟经济逐渐成为全球科技和经济领域关注的焦点。元宇宙不仅是一个虚拟的社交和娱乐空间,更是一个充满经济活动的全新生态系统。从虚拟货币到数字资产,从虚拟商品交易到去中心化金融(DeFi)&#…...
多环境回测模拟不同市场条件下的策略表现
Backtrader库的核心组件包括数据源、策略、执行引擎和结果分析器。通过组合这些组件,可以构建一个完整的交易系统。 在进行回测之前,需要准备历史市场数据。Backtrader支持多种数据格式,如CSV文件、Pandas数据框等。 加载数据 可以使用Backtrader提供的bt.feeds.YahooFina…...
nRF Connect SDK开发之(1)环境搭建
目录 一、安装 nRF Connect SDK 开发环境 1)git 2)python 3) J-Link 编辑 4)nrfutil 1.将nrfutil应用程序所在目录添加到系统路径PATH 2.在命令行中输入nrfutil检测是否可以正常运行 3.运行命令以列出可用命令:nrfutil search 4.安装 device 、toolchain-man…...
武汉火影数字|数字展厅展馆制作:沉浸式体验,全方位互动
在科技飞速发展的当下,数字技术正以前所未有的速度渗透到各个领域,展厅展馆行业也不例外。数字展厅展馆作为传统展厅展馆的创新升级,正逐渐成为展示领域的新宠,为观众带来前所未有的沉浸式体验。 与传统展厅相比,数字展…...
MinIO集群故障,其中一块driver-4异常
现象 driver-4 Offline,驱动状态为未知。 处理过程 建议每个驱动下面新建个文件,便于根据目录里面的drive-x文件区分驱动 rootpve:/mnt/drive-4# df -h Filesystem Size Used Avail Use% Mounted on /dev/sdb 3.7T 695G 3.0T 19% …...
整型数相加的溢出
当正溢出时,返回TMax,负溢出时,返回TMin。这种运算常用在执行数字信号处理的程序中。 程序代码 int saturating_add(int x,int y);void main() {static int x,y;static int i1,sum;x(i<<15)-1;y(i<<15)-1;sumsaturating_add(x,…...
科目一知识点快速回顾与总结
科目一知识点笔记 扣12分的情况 高速上倒掉逆12;普通路上倒掉1逆3 使用伪造,变造的驾驶证(行驶证)一次记12 饮酒驾驶12 代替实际机动车驾驶人接受交通违法行为处罚和记分牟取经济利益的,一次记12 驾驶校车&#x…...
大模型高效微调技术全面解析:从PEFT原理到实战应用
目录 1. 大语言模型与微调概述 1.1 大语言模型(LLM)简介 1.2 微调的必要性与挑战 2. 参数高效微调(PEFT)技术原理 2.1 PEFT概述 2.2 主要PEFT方法 2.2.1 适配器(Adapters) 2.2.2 LoRA(低秩自适应) 2.2.3 QLoRA(量化LoRA) 2.2.4 IA3(通过抑制和放大内部激活注入适配器)…...