启动你的RocketMQ之旅(五)-Broker详细——消息传输
前言:
👏作者简介:我是笑霸final。
📝个人主页: 笑霸final的主页2
📕系列专栏:java专栏
📧如果文章知识点有错误的地方,请指正!和大家一起学习,一起进步👀
🔥如果感觉博主的文章还不错的话,👍点赞👍 + 👀关注👀 + 🤏收藏🤏
上一章节:启动你的RocketMQ之旅(四)-Producer启动和发送流程(下)
下一章节:启动你的RocketMQ之旅(六)-Broker详细——主从复制
目录
- 一、概述
- 二、消息传输
- 2.1SendMessageProcessor
- 2.2 非批量消息 asyncSendMessage
- 2.3 延迟消息
- 2.4 事务消息接收
一、概述
RocketMQ 的 Broker 是消息系统中的核心组件,它负责消息的存储、传输和消费者的请求响应等功能。它的特点和功能如下:
- 消息传输
生产者将消息发送到 Broker,Broker 接收消息后将其写入 CommitLog,并根据消息的主题和队列信息将其存储在相应的 ConsumeQueue 中。
消费者订阅特定的主题,并从 Broker 获取消息进行处理。Broker 根据消费者的订阅关系提供消息。
- 高可用性
主从复制:为了保证高可用性,Broker 支持同步和异步两种方式的主从复制。同步复制保证数据一致性但可能增加延迟,异步复制则提高性能但在故障时可能会丢失部分数据。
水平扩展:RocketMQ 集群可以包含多个 Broker 节点,通过添加新的 Broker 节点来实现系统的水平扩展,从而提高消息存储容量和吞吐量。
- 消息存储
CommitLog:这是 Broker 中存储消息的主要文件,所有消息都以顺序写入的方式记录在 CommitLog 文件中。
ConsumeQueue:逻辑队列,每个主题(Topic)和队列(Queue)对应一个 ConsumeQueue 文件,用于存储消息在 CommitLog 中的物理偏移量等信息,方便快速检索消息。
- 存储优化策略
顺序写入:利用顺序写入的方式提高消息存储效率。
零拷贝技术:减少数据在内核空间和用户空间之间拷贝次数,降低 CPU 和内存开销。
批量处理:支持批量发送和接收消息,减少网络交互次数,提高吞吐量。
二、消息传输
这是 RocketMQ Broker 接收消息的一个简化版源码分析流程。
- BrokerController:作为整个 Broker 的控制中心,它负责启动和管理 Broker 的各个组件。在这个阶段,BrokerController 会调用 NettyRemotingServer 的 start() 方法来启动 Netty 服务端。
- NettyRemotingServer:这是一个基于 Netty 实现的网络通信模块,用于处理与客户端之间的网络通信。当 start() 方法被调用时,它会绑定到指定的端口并开始监听来自客户端的消息发送请求。
- 当 Netty 服务端接收到客户端的消息发送请求后,会触发 processMessageReceived() 方法的执行。
- NettyRemotingAbstract:这是一个抽象类,提供了处理网络通信的基本功能。在 processMessageReceived() 方法中,会对接收到的消息进行初步处理,如解码、验证等,并将其封装成一个请求对象。
- 经过初步处理后的请求会被传递给 SendMessageProcessor 进行进一步处理。SendMessageProcessor 是专门负责处理消息发送请求的处理器。
- 在 asyncProcessRequest() 方法中,SendMessageProcessor 会对请求进行更详细的解析,并决定如何处理该请求。如果请求是合法的消息发送请求,那么它会调用 DefaultMessageStore 的 asyncSendMessage() 方法来异步发送消息。
- DefaultMessageStore 是 RocketMQ 中负责消息存储的核心组件。在 asyncSendMessage() 方法中,它会创建一个新的线程或使用现有的线程池来异步处理消息发送请求。
- 在异步线程中,DefaultMessageStore 会调用自己的 asyncPutMessage() 方法来实际存储消息。
- 在 asyncPutMessage() 方法中,DefaultMessageStore 会将消息写入 CommitLog 文件,并更新相应的 ConsumeQueue 和 IndexFile 等索引信息。完成消息存储后,DefaultMessageStore 会返回一个结果给 SendMessageProcessor,表示消息已经成功存储或者存储过程中发生了错误。
2.1SendMessageProcessor
public CompletableFuture<RemotingCommand> asyncProcessRequest(ChannelHandlerContext ctx,RemotingCommand request) throws RemotingCommandException {final SendMessageContext mqtraceContext;switch (request.getCode()) {case RequestCode.CONSUMER_SEND_MSG_BACK:return this.asyncConsumerSendMsgBack(ctx, request);default:// 对请求头进行解析SendMessageRequestHeader requestHeader = parseRequestHeader(request);if (requestHeader == null) {return CompletableFuture.completedFuture(null);}mqtraceContext = buildMsgContext(ctx, requestHeader);this.executeSendMessageHookBefore(ctx, request, mqtraceContext);// todo 根据请求头走批量还是非批量if (requestHeader.isBatch()) {return this.asyncSendBatchMessage(ctx, request, mqtraceContext, requestHeader);} else {// todo 非批量return this.asyncSendMessage(ctx, request, mqtraceContext, requestHeader);}}}
段代码是 RocketMQ Broker 中处理消息发送请求的核心逻辑之一,具体实现了 asyncProcessRequest 方法。
- ChannelHandlerContext ctx:表示当前的网络上下文,包含了与客户端通信的相关信息。
- RemotingCommand request:表示从客户端接收到的消息请求对象。
- 返回值:一个 CompletableFuture,用于异步处理请求并最终返回结果。
2.2 非批量消息 asyncSendMessage
private CompletableFuture<RemotingCommand> asyncSendMessage(ChannelHandlerContext ctx, RemotingCommand request,SendMessageContext mqtraceContext,SendMessageRequestHeader requestHeader) {// 1 包装一些信息给相应final RemotingCommand response = preSend(ctx, request, requestHeader);final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader();if (response.getCode() != -1) {return CompletableFuture.completedFuture(response);}// 2 得到剧具体消息final byte[] body = request.getBody();int queueIdInt = requestHeader.getQueueId();// 得到topic信息TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());// 请求消息 没有选择队列,就随机if (queueIdInt < 0) {queueIdInt = randomQueueId(topicConfig.getWriteQueueNums());}// 构建存储到磁盘的类 ,消息内部保存信息MessageExtBrokerInner msgInner = new MessageExtBrokerInner();msgInner.setTopic(requestHeader.getTopic());//topic信息msgInner.setQueueId(queueIdInt);// 队列id//todo 消息重试和死信队列if (!handleRetryAndDLQ(requestHeader, response, request, msgInner, topicConfig)) {return CompletableFuture.completedFuture(response);}msgInner.setBody(body);//具体消息字节码msgInner.setFlag(requestHeader.getFlag());Map<String, String> origProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());MessageAccessor.setProperties(msgInner, origProps);msgInner.setBornTimestamp(requestHeader.getBornTimestamp());msgInner.setBornHost(ctx.channel().remoteAddress());msgInner.setStoreHost(this.getStoreHost());msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());String clusterName = this.brokerController.getBrokerConfig().getBrokerClusterName();MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_CLUSTER, clusterName);if (origProps.containsKey(MessageConst.PROPERTY_WAIT_STORE_MSG_OK)) {// There is no need to store "WAIT=true", remove it from propertiesString to save 9 bytes for each message.// It works for most case. In some cases msgInner.setPropertiesString invoked later and replace it.String waitStoreMsgOKValue = origProps.remove(MessageConst.PROPERTY_WAIT_STORE_MSG_OK);msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));// Reput to properties, since msgInner.isWaitStoreMsgOK() will be invoked laterorigProps.put(MessageConst.PROPERTY_WAIT_STORE_MSG_OK, waitStoreMsgOKValue);} else {msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));}CompletableFuture<PutMessageResult> putMessageResult = null;String transFlag = origProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);// todo 是否是事务消息if (transFlag != null && Boolean.parseBoolean(transFlag)) {if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {// Broker如果不支持事务response.setCode(ResponseCode.NO_PERMISSION);response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()+ "] sending transaction message is forbidden");return CompletableFuture.completedFuture(response);}// 一个异步方法,用于处理事务消息的预提交阶段。这个方法会将消息暂存到事务消息存储中,putMessageResult = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);} else {// 完成对信息的落盘【异步】putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);}return handlePutMessageResultFuture(putMessageResult, response, request, msgInner, responseHeader, mqtraceContext, ctx, queueIdInt);}
这段代码是 RocketMQ Broker 中处理非批量消息发送的核心逻辑,具体实现了 asyncSendMessage 方法,它负责将客户端发送的消息存储到磁盘中,并返回结果给客户端。
大致流程:
这里构建了一个msgInner对象,用于异步刷盘 。此对象保存了
- Topic:消息所属的主题名称。
- Queue ID:消息所在的队列编号。
- Body:消息的实际内容,即消息体,以字节码形式存储。
- Flag:消息的标志位,包含消息的属性信息。
- Properties:用户自定义的消息属性,转换为Map格式,可以从MessageExtBrokerInner中提取出来或设置>进去。
- Born Timestamp:消息创建的时间戳。
- Born Host:消息产生的主机地址,即生产者客户端的网络地址。
- Store Host:消息存储的服务端地址,即Broker的地址。
- Reconsume Times:消息已经被重新消费的次数。
- Cluster Name:消息所在的集群名称。
- Transaction Flag(间接存储):通过检查用户自定义属性判断是否为事务消息。
最终根据是事务消息还是普通消息,决定把msgInner异步保存到事务消息存储中还是异步落盘,然后封装返回对象返回给producer
MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
msgInner.setTopic(requestHeader.getTopic());//topic信息
msgInner.setQueueId(queueIdInt);//todo 消息重试和死信队列if (!handleRetryAndDLQ(requestHeader, response, request, msgInner, topicConfig)) {return CompletableFuture.completedFuture(response);}
msgInner.setBody(body);
msgInner.setFlag(requestHeader.getFlag());
Map<String, String> origProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());
MessageAccessor.setProperties(msgInner, origProps);
msgInner.setBornTimestamp(requestHeader.getBornTimestamp());
msgInner.setBornHost(ctx.channel().remoteAddress());
msgInner.setStoreHost(this.getStoreHost());
msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());
MessageExtBrokerInner
:
- 这是一个内部类,表示消息在 Broker 内部的存储格式。
- 设置消息的主题、队列 ID、消息体、标志位、属性、时间戳、生产者地址、Broker 地址等信息。
重试和死信队列
:
- 调用 handleRetryAndDLQ 方法处理消息重试和死信队列相关的逻辑。如果处理失败,则直接返回响应。
2.3 延迟消息
//非事务消息 或者 是已经提交事务的消息if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE|| tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {/*** 延迟消息 并且 延迟等级要大于0*/// Delay Deliveryif (msg.getDelayTimeLevel() > 0) {// 检查延时级别是否超过最大允许值,超过最大值就设置为最大值 18if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());}//修改主题为内置的延迟主题topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;//根据延时级别计算出新的队列IDqueueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());// Backup real topic, queueId//根据延时级别计算出新的队列IDMessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));//更新消息的topic和queueId字段。msg.setTopic(topic);msg.setQueueId(queueId);}}
这段代码展示了 RocketMQ 中处理 非事务消息 或 已提交的事务消息 的逻辑,特别是针对 延迟消息 的特殊处理
流程
1、检查延时级别是否超过最大允许值,超过最大值就设置为最大值 18
2、修改主题为延迟主题为SCHEDULE_TOPIC_XXXX
3、根据延迟等级计算出新的队列id ;id=延迟等级-1
4、存储原始主题和队列ID到消息属性中,以便后续恢复。
5、更新消息的topic和queueId字段。
然后将其作为普通消息进行存储,追加到commitlog文件中中。
注意:
延迟级别的配置
: 默认情况下,RocketMQ 支持 18 个延迟级别,分别对应不同的时间间隔(如 1 秒、5 秒、10 秒等)。
延迟消息的可靠性
:延迟消息的可靠性依赖于 Broker 的调度能力。如果 Broker 出现故障,可能会导致延迟消息的投递时间不准确。
性能影响
:延迟消息的处理会增加 Broker 的调度负担,特别是在高并发场景下,需要合理设计延迟队列的数量和大小。
定时任务处理
public void start() {if (started.compareAndSet(false, true)) {super.load();//启动一个定时器 守护线程this.timer = new Timer("ScheduleMessageTimerThread", true);for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {//得到延迟Integer level = entry.getKey();//获取当前遍历到的延迟级别。Long timeDelay = entry.getValue();//获取当前延迟级别的延迟时间。//从offsetTable获取对应延迟级别的偏移量,若无则后续赋值为0。Long offset = this.offsetTable.get(level);if (null == offset) {offset = 0L;}//如果存在有效的延迟时间,则为每个延迟级别创建并调度一个DeliverDelayedMessageTimerTask任务,// 任务将在FIRST_DELAY_TIME=1000 毫秒后执行,开始检查并投递达到延迟时间的消息。if (timeDelay != null) {this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);}}/*** 调度一个固定频率的定时任务,每隔 1000 * 10毫秒执行一次。* 在任务内部,如果服务仍处于启动状态,则调用ScheduleMessageService.this.persist()方法持久化服务状态。*/this.timer.scheduleAtFixedRate(new TimerTask() {/*** 在run方法内,尝试持久化服务状态,如果出现异常则记录错误日志。*/@Overridepublic void run() {try {if (started.get()) {// 持久化ScheduleMessageService.this.persist();}} catch (Throwable e) {log.error("scheduleAtFixedRate flush exception", e);}}}, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());}}
在定时器里,如果延迟时间不为null则为每个延迟级别创建并调度一个DeliverDelayedMessageTimerTask任务,任务将在1000 毫秒后执行,开始检查并投递达到延迟时间的消息。DeliverDelayedMessageTimerTask此类继承了TimerTask,下面是他的run方法
在DeliverDelayedMessageTimerTask中根据
SCHEDULE_TOPIC_XXXX
名称和延时等级对应的queueId获取消息队列,然后从commitlog中读取消息,还原消息的原有信息(消息的原topic信息)再将消息持久化到commitlog文件中,这样消费者就可以拉取消息了.
public void executeOnTimeup() {// 定位到特定延时级别的系统延时消息队列,以便后续从中读取和处理已到达投递时间的延时消息。ConsumeQueue cq =ScheduleMessageService.this.defaultMessageStore//查找或创建指定主题和队列ID的消费队列对象。如果消费队列不存在,则先创建一个并加入到映射表中,最后返回这个消费队列对象。.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,delayLevel2QueueId(delayLevel));long failScheduleOffset = offset;//初始偏移量if (cq != null) {//从ConsumeQueue(消费队列)中获取索引缓冲区。SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);if (bufferCQ != null) {try {long nextOffset = offset;//下一个偏移量int i = 0;//当需要从扩展地址获取更多有关消息的信息时,会使用此类实例来装载这些扩展数据,便于进一步解析和处理消息。ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {//遍历缓冲区,读取每个消息的偏移量、大小及标签码(tagsCode)等元数据。long offsetPy = bufferCQ.getByteBuffer().getLong();int sizePy = bufferCQ.getByteBuffer().getInt();long tagsCode = bufferCQ.getByteBuffer().getLong();if (cq.isExtAddr(tagsCode)) {if (cq.getExt(tagsCode, cqExtUnit)) {//如果消息具有扩展信息,则加载扩展内容到ConsumeQueueExt.CqExtUnit对象中。tagsCode = cqExtUnit.getTagsCode();} else {//can't find ext content.So re compute tags code.log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}",tagsCode, offsetPy, sizePy);long msgStoreTime = defaultMessageStore.getCommitLog().pickupStoreTimestamp(offsetPy, sizePy);tagsCode = computeDeliverTimestamp(delayLevel, msgStoreTime);}}long now = System.currentTimeMillis();//根据当前时间计算实际的投递时间戳。 返回的时间long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);//下一个偏移量nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);long countdown = deliverTimestamp - now;if (countdown <= 0) {// 如果当前时间已经超过投递时间,则从CommitLog中查找并加载完整消息。MessageExt msgExt =ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(offsetPy, sizePy);if (msgExt != null) {try {// 将原始的MessageExt对象转换成MessageExtBrokerInner对象MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);if (TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC.equals(msgInner.getTopic())) {log.error("[BUG] the real topic of schedule msg is {}, discard the msg. msg={}",msgInner.getTopic(), msgInner);continue;}// 将处理过的延时消息重新存入消息存储系统(例如CommitLog)的过程。PutMessageResult putMessageResult =ScheduleMessageService.this.writeMessageStore.putMessage(msgInner);if (putMessageResult != null&& putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {// 消息写入成功if (ScheduleMessageService.this.defaultMessageStore.getMessageStoreConfig().isEnableScheduleMessageStats()) {// 如果启用了延时消息统计,则进行一系列统计信息的更新:ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incQueueGetNums(MixAll.SCHEDULE_CONSUMER_GROUP, TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, delayLevel - 1, putMessageResult.getAppendMessageResult().getMsgNum());ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incQueueGetSize(MixAll.SCHEDULE_CONSUMER_GROUP, TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, delayLevel - 1, putMessageResult.getAppendMessageResult().getWroteBytes());ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incGroupGetNums(MixAll.SCHEDULE_CONSUMER_GROUP, TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, putMessageResult.getAppendMessageResult().getMsgNum());ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incGroupGetSize(MixAll.SCHEDULE_CONSUMER_GROUP, TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, putMessageResult.getAppendMessageResult().getWroteBytes());ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incTopicPutNums(msgInner.getTopic(), putMessageResult.getAppendMessageResult().getMsgNum(), 1);ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incTopicPutSize(msgInner.getTopic(),putMessageResult.getAppendMessageResult().getWroteBytes());ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incBrokerPutNums(putMessageResult.getAppendMessageResult().getMsgNum());}continue;} else {// 写入失败 容错机制 当重新投递延时消息失败时,不是立刻停止处理,// 而是记录错误日志并重新安排任务稍后再次尝试投递,确保延时消息能在合适的时间得到处理。// XXX: warn and notify melog.error("ScheduleMessageService, a message time up, but reput it failed, topic: {} msgId {}",msgExt.getTopic(), msgExt.getMsgId());ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,nextOffset), DELAY_FOR_A_PERIOD);ScheduleMessageService.this.updateOffset(this.delayLevel,nextOffset);return;}} catch (Exception e) {/** XXX: warn and notify me*/log.error("ScheduleMessageService, messageTimeup execute error, drop it. msgExt={}, nextOffset={}, offsetPy={}, sizePy={}", msgExt, nextOffset, offsetPy, sizePy, e);}}} else {// 还没到投递时间 则按剩余时间重新调度任务ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset),countdown);ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);return;}} // end of for//处理延时消息队列中的所有消息之后或者在循环内遇到需要延迟投递的消息时,进行的任务调度和偏移量更新操作:nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset), DELAY_FOR_A_WHILE);ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);return;} finally {bufferCQ.release();}} // end of if (bufferCQ != null)else {//这段代码的作用是校验提供的偏移量是否在当前消费队列(ConsumeQueue)的有效范围内long cqMinOffset = cq.getMinOffsetInQueue();long cqMaxOffset = cq.getMaxOffsetInQueue();if (offset < cqMinOffset) {failScheduleOffset = cqMinOffset;log.error("schedule CQ offset invalid. offset={}, cqMinOffset={}, cqMaxOffset={}, queueId={}",offset, cqMinOffset, cqMaxOffset, cq.getQueueId());}if (offset > cqMaxOffset) {failScheduleOffset = cqMaxOffset;log.error("schedule CQ offset invalid. offset={}, cqMinOffset={}, cqMaxOffset={}, queueId={}",offset, cqMinOffset, cqMaxOffset, cq.getQueueId());}}} // end of if (cq != null)ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,failScheduleOffset), DELAY_FOR_A_WHILE);}
- 定位延时消息队列:根据延时级别找到对应的系统延时消息队列(ConsumeQueue)。
- 初始化变量:设置初始偏移量(failScheduleOffset)和下一个待检查的偏移量(nextOffset)。
- 获取索引缓冲区:从延时消息队列中获取索引缓冲区,用于读取队列中的消息元数据。
- 循环处理消息:
○ 读取元数据:遍历缓冲区,读取每个消息的偏移量、大小及标签码(tagsCode)等元数据。
○ 处理扩展信息:如果消息具有扩展信息,则加载扩展内容到ConsumeQueueExt.CqExtUnit对象中。
○ 计算投递时间:根据当前时间计算实际的投递时间戳。
○ 判断消息是否可投递:如果当前时间已经超过投递时间,则从CommitLog中查找并加载完整消息。
○ 重新投递消息:若消息有效,则对其进行适当处理(如messageTimeup方法),并将其重新发布到目标主题。同时更新统计信息和指标,并根据发布结果决定是否需要重新调度任务。
○ 延期投递:若消息未达到投递时间,则按剩余延时时间重新调度任务。 - 循环结束后处理:循环结束后,如果没有更多立即可投递的消息,则根据当前偏移量设置下一次任务的触发时间。
- 错误处理:如果在处理过程中发现提供的偏移量超出延时消息队列的有效范围,则调整failScheduleOffset为队列的最小或最大有效偏移量,并记录错误日志。
- 最终调度:无论上述过程如何,在方法结束时,都会根据failScheduleOffset设置一个定时任务,确保即使出现异常也能继续检查延时消息队列中的其他消息。
总结
首先将延时消息换了一个topic名称进行持久化,这样消费者就无法获取消息,然后有定时任务,会将消息还原到原有的topic信息,这样消费者又可以重新拉取消息了。
2.4 事务消息接收
调用这个方法时,Broker会接收并持久化这个半事务消息,但并不会立即将其暴露给消费者,而是等待生产者后续提交或回滚事务状态的确认。只有当生产者通知Broker事务已经成功提交时,Broker才会将消息标记为可消费状态;反之,如果事务回滚,Broker则会丢弃这条消息,从而确保分布式事务的一致性。
private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {// 记录消息的新属性// 把消息的topic 记录到 REAL_TOPIC 的属性中MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());//把消息的 QueueId 记录到 REAL_QID 的属性中MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,String.valueOf(msgInner.getQueueId()));// 更新系统标志位 为0 非事务类型msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));// 将消息的topic 设置为 RMQ_SYS_TRANS_HALF_TOPIC 半事务topicmsgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());// 设置队列idmsgInner.setQueueId(0);msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));return msgInner;}
RocketMQ并非将事务消息保存至消息中 client 指定的 queue,而是记录了原始的 topic 和 queue 后,把这个事务消息保存在 - 特殊的内部 topic:RMQ_SYS_TRANS_HALF_TOPIC - 序号为 0 的 queue。这套 topic 和 queue 对消费者不可见,因此里面的消息也永远不会被消费。这就保证在事务提交成功之前,这个事务消息对 Consumer 是消费不到的。
如何事务反查
进入到sndCheckMessage()方法
/*** 【重点】处理 EndTransactionRequest 请求*/@Overridepublic RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throwsRemotingCommandException {final RemotingCommand response = RemotingCommand.createResponseCommand(null);final EndTransactionRequestHeader requestHeader =(EndTransactionRequestHeader)request.decodeCommandCustomHeader(EndTransactionRequestHeader.class);...OperationResult result = new OperationResult();// 提交事务if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) {// 从commitLog中查出原始的prepared消息,要求producer在发送半消息和comit消息都要同一个brokerresult = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader);if (result.getResponseCode() == ResponseCode.SUCCESS) {// 检查获取的消息与请求的消息是否匹配RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);if (res.getCode() == ResponseCode.SUCCESS) {// 将prepareMessage构建为要发送给consumer的消息MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage());...// 调用MessageStore的消息存储接口提交消息,使用真正的topic和queueIdRemotingCommand sendResult = sendFinalMessage(msgInner);if (sendResult.getCode() == ResponseCode.SUCCESS) {// 将prepareMessage标记为deletethis.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());}return sendResult;}return res;}} else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) {// 收到的是rollback,查出原始Prepare消息result = this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader);if (result.getResponseCode() == ResponseCode.SUCCESS) {// 检查获取的消息与请求的消息是否匹配RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);if (res.getCode() == ResponseCode.SUCCESS) {// 将prepareMessage标记为deletethis.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());}return res;}}...// 返回响应return response;}
如果接收到的是Commit,则将原本的half消息构建为普通消息,然后使用真正的topic和queueId将消息保存到store,这时可以被consumer消费;然后将原本的half消息标记为delete状态(这里因为对consumer不可见,无需撤销消息,且因为RocketMQ也无法真正的删除一条消息,因为是顺序写文件的)
如果是Rollback,则直接将half消息标记为delete返回响应
服务端在接收到ROLLBACK_MESSAGE的指令后,会根据事务消息的事务ID等信息找到对应的消息,并将其从CommitLog中清除。这里的清除实际上是指在后续的清理流程中,将包含回滚事务消息的文件段标记为可回收,待下次刷盘或清理时,这部分空间可以被重用,从而达到逻辑上的删除效果。
不论提交还是回滚,都将操作结果封装到响应命令response中,并返回给客户端。如果在处理事务过程中遇到错误,将错误码和错误原因填充到响应命令中
相关文章:
启动你的RocketMQ之旅(五)-Broker详细——消息传输
前言: 👏作者简介:我是笑霸final。 📝个人主页: 笑霸final的主页2 📕系列专栏:java专栏 📧如果文章知识点有错误的地方,请指正!和大家一起学习,一…...
100个节点的部署,整合Docker、Kubernetes和Jenkins的详细设计
一、架构设计概览 组件角色: Docker:应用容器化封装,确保环境一致性。Kubernetes(K8s):自动化容器编排,管理多节点集群的调度、扩缩容和自愈。Jenkins:CI/CD流水线驱动,实现代码到生产的自动化流程。集群规模: Master节点:3个(高可用,避免单点故障)。Worker节点:…...
神经网络与计算机视觉
2016 年,随着 AlphaGo 在围棋比赛中击败李世石,“人工智能”、“神经网络”、“深度 学习”等字眼便越来越多的出现在大众眼前,智能化好像成为一种不可逆转的趋势,带给大家新奇感的同时也带来了一丝忧惧:在不远的未来,机器是否真的拥有思维和情感?《终结者》中天网大战人…...
openEuler对比CentOS的核心优势分析
openEuler对比CentOS的核心优势分析 在开源操作系统领域,openEuler与CentOS均占据重要地位,但随着CentOS维护策略的调整(如CentOS 8停止维护,转向CentOS Stream),越来越多的用户开始关注国产化替代方案。o…...
高性能电脑系统优化工具Advanced SystemCare PRO v18.3.0.240 解锁永久专业版
软件介绍 IObit Advanced SystemCare,系统清理维护与安全防护软件,大幅提升整体系统性能和安全!一键AI智能模式,全面扫描优化修复系统,拥有性能加速模式、系统优化、网络加速、启动项优化、软件更新、实时监视清理、隐…...
【C语言练习】005. 编写表达式并确定其值
【C语言练习】005. 编写表达式并确定其值 005. 编写表达式并确定其值示例 1:算术表达式计算过程:最终结果:示例 2:关系和逻辑表达式计算过程:最终结果:示例 3:复合赋值和算术表达式计算过程:最终结果:示例 4:位运算表达式计算过程:最终结果:示例 5:综合表达式计算…...
力扣面试150题--合并两个有序链表和随机链表的复制
Day 33 题目描述 思路 常规题目,比较list1和list2节点的值,取出较小值扩展链表,最后其中一个遍历完直接拼接另外一个即可(归并排序) /*** Definition for singly-linked list.* public class ListNode {* int v…...
测试用例的设计
组合原则:多个选项有效数据建议组合使用(正向功能)、单个选项无效数据组合其他选项有效数据使用(逆向功能) 一、针对登录模块设计测试用例: 1.账号:已注册手机号、已注册邮箱、为空、未注册手机号、未注册邮箱 2.密码:注册密码、为空、错误密码 3.验证码:正确、过期、错误 …...
关于TCP三次握手和四次挥手的疑点
参考文章:浅谈TCP三次握手和四次挥手 1、三次握手的作用 (1)确保双方收到对方的初始序列号:客户端发送SYN包,服务器回复SYN-ACK包,客户端再回复ACK包,确保双方都接收到对方的序列号。 …...
逆向|dy|a_bogus|1.0.1.19-fix.01
2025-04-26 请求地址:aHR0cHM6Ly93d3cuZG91eWluLmNvbS91c2VyL01TNHdMakFCQUFBQV96azV6NkoyMG1YeGt0eHBnNkkzRVRKejlyMEs3d2Y2dU9EWlhvd2ttblZWRnB0dlBPMmMwN2J0WFotcVU4V3M 个人主页的视频数据 我们需要逆向这个接口,所以现在需要分析这个请求, 分析这几个数据包可以发现: 只有…...
【软考-架构】13.5、中间件
✨资料&文章更新✨ GitHub地址:https://github.com/tyronczt/system_architect 文章目录 中间件技术典型应用架构 中间件技术 在分布式系统环境中,出于操作系统和应用程序之间的软件。 主要的中间件有五类: 数据库访问中间件࿱…...
【Redis】基础2:作为缓存
文章目录 1. 一些概念2. 主动更新策略/缓存设计模式2.1 cache-aside pattern(lazy loading)2.2 read-through pattern(针对读操作)2.3 write-through pattern2.4 write behind pattern(write back pattern)…...
豆包,Kim,deepseek对比
以下是豆包、Kimi、DeepSeek的对比与应用: 对比 - 核心技术:DeepSeek-R1完全依赖强化学习驱动,跳过监督微调阶段。Kimi k1.5采用“轻量级SFT预热RL优化”的混合策略。 - 多模态支持:Kimi k1.5支持文本与图像的多模态联合推理。De…...
L2-005 集合相似度
L2-005 集合相似度 - 团体程序设计天梯赛-练习集 给定两个整数集合,它们的相似度定义为:Nc/Nt100%。其中Nc是两个集合都有的不相等整数的个数,Nt是两个集合一共有的不相等整数的个数。你的任务就是计算任意一对给定集合的相似度。 …...
33.状态压缩动态规划
一、算法内容 1.简介 若元素数量比较小(不超过 20 20 20)时,想要存储每个元素取或不取的状态时, 可以借助位运算将状态压缩。 需要借助状态压缩过程的动态规划就是状态压缩 DP(很多地方会简称为状压 DP)…...
WSL 中 nvidia-smi: command not found的解决办法
前言 在使用基于 Linux 的 Windows 子系统(WSL)时,当我们执行某些操作后,可能会遇到输入 nvidia-smi 命令却无法被系统识别的情况。 例如,在终端中输入nvidia-smi 后,系统返回提示 -bash: nvidia-smi: co…...
Linux 进程控制
文章目录 1. 进程创建1.1 fork1.2 写时拷贝 2.进程终止2.1 退出码2.2 进程如何返回退出码 3. 进程等待3.1 wait3.1.1 阻塞等待3.1.2 退出码与退出信号 3.2 waitpid 1. 进程创建 1.1 fork 我们可以使用fork函数来创建子进程,创建子进程后,父子进程之间就…...
使用MobaXterm远程登录Ubuntu系统:SSH服务配置教程
一、MobaXterm介绍 MobaXterm官网:https://mobaxterm.mobatek.net/ MobaXterm类似于Xshell,是一个工具箱,功能比Xshell多。 直接去官网下载安装就可以,本文主要介绍开启Ubuntu的ssh服务,并通过MobaXterm实现远程登录…...
直线模组精度测试的标准是什么?
直线模组的精度测试是确保其性能和稳定性的重要环节。那么,大家知道直线模组精度测试的标准是什么吗? 1、定位精度:以最大行程为基准长度,用从基准位置开始实际移动的距离与指令值之间的最大误差的绝对值来表示。一般来说…...
RK3568 Debian调试记录
文章目录 1、环境介绍2、前言3、debian目录结构3.1、脚本调用顺序 4、编译debian4.1、构建debian编译所需的环境4.2、编译debian4.3、打包 5、系统启动6、debian适配6.1、新增板级配置单6.2、USB6.3、Wi-Fi / BT6.4、屏幕旋转6.5、触摸旋转6.6、时钟 7、测试8、总结 1、环境介绍…...
来自 Bisheng 关于微调的内容总结
来自 Bisheng 关于微调的内容总结 0. 引言1. 关于微调的总结 0. 引言 这篇文章的内容(主要是截图)是来自 Bisheng 关于微调的内容总结,内容来源于 B 站Up主七吟覃_BISHENG负责人的视频,感兴趣的可以观看原视频。 1. 关于微调的总…...
Git 工具的安装
目录 Git 工具介绍 Git 工具安装 创建本地仓库 配置本地仓库 Git 版本控制基本原理 本期开始,我们将学习如何使用 Git 工具,实现多版本控制。 Git 工具介绍 要了解 Git 工具我们得先了解版本控制器的概念。 有这样一个场景,如下图所…...
任务管理系统,Java+Vue,含源码与文档,科学规划任务节点,全程督办保障项目落地提效
前言: 在当今快节奏的工作环境中,高效的任务管理是确保项目按时完成、资源合理分配及团队协作顺畅的关键。任务管理系统作为提升工作效率的重要工具,通过数字化手段帮助用户组织、跟踪和完成各类任务。本文将详细阐述任务管理系统的五大核心…...
JavaScript基础知识合集笔记1——数据类型
文章目录 JavaScript中的数据类型基本数据类型引用类型存储区别 JavaScript中的数据类型 基本数据类型和复杂类型 基本数据类型 基础类型包含六种:Number、Bigint、String、Boolean、Undefined、null、symbol Number(特殊值NaN,意为“不是数值”) c…...
2025.04.26-美团春招笔试题-第四题
📌 点击直达笔试专栏 👉《大厂笔试突围》 💻 春秋招笔试突围在线OJ 👉 笔试突围OJ 04. 图像智能降维处理 问题描述 卢小姐是一家图像处理公司的算法工程师,她正在开发一种高效的图像压缩算法。该算法基于奇异值分解(SVD)技术,通过保留图像矩阵中最重要的特征,在…...
测试基础笔记第十三天
提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 文章目录 一、流程控制语句-判断语句1.判断语句2.逻辑运算符3.elif多重判断4.if的嵌套5.if与逻辑运算符结合1.and2.or3.not 设计测试用例二、流程控制语句-循环语句1.while语…...
【昇腾】PaddleOCR转om推理
文章目录 1. 使用Paddle框架推理1.1 安装1.2 推理 2. paddle 转 ONNX3. 转om4. Ais_bench 命令推理5. Ais_bench 编写推理代码 概要: PyTorch官方提供了昇腾插件包,安装后虽然可以支持PytorchOCR和PaddlePaddle的推理任务,但性能通常低于GPU。…...
【数据融合】基于拓展卡尔曼滤波实现雷达与红外的异步融合附matlab代码
一、问题分析与技术难点 1. 传感器特性对比 传感器测量维度优势局限性噪声模型雷达距离 $ r $、方位角 $ \theta $、速度 $ v $测距精度高、全天候工作角度分辨率低、易受多径干扰高斯噪声,协方差矩阵 $ R_r \text{diag}(\sigma_r^2, \sigma_\theta^2, \sigma_v^…...
第一部分:网页的骨架 —— HTML
这目录 前言1. 初识 HTML:搭建地基和框架1.1 小例子: 创建一个最简单的 HTML 页面,包含 "Hello World"。1.2 练习 2. 常用文本与内容标签:填充墙体和房间2.1 小例子: 创建一个包含个人简介(使用标…...
RTMP 协议解析 1
介绍 📖 什么是 RTMP? RTMP协议(Real-Time Messaging Protocol,实时消息传输协议)是由Adobe公司(最初由Macromedia开发)设计的一种用于实时传输音频、视频和数据流的网络协议,主要…...
c++初始化数组
1.前言 话说数组是n年前的事了,我为啥现在又提到它呢?因为很多人不会初始化数组,所以今天我来教教大家 2.初始化数组 初始化数组就是定义数组,就像这样 int a[5]{0}; 这样是a[0]到a[5]全都等于0 如果要输出这个数组…...
支持Win和Mac的批量图片压缩方法
软件介绍 如果你的图片太大,传输或上传总是卡壳,那就需要一款好用的图片压缩工具了。今天推荐的这款工具支持Windows和Mac双系统,简直是图片压缩界的"变形金刚"! 图压(图片压缩双系统版) …...
autodl(linux)环境下载git-lfs等工具及使用
一、git-lfs工具下载 #初始化git.lfs命令 curl -s https://packagecloud.io/install/repositories/github/git-lfs/script.deb.sh | sudo bash sudo apt-get install git-lfs git lfs install 二、 huggingface-cli工具下载及使用 Linux设置huggingface的镜像: ex…...
云原生--核心组件-容器篇-3-Docker核心之-镜像
1、定义与作用 定义: Docker镜像是一个只读的模板,包含运行应用程序所需的所有内容,包括代码、依赖库、环境变量、配置文件等。简单来说,Docker镜像是一个轻量级、独立、可执行的软件包,它包含了运行某个软件所需的所有…...
Dify与n8n深度对比:AI应用开发与自动化工作流的双轨选择
Dify与n8n深度对比:AI应用开发与自动化工作流的双轨选择 在数字化转型加速的2025年,Dify和n8n作为两大主流工具,分别代表了AI应用开发与自动化工作流领域的顶尖解决方案。本文将从核心定位、功能特性、使用场景等维度展开对比,为…...
AI算法优化建筑形态与能耗管理 实现方案和技术架构
以下是基于AI算法优化建筑形态与能耗管理的实现方案与技术架构,结合行业实践与前沿技术趋势,分层次解析核心要素及实施路径: 一、技术架构设计 1. 数据采集与感知层 多源数据融合 传感器网络:部署温湿度、CO₂浓度、光照、人流密度等传感器,构建实时数据采集体系(如北京…...
【互联网架构解析】从物理层到应用层的全栈组成
目录 前言技术背景与价值当前技术痛点解决方案概述目标读者说明 一、技术原理剖析核心概念图解核心作用讲解关键技术模块说明技术选型对比 二、实战演示环境配置要求核心代码实现(Python网络请求)运行结果验证 三、性能对比测试方法论量化数据对比结果分…...
Redis和MQ的区别
redis是一个高性能的key-value数据库,支持消息推送功能,可以当做一个轻量级的队列服务器使用。 redis只是提供一个高性能的、原子操作内存键值队,具有高速访问能力,虽然可以做消息队列的存储,但不具备消息队列的任何功…...
多系统安装经验,移动硬盘,ubuntu grub修改/etc/fstab 移动硬盘需要改成nfts格式才能放steam游戏
笔记本一个系统,移动硬盘两个系统,当前系统sda4.jpg 移动硬盘需要再装一个linux会有boot/efi,启动的时候grub界面才能识别,单linux没有efi别的电脑识别不到 没efi甚至启动不了grub 按下f6.jpg 看看笔记本grub能不能识别得到移动硬…...
4.26学习——web刷题
把攻防世界的web做了20道左右,挑了几道学到东西的题目记录一下 攻防世界warmup 进到环境中读取源代码发先有个提示:source.php,进去看看 <?phphighlight_file(__FILE__);class emmm{public static function checkFile(&$page){$wh…...
Go 语言中的实时交互式编程环境
在 Go 语言中,确实有几种方法可以实现类似 Python REPL 的实时交互式编程体验,让你可以实时编写代码并查看输出,而无需每次都编译运行整个程序。 但是需要注意的是,由于 Go 是编译型语言,完全的实时交互体验不如解释型…...
动态规划求解leetcode300.最长递增子序列(LIS)详解
给你一个整数数组 nums ,找到其中最长严格递增子序列的长度。 子序列 是由数组派生而来的序列,删除(或不删除)数组中的元素而不改变其余元素的顺序。例如,[3,6,2,7] 是数组 [0,3,1,6,2,2,7] 的子序列。 示例 1&#…...
冯·诺依曼和哈佛架构两种架构的总线组成及核心特点
在计算机体系结构中,哈佛架构和冯诺依曼架构是两种不同的存储与总线设计范式,它们的总线组成和访问方式有显著差异。以下是两种架构的总线组成及核心特点的详细分析: 1. 冯诺依曼架构(Von Neumann Architecture) 核心…...
7.学习笔记-Maven进阶(P75-P89)-进度(p75-P80)
1.MAVEN-01-分模块开发的意义 (一)分模块开发意义 模块可以按功能划分,也可以按团队划分,所以把domain的方法抽取出来,进行共享,从而提高开发 的效率。 (1)分模块开发的意义…...
Java——令牌技术
目录 一、何为令牌 JWT令牌 介绍 JWT组成 二、JWT用于验证用户登录 三、JWT令牌生成和校验 简单用法 1.创建生成密钥的方法 2.接着添加过期时间,密钥,BASE64解码密钥的属性以及生成token的方法,合并上面生成密钥的方法,下面…...
【含文档+PPT+源码】基于Python校园跑腿管理系统设计与实现
项目介绍 本课程演示的是一款基于Python校园跑腿管理系统设计与实现,主要针对计算机相关专业的正在做毕设的学生与需要项目实战练习的 Python学习者。 1.包含:项目源码、项目文档、数据库脚本、软件工具等所有资料 2.带你从零开始部署运行本套系统 3.…...
Spring AI Alibaba - Milvus 初体验,实现知识库效果
先看效果 数据被存储在 milvus 中,包括原始数据和向量数据。 大模型使用向量化数据的回答: 环境准备 安装 milvus Milvus 是一款专为向量相似性搜索设计的高性能开源数据库。 本地测试环境可以直接 Standalone 模式安装,需要用到 docke…...
arcpy列表函数的应用
arcpy.ListDatasets() 该函数用于列出指定工作空间中的所有数据集(如要素数据集、栅格数据集等)。 语法: python arcpy.ListDatasets(wild_cardNone, feature_typeNone) • wild_card:用于筛选数据集名称的通配符。 • feat…...
上位机知识篇---时钟分频
文章目录 前言 前言 本文简单介绍了一下时钟分频。时钟分频(Clock Division)是数字电路设计中常见的技术,用于将高频时钟信号转换为较低频率的时钟信号,以满足不同模块的时序需求。它在处理器、FPGA、SoC(片上系统&am…...
Redis的两种持久化方式:RDB和AOF
Redis持久化概述 Redis作为内存数据库,数据存储在内存中。为了保证数据在服务器重启或宕机时不丢失,Redis提供了两种持久化方案: RDB(Redis Database):定时生成内存快照 AOF(Append Only File&…...