RocketMQ源码分析之事务消息分析
rocketMQ事务消息原理概述
RocketMQ采用两阶段提交(2PC)的思想来实现事务消息,当事务消息失败或者超时,同时采用补偿的方式处理这个问题。这两个阶段分别为正常事务消息的发送与提交以及事务消息的补偿。我们看看官方文档给的事务消息的流程图:
1、事务消息的发送与提交
MQ Producer 将事务消息发送给MQ Server(Broker 服务器),这时的消息称为半消息,半消息是不能被消费者消费的。当MQ Server成功接收到MQ Producer发送的半消息,就会给MQ Producer返回ack确认消息,告诉MQ Producer半消息是否成功接收到。如果半消息发送成功,就可以执行本地事务了,这个本地事务一般是数据库事务,否则就不执行本地事务。本地事务执行以后,MQ Server根据本地事务的执行状态执行半消息的提交或者回滚,当本地事务执行成功时,半消息被提交变成正常的消息,能够被消费者消息,当本地事务执行失败时,半消息就会被删除。
2、事务消息的补偿
当本地事务执行以后,MQ Producer会将本地事务的执行状态告诉MQ Server,即上图4过程,但是如果这个过程的请求如果失败或者超时了,MQ Server并不知道本地事务的状态,所以MQ Server会发送消息告诉MQ Procuer回查一次本地事务的状态,MQ Procuer回查本地事务的状态以后告知MQ Server,从而决定半消息是提交还是回滚。这个回查的逻辑为业务方实现,告知本地事务的执行结果。
这就是RocketMQ的事务消息原理,通过两阶段提交实现,采用补偿的方式达到数据的最终一致性。
这里有一个点需要注意,就是半消息是如何做到对消费者不可见的?
如果让我们来做,我们可能会将消息先放在一个消息者不能消费的地方,消息者是根据topic消费的,只要将消息放到消费者没有订阅的topic的队列中,这样消费者就不能消费消息了。当本地事务成功执行以后,半消息要变成可以被消费者消费的消息,那么将消息放回原本topic的消费队列就可以了。
其实RocketMQ让半消息对消费者不可见的做法也是这样的,如果是半消息,将消息的topic替换成RMQ_SYS_TRANS_HALF_TOPIC,由于消费者未订阅该主题,所以就实现了半消息对消费者不可见。
RocketMQ事务消息使用
讲完了RocketMQ的原理,我们接下看看RocketMQ事务消息的使用:
public class TransactionProducer {public static void main(String[] args) throws MQClientException, InterruptedException {//事务的监听器TransactionListener transactionListener = new TransactionListenerImpl();//创建事务生产者TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");//回查线程ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {@Overridepublic Thread newThread(Runnable r) {Thread thread = new Thread(r);thread.setName("client-transaction-msg-check-thread");return thread;}});//设置回查线程以及事务监听器producer.setExecutorService(executorService);producer.setTransactionListener(transactionListener);producer.start();String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};//发送消息for (int i = 0; i < 10; i++) {try {Message msg =new Message("TopicTest1234", tags[i % tags.length], "KEY" + i,("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));//发送事务消息SendResult sendResult = producer.sendMessageInTransaction(msg, null);System.out.printf("%s%n", sendResult);Thread.sleep(10);} catch (MQClientException | UnsupportedEncodingException e) {e.printStackTrace();}}for (int i = 0; i < 100000; i++) {Thread.sleep(1000);}producer.shutdown();}
}
首先创建事务监听器TransactionListenerImpl、事务生产者TransactionMQProducer以及回查线程,然后设置事务监听器以及回查线程、启动生产者,最后使用sendMessageInTransaction方法发送事务消息了。这里设置事务监听器,事务监听器实现了TransactionListener接口:
public interface TransactionListener {/*** 当发送事务半消息成功,该方法将会执行本地事务*/LocalTransactionState executeLocalTransaction(final Message msg, final Object arg);/*** 当没有收到半消息的响应,broker将会发送回查消息检测事务的状态,以及该方法将会获取本地事务的状态*/LocalTransactionState checkLocalTransaction(final MessageExt msg);
}
TransactionListener有executeLocalTransaction和checkLocalTransaction方法,executeLocalTransaction方法是当发送事务半消息成功,该方法方法用来执行本地事务,checkLocalTransaction当没有收到发送半消息的响应,broker将会通过该方法回查本地事务的状态,从而决定半消息是提交还是回滚。
RocketMQ事务消息源码分析
分析了RocketMQ的事务消息原理以及RoketMQ事务消息的使用,接下来深入源码分析下面几个问题:
- 半消息如何做到对消费者不可见
- 本地事务什么时候执行,即executeLocalTransaction方法什么时候执行
- 半消息的提交以及回滚
- Broker什么时候触发checkLocalTransaction回查方法
半消息如何做到对消费者不可见
RocketMQ事务消息采用的是同步的发送方法发送事务消息的,由于消息的发送已经在RocketMQ源码之生产者发送消息分析中已经讲过,这里就不讲了。当Broker接收到半消息以后,将会解析半消息,方法如下:
//代码位置: org.apache.rocketmq.broker.transaction.queue.TransactionalMessageBridge#parseHalfMessageInner
private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {//REAL_TOPIC 真实的topicMessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());//REAL_QID 真实的QueueIdMessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,String.valueOf(msgInner.getQueueId()));msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));//重新设置topic和QueueIdmsgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());msgInner.setQueueId(0);msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));return msgInner;
}
parseHalfMessageInner将消息的真实topic和真实的QueueId保存在Property属性中,然后重新设置topic和QueueId,topic设置为RMQ_SYS_TRANS_HALF_TOPIC,QueueId设置为0,buildHalfTopic方法就是获取RMQ_SYS_TRANS_HALF_TOPIC,如下:
//代码位置:org.apache.rocketmq.broker.transaction.queue.TransactionalMessageUtil#buildHalfTopic
public static String buildHalfTopic() {return MixAll.RMQ_SYS_TRANS_HALF_TOPIC;
}
上述将半消息的topic替换为RMQ_SYS_TRANS_HALF_TOPIC,因为消费者没有订阅RMQ_SYS_TRANS_HALF_TOPIC,所以半消息对消费者不可见,这就是半消息对消费者不可见的源码分析。
本地事务什么时候执行
当半消息发送以后,就等待消息发送的结果,然后就调用本地事务执行方法executeLocalTransaction方法:
//代码位置:org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendMessageInTransaction
public TransactionSendResult sendMessageInTransaction(final Message msg,//省略代码SendResult sendResult = null;//设置事务标致MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());try {//发送消息sendResult = this.send(msg);} catch (Exception e) {throw new MQClientException("send message Exception", e);}LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;Throwable localException = null;//根据发送状态 switch (sendResult.getSendStatus()) {case SEND_OK: {try {//省略代码if (null != localTransactionExecuter) {localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);} else if (transactionListener != null) {log.debug("Used new transaction API");//执行executeLocalTransaction方法localTransactionState = transactionListener.executeLocalTransaction(msg, arg);}if (null == localTransactionState) {localTransactionState = LocalTransactionState.UNKNOW;}if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {log.info("executeLocalTransactionBranch return {}", localTransactionState);log.info(msg.toString());}} catch (Throwable e) {log.info("executeLocalTransactionBranch exception", e);log.info(msg.toString());localException = e;}}break;case FLUSH_DISK_TIMEOUT:case FLUSH_SLAVE_TIMEOUT:case SLAVE_NOT_AVAILABLE://其他状态则回滚消息localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;break;default:break;}try {//本地事务执行以后,告知broker服务器,半消息是提交还是回滚this.endTransaction(sendResult, localTransactionState, localException);} catch (Exception e) {log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);}//省略代码
}
sendMessageInTransaction方法发送事务消息,当发送以后会得到一个发送结果sendResult,根据发送结果的状态决定是否执行本地事务,当发送状态为SEND_OK时,执行本地事务方法executeLocalTransaction,并得到本地事务执行状态localTransactionState;当发送结果的状态是其他状态时,localTransactionState设置回滚消息,最后调用endTransaction方法通知Broker服务器,告知本地事务的执行状态,Broker服务器根据本地事务执行状态决定半消息是提交还是回滚。
半消息的提交以及回滚
当本地事务执行完成以后,就会告知Broker服务器本地事务执行的状态,调用的方法就是endTransaction,如下:
//代码位置:org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#endTransaction
public void endTransaction(final SendResult sendResult,final LocalTransactionState localTransactionState,final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {final MessageId id;//设置事务idif (sendResult.getOffsetMsgId() != null) {id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId());} else {id = MessageDecoder.decodeMessageId(sendResult.getMsgId());}String transactionId = sendResult.getTransactionId();//根据broker名字查找broker地址final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());//事务结束请求头EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();requestHeader.setTransactionId(transactionId);requestHeader.setCommitLogOffset(id.getOffset());//设置提交或者回滚标志switch (localTransactionState) {case COMMIT_MESSAGE: //提交消息requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);break;case ROLLBACK_MESSAGE: //回滚消息requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);break;case UNKNOW: //不知道requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);break;default:break;}requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());requestHeader.setMsgId(sendResult.getMsgId());String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null;//调用单向的发送方法发送事务结束请求this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark,this.defaultMQProducer.getSendMsgTimeout());
}
endTransaction方法首先根据broker名字查找broker地址,然后再进行封装事务结束请求头,设置提交还是回滚的标志,然后调用单向的发送方法发送事务结束请求。当Broker服务器接收事务结束请求,将会调用processRequest处理请求:
//代码位置:org.apache.rocketmq.broker.processor.EndTransactionProcessor#processRequest
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throwsRemotingCommandException {final RemotingCommand response = RemotingCommand.createResponseCommand(null);//解析事务结束请求头final EndTransactionRequestHeader requestHeader =(EndTransactionRequestHeader)request.decodeCommandCustomHeader(EndTransactionRequestHeader.class);LOGGER.debug("Transaction request:{}", requestHeader);//如果是broker角色是SLAVE,则返回禁止结束事务的响应if (BrokerRole.SLAVE == brokerController.getMessageStoreConfig().getBrokerRole()) {response.setCode(ResponseCode.SLAVE_NOT_AVAILABLE);LOGGER.warn("Message store is slave mode, so end transaction is forbidden. ");return response;}//如果是事务回查if (requestHeader.getFromTransactionCheck()) {switch (requestHeader.getCommitOrRollback()) {//不是事务类型,打印告警信息,返回nullcase MessageSysFlag.TRANSACTION_NOT_TYPE: {LOGGER.warn("Check producer[{}] transaction state, but it's pending status."+ "RequestHeader: {} Remark: {}",RemotingHelper.parseChannelRemoteAddr(ctx.channel()),requestHeader.toString(),request.getRemark());return null;}//事务提交case MessageSysFlag.TRANSACTION_COMMIT_TYPE: {LOGGER.warn("Check producer[{}] transaction state, the producer commit the message."+ "RequestHeader: {} Remark: {}",RemotingHelper.parseChannelRemoteAddr(ctx.channel()),requestHeader.toString(),request.getRemark());break;}//事务回滚case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: {LOGGER.warn("Check producer[{}] transaction state, the producer rollback the message."+ "RequestHeader: {} Remark: {}",RemotingHelper.parseChannelRemoteAddr(ctx.channel()),requestHeader.toString(),request.getRemark());break;}default:return null;}} else {switch (requestHeader.getCommitOrRollback()) {//不是事务类型case MessageSysFlag.TRANSACTION_NOT_TYPE: {LOGGER.warn("The producer[{}] end transaction in sending message, and it's pending status."+ "RequestHeader: {} Remark: {}",RemotingHelper.parseChannelRemoteAddr(ctx.channel()),requestHeader.toString(),request.getRemark());return null;}//事务回滚case MessageSysFlag.TRANSACTION_COMMIT_TYPE: {break;}//事务回滚case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: {LOGGER.warn("The producer[{}] end transaction in sending message, rollback the message."+ "RequestHeader: {} Remark: {}",RemotingHelper.parseChannelRemoteAddr(ctx.channel()),requestHeader.toString(),request.getRemark());break;}default:return null;}}//省略代码:做提交或者回滚操作
}
processRequest方法首先解析事务结束请求头,如果该broker角色是SLAVE,则返回禁止结束事务的响应。然后判断事务结束请求头中的回查标志,正常的事务结束请求的回查标志为false,根据事务结束请求头的提交回滚标志做不同的逻辑,这里提交和回滚其实并没有什么逻辑,打印日志或者什么都不做就退出switch,接下来则需要继续走提交和回滚的操作,代码如下:
//代码位置:org.apache.rocketmq.broker.processor.EndTransactionProcessor#processRequest
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throwsRemotingCommandException {//代码省略//操作结果OperationResult result = new OperationResult();if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) {//从commitLog中读出half消息result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader);//提交成功if (result.getResponseCode() == ResponseCode.SUCCESS) {//检查半消息是否合法RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);if (res.getCode() == ResponseCode.SUCCESS) {//将半消息转成真实的原topic消息MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage());msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback()));msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp());MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_TRANSACTION_PREPARED);//将消息写入commitLogRemotingCommand sendResult = sendFinalMessage(msgInner);if (sendResult.getCode() == ResponseCode.SUCCESS) {//real消息持久化以后,删除半消息this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());}return sendResult;}return res;}} else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) {//回滚消息result = this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader);if (result.getResponseCode() == ResponseCode.SUCCESS) {RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);//回滚成功,删除半消息if (res.getCode() == ResponseCode.SUCCESS) {this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());}return res;}}response.setCode(result.getResponseCode());response.setRemark(result.getResponseRemark());return response;
}
如果是事务类型是提交类型,则提交消息,消息提交成功以后,则检查半消息是否合法,如果合法,则把半消息还原为原来的消息,即将topic和queueId替换为真实的topic和queueId,最后将消息落盘持久化保存起来,持久化成功以后还需要将原来的半消息删除掉。因为消息的topic已经被替换成真实的topic,则消费者就可以消费此消息了;如果是事务回滚类型,则回滚消息,回滚消息成功以后删除半消息。endMessageTransaction方法里面替换了真实的topic和queueId,代码如下:
//代码位置:org.apache.rocketmq.broker.processor.EndTransactionProcessor#endMessageTransaction
private MessageExtBrokerInner endMessageTransaction(MessageExt msgExt) {MessageExtBrokerInner msgInner = new MessageExtBrokerInner();//替换为消息的真实topic和queueIdmsgInner.setTopic(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_TOPIC));msgInner.setQueueId(Integer.parseInt(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_QUEUE_ID)));//代码省略}
Broker什么时候触发checkLocalTransaction回查方法
上述的分析是正常的事务消息的提交以及回滚,当Broker不能确定本地事务执行状态时,需要依靠回查确定本地事务状态确定消息提交还是回滚。Broker在启动的时候,会创建并启动事务回查服务TransactionalMessageCheckService线程,TransactionalMessageCheckService服务会每分钟进行回查。代码如下:
//代码位置:org.apache.rocketmq.broker.transaction.TransactionalMessageCheckService#run
public void run() {log.info("Start transaction check service thread!");//事务检测间隔long checkInterval = brokerController.getBrokerConfig().getTransactionCheckInterval();while (!this.isStopped()) {this.waitForRunning(checkInterval);}log.info("End transaction check service thread!");
}
waitForRunning方法又调用onWaitEnd方法进行回查操作,代码如下:
//代码位置:org.apache.rocketmq.broker.transaction.TransactionalMessageCheckService#onWaitEnd
protected void onWaitEnd() {//超时时间6秒long timeout = brokerController.getBrokerConfig().getTransactionTimeOut();//最大回查次数15int checkMax = brokerController.getBrokerConfig().getTransactionCheckMax();long begin = System.currentTimeMillis();log.info("Begin to check prepare message, begin time:{}", begin);this.brokerController.getTransactionalMessageService().check(timeout, checkMax, this.brokerController.getTransactionalMessageCheckListener());log.info("End to check prepare message, consumed time:{}", System.currentTimeMillis() - begin);
}
从onWaitEnd方法可知,回查的超时时间为6秒,最大的回查次数为15秒,如果15次回查还是无法得知事务状态,rocketmq默认回滚该消息。然后调用check方法回查,check方法最终会调用AbstractTransactionalMessageCheckListener的sendCheckMessage方法给生产者发送回查本地事务执行状态的方法,如下代码所示:
//代码位置:org.apache.rocketmq.broker.transaction.AbstractTransactionalMessageCheckListener#sendCheckMessage
public void sendCheckMessage(MessageExt msgExt) throws Exception {//回查事务状态请求头CheckTransactionStateRequestHeader checkTransactionStateRequestHeader = new CheckTransactionStateRequestHeader();checkTransactionStateRequestHeader.setCommitLogOffset(msgExt.getCommitLogOffset());checkTransactionStateRequestHeader.setOffsetMsgId(msgExt.getMsgId());checkTransactionStateRequestHeader.setMsgId(msgExt.getUserProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));checkTransactionStateRequestHeader.setTransactionId(checkTransactionStateRequestHeader.getMsgId());checkTransactionStateRequestHeader.setTranStateTableOffset(msgExt.getQueueOffset());//替换成真实的topic和queueIdmsgExt.setTopic(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_TOPIC));//REAL_QIDmsgExt.setQueueId(Integer.parseInt(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_QUEUE_ID)));msgExt.setStoreSize(0);//PGROUPString groupId = msgExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP);//获取可用的连接Channel channel = brokerController.getProducerManager().getAvaliableChannel(groupId);if (channel != null) {//发送回查请求brokerController.getBroker2Client().checkProducerTransactionState(groupId, channel, checkTransactionStateRequestHeader, msgExt);} else {LOGGER.warn("Check transaction failed, channel is null. groupId={}", groupId);}
}
sendCheckMessage方法首先构建回查事务状态请求头,将消息的topic和queueId替换成真实的topic和queueId,最后调用checkProducerTransactionState给生产者发送回查请求,请求码为CHECK_TRANSACTION_STATE(39)。这就是Broker给生产者发送回查请求的过程分析,接下来将分析当生产者接受到回查请求如何处理。
//代码位置:org.apache.rocketmq.client.impl.ClientRemotingProcessor#checkTransactionState
//检查本地事务状态
public RemotingCommand checkTransactionState(ChannelHandlerContext ctx,RemotingCommand request) throws RemotingCommandException {//检查事务状态请求头final CheckTransactionStateRequestHeader requestHeader =(CheckTransactionStateRequestHeader) request.decodeCommandCustomHeader(CheckTransactionStateRequestHeader.class);final ByteBuffer byteBuffer = ByteBuffer.wrap(request.getBody());//解码消息体final MessageExt messageExt = MessageDecoder.decode(byteBuffer);if (messageExt != null) {if (StringUtils.isNotEmpty(this.mqClientFactory.getClientConfig().getNamespace())) {messageExt.setTopic(NamespaceUtil.withoutNamespace(messageExt.getTopic(), this.mqClientFactory.getClientConfig().getNamespace()));}//事务idString transactionId = messageExt.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);if (null != transactionId && !"".equals(transactionId)) {messageExt.setTransactionId(transactionId);}final String group = messageExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP);if (group != null) {//通过group选择内部生产者MQProducerInner producer = this.mqClientFactory.selectProducer(group);if (producer != null) {//连接地址final String addr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());//会查本地事务状态producer.checkTransactionState(addr, messageExt, requestHeader);} else {log.debug("checkTransactionState, pick producer by group[{}] failed", group);}} else {log.warn("checkTransactionState, pick producer group failed");}} else {log.warn("checkTransactionState, decode message failed");}return null;
}
生产者收到Broker的回查本地事务的请求,会将请求交给上述的checkTransactionState方法处理。当接收到请求时,首先解析本地事务请求头,通过生产者组group找到生产者producer,然后执行生产者producer的checkTransactionState方法回查本地事务的状态,生产者producer的checkTransactionState方法如下:
//代码位置:org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#checkTransactionState
public void checkTransactionState(final String addr, final MessageExt msg,final CheckTransactionStateRequestHeader header) {Runnable request = //代码省略this.checkExecutor.submit(request);}
checkTransactionState方法首先创建Runnable,上述将创建Runnable的代码省略了,在下面将会具体讲解这部分代码。创建好Runnable以后,就交给线程池处理。接下来看看创建Runnable的代码:
Runnable request = new Runnable() {private final String brokerAddr = addr;private final MessageExt message = msg;private final CheckTransactionStateRequestHeader checkRequestHeader = header;private final String group = DefaultMQProducerImpl.this.defaultMQProducer.getProducerGroup();@Overridepublic void run() {//获取事务回查监听接口TransactionCheckListener transactionCheckListener = DefaultMQProducerImpl.this.checkListener();TransactionListener transactionListener = getCheckListener();//如果事务回查监听接口不为空if (transactionCheckListener != null || transactionListener != null) {LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;Throwable exception = null;try {if (transactionCheckListener != null) {//回查本地事务的状态localTransactionState = transactionCheckListener.checkLocalTransactionState(message);} else if (transactionListener != null) {log.debug("Used new check API in transaction message");localTransactionState = transactionListener.checkLocalTransaction(message);} else {log.warn("CheckTransactionState, pick transactionListener by group[{}] failed", group);}} catch (Throwable e) {log.error("Broker call checkTransactionState, but checkLocalTransactionState exception", e);exception = e;}this.processTransactionState(localTransactionState,group,exception);} else {log.warn("CheckTransactionState, pick transactionCheckListener by group[{}] failed", group);}}//处理本地事务状态,给Broker发送本地事务状态private void processTransactionState(final LocalTransactionState localTransactionState,final String producerGroup,final Throwable exception) {//创建结束事务请求头final EndTransactionRequestHeader thisHeader = new EndTransactionRequestHeader();thisHeader.setCommitLogOffset(checkRequestHeader.getCommitLogOffset());thisHeader.setProducerGroup(producerGroup);thisHeader.setTranStateTableOffset(checkRequestHeader.getTranStateTableOffset());thisHeader.setFromTransactionCheck(true);String uniqueKey = message.getProperties().get(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);if (uniqueKey == null) {uniqueKey = message.getMsgId();}thisHeader.setMsgId(uniqueKey);thisHeader.setTransactionId(checkRequestHeader.getTransactionId());//根据本地不同的事务状态进行设置事务的类型switch (localTransactionState) {case COMMIT_MESSAGE:thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);break;case ROLLBACK_MESSAGE:thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);log.warn("when broker check, client rollback this transaction, {}", thisHeader);break;case UNKNOW:thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);log.warn("when broker check, client does not know this transaction state, {}", thisHeader);break;default:break;}String remark = null;if (exception != null) {remark = "checkLocalTransactionState Exception: " + RemotingHelper.exceptionSimpleDesc(exception);}try {DefaultMQProducerImpl.this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, thisHeader, remark,3000);} catch (Exception e) {log.error("endTransactionOneway exception", e);}}
};
Runnable类中有两个方法,一个是run,这个方法就是具体执行回查本地事务状态,processTransactionState方法就是将回查到本地事务的执行状态告知Broker服务器,run方法会调用processTransactionState方法。
run方法首先会获取本地事务回查接口TransactionCheckListener和TransactionListener,TransactionCheckListener是比较老的回查本地事务接口,已经废弃了,TransactionListener是比较新的接口。当TransactionCheckListener接口不等于null时,则调用TransactionCheckListener接口的checkLocalTransactionState方法回查本地事务状态,当TransactionListener不等于null时,则调用TransactionListener接口的checkLocalTransaction方法回查本地事务状态。
回查完本地事务状态以后,将本地事务状态作为参数传给processTransactionState方法,processTransactionState方法创建事务结束请求头,根据本地事务状态设置事务类型,然后调用endTransactionOneway方法告知Broker服务器,Broker服务器接收到事务结束请求,就交给上述processRequest方法处理,processRequest处理事务结束请求的逻辑已经分析过,这里就不分析了。
相关文章:
RocketMQ源码分析之事务消息分析
rocketMQ事务消息原理概述 RocketMQ采用两阶段提交(2PC)的思想来实现事务消息,当事务消息失败或者超时,同时采用补偿的方式处理这个问题。这两个阶段分别为正常事务消息的发送与提交以及事务消息的补偿。我们看看官方文档给的事务…...
2025.1.19机器学习笔记:PINN文献精读
第三十周周报 一、文献阅读题目信息摘要Abstract创新点物理背景网络框架实验实验一:直道稳定流条件实验二:环状网络中的非稳定流条件 结论缺点及展望 二、代码实践总结 一、文献阅读 题目信息 题目:《Enhanced physics-informed neural net…...
大文件上传服务-后端V1V2
文章目录 大文件上传概述:minio分布式文件存储使用的一些技术校验MD5的逻辑 uploadV1 版本 1uploadv2 版本 2 大文件上传概述: 之前项目做了一个文件上传的功能,最近看到有面试会具体的问这个上传功能的细节,把之前做的项目拿过来总结一下,自己写的一个…...
docker 基础语法学习,K8s基础语法学习,零基础学习
下面是关于Docker和Kubernetes的基础语法学习资料,包括一些关键概念和示例代码。 Docker 基础语法 1. 安装 Docker 首先,你需要安装 Docker。以下是不同操作系统上的安装指南: Windows/Mac: 下载并安装 Docker Desktop。 Linux: 根据你的…...
【网络协议】RFC3164-The BSD syslog Protocol
引言 Syslog常被称为系统日志或系统记录,是一种标准化的协议,用于网络设备、服务器和应用程序向中央Syslog服务器发送日志消息。互联网工程任务组(IETF)发布的RFC 3164,专门定义了BSD Syslog协议的规范和实现方式。通…...
MongoDB深度解析与实践案例
MongoDB深度解析与实践案例 在当今大数据与云计算盛行的时代,NoSQL数据库以其灵活的数据模型、水平扩展能力和高性能,成为处理海量数据的重要工具之一。MongoDB,作为NoSQL数据库的杰出代表,凭借其面向文档的存储结构、强大的查询语言以及丰富的生态系统,赢得了众多开发者…...
C语言从零到精通:常用运算符完全指南,掌握算术、逻辑与关系运算
系列文章目录 01-C语言从零到精通:常用运算符完全指南,掌握算术、逻辑与关系运算 文章目录 系列文章目录前言一、C语言的起源与应用领域1.1 C语言的起源1.2 C语言的应用领域1.2.1 操作系统开发1.2.2 嵌入式系统1.2.3 编译器开发1.2.4 游戏开发与图形处理…...
ArkUI概述
鸿蒙操作系统(HarmonyOS)是华为公司推出的一款面向未来、面向全场景的分布式操作系统。它不仅能够支持各种不同的设备,从手机、平板到智能穿戴和智能家居产品,而且为开发者提供了一套统一的开发环境和工具链。对于想要深入鸿蒙开发…...
浅谈计算机网络03 | 现代网络组成
现代网络组成 一 、网络生态体系1.1网络生态系统的多元主体1.2 网络接入设施的多样类型 二、现代网络的典型体系结构解析三、高速网络技术3.1 以太网技术3.2 Wi-Fi技术的深度剖析3.2.1 应用场景的多元覆盖3.2.2 标准升级与性能提升 3.3 4G/5G蜂窝网的技术演进3.3.1 蜂窝技术的代…...
在线图片马赛克处理工具
在线图片马赛克处理工具,无需登录,无需费用,用完就走。 包括中文和英文版本 官网地址: https://mosaic.openai2025.com...
文件上传 分片上传
分片上传则是将一个大文件分割成多个小块分别上传,最后再由服务器合并成完整的文件。这种做法的好处是可以并行处理多个小文件,提高上传效率;同时,如果某一部分上传失败,只需要重传这一部分,不影响其他部分…...
网络安全---CMS指纹信息实战
CMS简介 CMS(Content Management System)指的是内容管理系统,如WordPress、Joomla等。CMS系统非常常见,几乎所有大型网站都使用CMS来管理其网站的内容。由于常见CMS的漏洞较多,因此黑客将不断尝试利用这些漏洞攻击CMS…...
Ubuntu 24.04 LTS 系统语言英文改中文
Ubuntu 24.04 LTS 修改软件源 Ubuntu 更改软件源 修改语言 无需输入命令,为Ubuntu 24.04系统添加中文智能拼音输入法 在 setting 的 system 中按下图操作 点击“Apply Changes”。需要管理员密码,安装完成后,退出登录,重新登…...
信创在医疗领域的应用:开启医疗信息化新时代
信创在医疗领域的应用:开启医疗信息化新时代 信创在医疗领域的应用:开启医疗信息化新时代信创医疗自助一体机杭医基于信创底座的健康医疗大数据平台厦门大学附属成功医院基于海光CPU的信创改造中科可控基于海光CPU的智慧医疗解决方案 信创在医疗领域的应…...
力扣-数组-303 区域和检索-数组不可变
解析 题目有点费解,大致应该是给出区间内的和,然后维护一个前缀和,为了防止越界,先填一个0进去,在构建的时候也要注意此时构建的dp的下标是i1,所以加的前缀和的下标是i。 代码 class NumArray { public:…...
【CSS】---- CSS 实现超过固定高度后出现展开折叠按钮
1. 实现效果 2. 实现方法 使用 JS 获取盒子的高度,来添加对应的按钮和样式;使用 CSS 的浮动效果,参考CSS 实现超过固定高度后出现展开折叠按钮;使用容器查询 – container 语法;使用 clamp 函数进行样式判断。 3. 优…...
二十项零信任相关的前沿和趋势性技术-MASQUE
影响力评级:较低 市场渗透率:不到目标受众的 1% 成熟度:孵化 定义:基于QUIC加密的多路复用应用程序底层 (MASQUE) 是一个 IETF 标准草案,可实现流量的安全传输和代理。 MASQUE全称为:Multiplexed Appli…...
【Docker】使用Dev Container进行开发
工作区 Dev Container 设置 新建一个文件夹 ./devcontainer 然后下面放 devcontainer.json 然后安装 vscode dev container 插件,然后 CtrlShiftP 启动 Container {"name": "PyTorch-Julia Development","image": "x66ccff/p…...
搭建一个基于Spring Boot的数码分享网站
搭建一个基于Spring Boot的数码分享网站可以涵盖多个功能模块,例如用户管理、数码产品分享、评论、点赞、收藏、搜索等。以下是一个简化的步骤指南,帮助你快速搭建一个基础的数码分享平台。 — 1. 项目初始化 使用 Spring Initializr 生成一个Spring …...
在线json格式化工具
在线json格式化工具,包括中文和英文版本,无需登录,无需费用,用完就走。 官网地址: https://json.openai2025.com 效果如下:...
leetcode300.最长递增子序列
给你一个整数数组 nums ,找到其中最长严格递增子序列的长度。 子序列 是由数组派生而来的序列,删除(或不删除)数组中的元素而不改变其余元素的顺序。例如,[3,6,2,7] 是数组 [0,3,1,6,2,2,7] 的子序列。 示例 1&…...
【个人学习记录】软件开发生命周期(SDLC)是什么?
软件开发生命周期(Software Development Life Cycle,SDLC)是一个用于规划、创建、测试和部署信息系统的结构化过程。它包含以下主要阶段: 需求分析(Requirements Analysis) 收集并分析用户需求定义系统目标…...
CTE与临时表:优劣势对比及使用场景分析
在数据库开发中,尤其是在复杂查询和优化中,**公共表表达式(CTE)和临时表(Temporary Table)**是两种常用的工具。尽管它们的功能有些相似,都是为了处理中间结果集,但它们的优劣势和使…...
Kali环境变量技巧(The Environment Variable Technique Used by Kali
Kali环境变量技巧 朋友们好,我们今天继续更新《黑客视角下的Kali Linux的基础与网络管理》中的管理用户环境变量。为了充分利用我们的黑客操作系统Kali Linux,我们需要理解和善于使用环境变量,这样会使我们的工具更具便利,甚至具…...
Ubuntu 24.04 LTS linux 文件权限
Ubuntu 24.04 LTS 文件权限 读权限 :允许查看文件的内容。写权限 (w):允许修改文件的内容。执行权限 (x):允许执行文件(对于目录来说,是进入目录的权限)。 文件权限通常与三类用户相关联: 文…...
多个版本JAVA切换(学习笔记)
多个版本JAVA切换 很多时候,我们电脑上会安装多个版本的java版本,java8,java11,java17等等,这时候如果想要切换java的版本,可以按照以下方式进行 1.检查当前版本的JAVA 同时按下 win r 可以调出运行工具…...
AI刷题-最小替换子串长度、Bytedance Tree 问题
目录 一、最小替换子串长度 问题描述 输入格式 输出格式 输入样例 1 输出样例 1 输入样例 2 输出样例 2 解题思路: 问题理解 数据结构选择 算法步骤 最终代码: 运行结果: 二、Bytedance Tree 问题 问题描述 输入格式 输…...
Android 项目依赖冲突问题:Duplicate class found in modules
问题描述与处理处理 1、问题描述 plugins {id com.android.application }android {compileSdk 34defaultConfig {applicationId "com.my.dialog"minSdk 21targetSdk 34versionCode 1versionName "1.0"testInstrumentationRunner "androidx.test.run…...
Webpack简述
一、为什么要构建工具 人类喜欢书写的代码以及开发方式计算机不喜欢,构建工具的作用就是让人类舒舒服服写自己喜欢的代码,然后一打包生成计算机喜欢的代码 第一个webpack自身仅仅是将我们引入的模块打包成一个文件(编译import)&am…...
ARM GCC编译器
ARM GCC编译器(GNU Compiler Collection for ARM)是GNU项目的一部分,专门用于编译针对ARM架构的代码。它是一个开源的工具链,支持多种编程语言,包括C、C和汇编语言。以下是关于ARM GCC编译器的详细解释及其作用&#x…...
CSS3 3D 转换介绍
CSS3 中的 3D 转换提供了一种在二维屏幕上呈现三维效果的方式,主要包括translate3d、rotate3d、scale3d等转换函数,下面来详细介绍: 1. 3D 转换的基本概念 坐标系 在 CSS3 的 3D 空间中,使用的是右手坐标系。X 轴是水平方向&…...
关于 Cursor 的一些学习记录
文章目录 1. 写在最前面2. Prompt Design2.1 Priompt v0.1:提示设计库的首次尝试2.2 注意事项 3. 了解 Cursor 的 AI 功能3.1 问题3.2 答案 4. cursor 免费功能体验5. 写在最后面6. 参考资料 1. 写在最前面 本文整理了一些学习 Cursor 过程中读到的或者发现的感兴趣…...
3. 后端验证前端Token
书接上回,后端将token返回给前端,前端存入cookie,每次前端给后端发送请求,后端是如何验证的。 若依是用过滤器来实现对请求的验证,过滤器的简单理解是每次发送请求的时候先发送给过滤器执行逻辑判断以及处理࿰…...
【LLM】Openai-o1及o1类复现方法
note 可以从更为本质的方案出发,通过分析强化学习的方法,看看如何实现o1,但其中的核心就是在于,如何有效地初始化策略、设计奖励函数、实现高效的搜索算法以及利用强化学习进行学习和优化。 文章目录 note一、Imitate, Explore, …...
与“神”对话:Swift 语言在 2025 中的云霓之望
0. 引子 夜深人静,是一片极度沉醉的黑,这便于我与深沉的 macbook 悄悄隐秘于其中。一股异香袭来,恍惚着,撸码中身心极度疲惫、头脑昏沉的我仿佛感觉到了一束淡淡的微光轻洒在窗边。 我的对面若隐若现逐渐浮现出一个熟悉的身影。他…...
设计模式-单例模式
定义 保证一个类仅有一个实例,并提供一个访问它的全局访问点。 类图 类型 饿汉式 线程安全,调用效率高,但是不能延迟加载。 public class HungrySingleton {private static final HungrySingleton instancenew HungrySingleton();private …...
C#枚举类型携带额外数据的方法
Java里面的枚举类型可以定义很多属性,携带各种数据,然而C#里面的枚举类型只能代表数字,不能在枚举类型里面定义各种属性,导致某些应用场景使用起来不方便,但是可以利用C#里面的Attribute来解决这个问题。 例如…...
跨境电商使用云手机用来做什么呢?
随着跨境电商的发展,越来越多的卖家开始尝试使用云手机来协助他们的业务,这是因为云手机具有许多优势。那么,具体来说,跨境电商使用云手机可以做哪些事情呢? (一)实现多账号登录和管理 跨境电商…...
RabbitMQ-消息可靠性以及延迟消息
目录 消息丢失 一、发送者的可靠性 1.1 生产者重试机制 1.2 生产者确认机制 1.3 实现生产者确认 (1)开启生产者确认 (2)定义ReturnCallback (3)定义ConfirmCallback 二、MQ的持久化 2.1 数据持久…...
Mybatis plus中的BaseMapper与ServiceImpl
BaseMapper接口方法与ServiceImpl类方法的区别与联系 什么是BaseMapper?什么是ServiceImpl? BaseMapper 是 MyBatis-Plus 提供的一个基础 Mapper 接口,封装了常用的 CRUD 操作方法,如 selectById、insert、updateById、deleteBy…...
第三篇 Avaya IP Office的架构及其服务组成
所谓的架构,其实就是Solution,解决方案。一般就是如下几套: IPO primary IPO secondaryIPO primary IP500v2IPO primary IPO secondary IP500v2IPO primary IPO secondary IP500v2 Expansion Server(IP500v2,扩展)IPO primaryIPO 500v2 简单的解释…...
近红外简单ROI分析matlab(NIRS_SPM)
本次笔记主要想验证上篇近红外分析是否正确,因为叠加平均有不同的计算方法,一种是直接将每个通道的5分钟实时长单独进行叠加平均,另一种是将通道划分为1分钟的片段,将感兴趣的通道数据进行对应叠加平均,得到一个总平均…...
ESP32学习笔记_FreeRTOS(6)——Event and Notification
摘要(From AI): 这篇博客详细介绍了 FreeRTOS 中的事件组和任务通知机制,讲解了事件组如何通过位操作实现任务间的同步与通信,以及任务如何通过通知机制进行阻塞解除和数据传递。博客提供了多个代码示例,展示了如何使用事件组和任务通知在多任…...
多监控m3u8视频流,怎么获取每个监控的封面图(纯前端)
文章目录 1.背景2.问题分析3.解决方案3.1解决思路3.2解决过程3.2.1 封装播放组件3.2.2 隐形的视频div3.2.3 截取封面图 3.3 结束 1.背景 有这样一个需求: 给你一个监控列表,每页展示多个监控(至少12个,m3u8格式)&…...
ExpGCN:深度解析可解释推荐系统中的图卷积网络
一、引言 在当今信息爆炸的时代,推荐系统已成为电子商务和社交网络中不可或缺的工具,旨在为用户筛选出符合其兴趣的信息。传统的协同过滤(CF)技术通过挖掘用户与项目之间的交互记录来生成推荐,但这种方法简化了模型&a…...
ChatGPT Prompt 编写指南
一、第一原则:明确的意图 你需要明确地表达你的意图和要求,尽可能具体、描述性、详细地描述所需的上下文、你期望的结果等。你的要求越明确,越有希望获得你想要的答案。 糟糕的案例 ❌ 写一首关于 OpenAI 的诗。 更好的案…...
【脑机接口数据处理】 如何读取Trode 的.rec文件 原始数据?
文章目录 函数简介文件下载函数语法基本用法带时间跳过的用法带选项参数的用法输出结构使用示例 注意事项 MATLAB中读取Trodes文件的实用函数——readTrodesFileContinuous 在处理神经科学实验数据时,经常会遇到Trodes格式的文件。这些文件包含了丰富的神经信号数据…...
反转字符串中的单词 II:Swift 实现与详解
网罗开发 (小红书、快手、视频号同名) 大家好,我是 展菲,目前在上市企业从事人工智能项目研发管理工作,平时热衷于分享各种编程领域的软硬技能知识以及前沿技术,包括iOS、前端、Harmony OS、Java、Python等…...
蓝桥杯训练—矩形面积交
文章目录 一、题目二、示例三、解析四、代码 一、题目 平面上有两个矩形,它们的边平行于直角坐标系的X轴或Y轴,对于每个矩形,我们给出它的一对相对顶点的坐标,请你编程写出两个矩形的交的面积 输入格式: 输入包含两行…...
如何设置HTTPS站点防御?
设置HTTPS站点防御涉及到多个层面的安全措施,包括但不限于配置Web服务器、应用安全头信息、使用内容安全策略(CSP)、启用HSTS和OCSP Stapling等。下面是一些关键的步骤来增强HTTPS网站的安全性: 1. 使用强加密协议和密钥交换算法…...