【RocketMQ 存储】- broker 端存储单条消息的逻辑
文章目录
- 1. 前言
- 2. DefaultMessageStore#asyncPutMessage 添加单条消息
- 2.1 DefaultMessageStore#checkStoreStatus 检查存储服务的状态
- 2.2 DefaultMessageStore#checkMessage 校验消息长度是否合法
- 2.3 CommitLog#asyncPutMessage 核心存储逻辑
- 2.4 MappedFile#appendMessage
- 2.5 CommitLog#doAppend 追加消息到 CommitLog
- 2.6 DefaultMessageStore#asyncPutMessage 结尾
- 3. 小结
本文章基于 RocketMQ 4.9.3
1. 前言
上一篇文章中我们已经解析了 RocketMQ 的存储结构中的 MappedFIile,这篇文章中,我们来介绍下 broker 端是如何处理 producer 端发送过来的消息的,存储消息的入口在 DefaultMessageStore#asyncPutMessage
和 DefaultMessageStore#asyncPutMessages
,前者是单条消息的添加,后面是批量添加,这篇文章就看下 asyncPutMessage 的逻辑。
2. DefaultMessageStore#asyncPutMessage 添加单条消息
asyncPutMessages 是 RocketMQ 中处理单条消息的逻辑,下面是这个方法的整体逻辑。
/*** 消息批量添加* @param messageExtBatch the message batch* @return*/
public CompletableFuture<PutMessageResult> asyncPutMessages(MessageExtBatch messageExtBatch) {// 1.检查下存储服务的状态PutMessageStatus checkStoreStatus = this.checkStoreStatus();if (checkStoreStatus != PutMessageStatus.PUT_OK) {// 这里就是消息存储服务不可用或者操作系统页繁忙,直接返回结果return CompletableFuture.completedFuture(new PutMessageResult(checkStoreStatus, null));}// 2.校验消息看看是否合法PutMessageStatus msgCheckStatus = this.checkMessages(messageExtBatch);if (msgCheckStatus == PutMessageStatus.MESSAGE_ILLEGAL) {// 消息长度不合法return CompletableFuture.completedFuture(new PutMessageResult(msgCheckStatus, null));}// 3.当前时间long beginTime = this.getSystemClock().now();// 4.存储消息的核心逻辑CompletableFuture<PutMessageResult> resultFuture = this.commitLog.asyncPutMessages(messageExtBatch);// 这里就是结果处理resultFuture.thenAccept((result) -> {// 当消息存储完成之后,lambda 表达式会被调用// 消息消耗的时间long elapsedTime = this.getSystemClock().now() - beginTime;if (elapsedTime > 500) {log.warn("not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, messageExtBatch.getBody().length);}// 设置下存储消息的消耗时间和最大消耗时间this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime);if (null == result || !result.isOk()) {// 这里就是存储失败了,新增存储失败的次数this.storeStatsService.getPutMessageFailedTimes().add(1);}});return resultFuture;
}
2.1 DefaultMessageStore#checkStoreStatus 检查存储服务的状态
消息存储服务的上层就是 DefaultMessageStore,checkStoreStatus 方法是专门用于检测这个服务的状态。
/*** 检查存储服务的状态* @return*/
private PutMessageStatus checkStoreStatus() {...
}
下面一步一步看下里面的核心,其实就是判断存储服务的各种状态。首先就是判断下服务有没有关闭,也就是 this.shutdown
是否等于 true,当 broker 服务 shutdown 的时候就会调用 DefaultMessageStore#shutdown
方法设置这个属性为 true。
// 1.如果存储服务 DefaultMessageStore 是 SHUTDOWN 状态,就是关闭了,这时候服务不可用
if (this.shutdown) {log.warn("message store has shutdown, so putMessage is forbidden");return PutMessageStatus.SERVICE_NOT_AVAILABLE;
}
然后就是判断当前节点是不是一个 slave 节点,slave 节点的主要职责是备份 master 节点的数据,以提供数据冗余和高可用性,它不直接处理生产或消费请求,除非 master 节点发生故障,所以说写入数据是 master 才能写入。
// 2.slave 节点的主要职责是备份 master 节点的数据,以提供数据冗余和高可用性,它不直接处理生产或消费请求,除非 master 节点发生故障
if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {long value = this.printTimes.getAndIncrement();// 日志打印if ((value % 50000) == 0) {log.warn("broke role is slave, so putMessage is forbidden");}// 服务不可用return PutMessageStatus.SERVICE_NOT_AVAILABLE;
}
接着再判断是否支持写入,这里会判断下 broker 磁盘是否满了、或者说写入 ConsumeQueue 或创建 IndexFile 的过程出现异常,出现这些问题就直接发挥结果。
// 3.如果不支持写入,就是说有可能 broker 磁盘满了、写入 ConsumeQueue 或者 IndexFile 错误
if (!this.runningFlags.isWriteable()) {long value = this.printTimes.getAndIncrement();if ((value % 50000) == 0) {log.warn("the message store is not writable. It may be caused by one of the following reasons: " +"the broker's disk is full, write to logic queue error, write to index file error, etc");}// 这种情况也会返回服务不可用return PutMessageStatus.SERVICE_NOT_AVAILABLE;
} else {this.printTimes.set(0);
}
这个方法里面通过 isWriteable 判断是否可写,就是判断下标志位是不是出现上面说的几种情况。
public boolean isWriteable() {if ((this.flagBits & (NOT_WRITEABLE_BIT | WRITE_LOGICS_QUEUE_ERROR_BIT | DISK_FULL_BIT | WRITE_INDEX_FILE_ERROR_BIT)) == 0) {return true;}return false;
}
回到 checkStoreStatus 方法,最后还会判断下操作系统是否繁忙,如果繁忙就直接返回错误码。
// 4.操作系统页缓存是否繁忙
if (this.isOSPageCacheBusy()) {// 如果是繁忙就返回错误码return PutMessageStatus.OS_PAGECACHE_BUSY;
}
这个 isOSPageCacheBusy
方法就是判断下操作系统页缓存是否繁忙,其实意思应该是对操作系统操作是否繁忙,来看下里面的逻辑。
/*** 判断操作系统页是不是繁忙* @return*/
@Override
public boolean isOSPageCacheBusy() {// beginTimeInLock 是当往 CommitLog 里面添加消息的时候会加锁并且设置这个变量为加锁的时间// 添加完消息之后就会解锁,这个 beginTimeInLock 会被重置为 0long begin = this.getCommitLog().getBeginTimeInLock();// 这个就是加锁加了多长时间long diff = this.systemClock.now() - begin;// diff >= 10000s 代表距离上一次写入 CommitLog 已经超过 10000s 了// diff <= 1s 代表持有锁的时间还没到 1s// 这两种情况就是不繁忙,为什么第一种是不繁忙呢,因为数据写入是不太可能会超过 10000s 的,如果这么久都没能写入 CommitLog 就肯定是出问题了// 这时候也不需要考虑是不是系统繁忙了return diff < 10000000&& diff > this.messageStoreConfig.getOsPageCacheBusyTimeOutMills();
}
beginTimeInLock 是当往 CommitLog 里面添加消息的时候会加锁并且设置这个变量为加锁的时间,添加完消息之后就会解锁,这个 beginTimeInLock 会被重置为 0,这里说的添加消息是添加到 CommitLog 下面的 MappedFile。
再来看下里面的判断,这里面会判断满足 diff < 10000000 && diff > this.messageStoreConfig.getOsPageCacheBusyTimeOutMills()
就认为是页繁忙。osPageCacheBusyTimeOutMills
默认是 1s,也就是说持有锁的时间在 (1s, 10000s) 就认为对操作系统页缓存操作繁忙。
那为什么要有一个小于 10000s 的限制呢? 上面也说了,当添加完消息就会解锁,这时候 beginTimeInLock
会被重置为 0,这时候计算出来的 diff 就是 System.currentTimeMillis,所以这个逻辑其实会判断出这种情况下是没有加锁的,于是就不算繁忙。
所以这里的逻辑就很明确了,这里会认为 broker 对消息的写入是不会超过 10000s 的,同时当超过 1s 就认为持有锁的时间太久了,消息写入时间太长了,有可能是对 MappedByteBuffer 写入的时候发生了缓存页刷新等操作导致出现问题,这时候返回错误码 PutMessageStatus.OS_PAGECACHE_BUSY
。
最后回到 checkStoreStatus 方法,经过上面的判断如果都不满足,就返回 PutMessageStatus.PUT_OK
,意思是可以往 CommitLog 中追加消息,下面是整体逻辑。
/*** 检查存储服务的状态* @return*/
private PutMessageStatus checkStoreStatus() {// 1.如果存储服务 DefaultMessageStore 是 SHUTDOWN 状态,就是关闭了,这时候服务不可用if (this.shutdown) {log.warn("message store has shutdown, so putMessage is forbidden");return PutMessageStatus.SERVICE_NOT_AVAILABLE;}// 2.slave 节点的主要职责是备份 master 节点的数据,以提供数据冗余和高可用性,它不直接处理生产或消费请求,除非 master 节点发生故障if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {long value = this.printTimes.getAndIncrement();// 日志打印if ((value % 50000) == 0) {log.warn("broke role is slave, so putMessage is forbidden");}// 服务不可用return PutMessageStatus.SERVICE_NOT_AVAILABLE;}// 3.如果不支持写入,就是说有可能 broker 磁盘满了、写入 ConsumeQueue 或者 IndexFile 错误if (!this.runningFlags.isWriteable()) {long value = this.printTimes.getAndIncrement();if ((value % 50000) == 0) {log.warn("the message store is not writable. It may be caused by one of the following reasons: " +"the broker's disk is full, write to logic queue error, write to index file error, etc");}// 这种情况也会返回服务不可用return PutMessageStatus.SERVICE_NOT_AVAILABLE;} else {this.printTimes.set(0);}// 4.操作系统页缓存是否繁忙if (this.isOSPageCacheBusy()) {// 如果是繁忙就返回错误码return PutMessageStatus.OS_PAGECACHE_BUSY;}// 5.这里就是可以添加消息return PutMessageStatus.PUT_OK;
}
2.2 DefaultMessageStore#checkMessage 校验消息长度是否合法
这里面逻辑就比较简单了,我就不细说,里面主要包括两条规则。
- 消息 topic 长度不能超过 127
- 消息属性长度不能超过 32767
/*** 校验消息是否合法* @param msg* @return*/
private PutMessageStatus checkMessage(MessageExtBrokerInner msg) {// topic 长度不能超过 127if (msg.getTopic().length() > Byte.MAX_VALUE) {log.warn("putMessage message topic length too long " + msg.getTopic().length());return PutMessageStatus.MESSAGE_ILLEGAL;}// 消息属性长度不能超过 32767if (msg.getPropertiesString() != null && msg.getPropertiesString().length() > Short.MAX_VALUE) {log.warn("putMessage message properties length too long " + msg.getPropertiesString().length());return PutMessageStatus.MESSAGE_ILLEGAL;}// 这里就是检查通过,没问题return PutMessageStatus.PUT_OK;
}
2.3 CommitLog#asyncPutMessage 核心存储逻辑
这里是添加消息的核心逻辑,上面我略过了检查 LMQ 消息是否合法这段逻辑,LMQ 队列是一种轻量级的队列,这里就先不展开细说了,来看下 CommitLog#asyncPutMessage 的核心逻辑。
这个方法里面的逻辑太多了,我这里就慢慢拆解。但是大家可以先看下下面的图,最终消息就是按下面的顺序存到 ByteBuffer 中的。
代码的逻辑首先设置下消息的存储时间和消息 CRC 校验码。
// 1.设置消息的存储时间
msg.setStoreTimestamp(System.currentTimeMillis());
// Set the message body BODY CRC (consider the most appropriate setting on the client)
// 2.设置消息体 CRC 校验码
msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
接着来处理不同事务状态的消息,我们知道 RocketMQ 的事务消息分为 4 个状态,关于事务消息后续会出相关文章来介绍源码。
- MessageSysFlag.TRANSACTION_NOT_TYPE: 普通消息,不是事务消息
- MessageSysFlag.TRANSACTION_PREPARED_TYPE: 事务消息是一阶段 PREPARE 状态
- MessageSysFlag.TRANSACTION_COMMIT_TYPE: 事务消息是 Commit 状态,意味者可以投到真实队列
- MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: 消息一阶段的 Rollback
上面简单介绍了事务类型,这几种类型中,对于 TRANSACTION_NOT_TYPE
和 TRANSACTION_COMMIT_TYPE
这两种类型会去处理延时队列的请求。延时消息也是 RocketMQ 的一种消息,RocketMQ 提供了 18 中延时级别,当一条延时消息被创建出来时首先会被加入名为 SCHEDULE_TOPIC_XXXX
的 topic 下面的队列,这个 SCHEDULE_TOPIC_XXXX
topic 提供了 18 个消息队列,每一个队列对应一个延时等级,所谓的延时等级就是延时时间,这里就不多说了,下面来看下这两种事务状态下处理延时等级的逻辑。
// 3.这里就是获取消息的事务状态,处理延时消息的逻辑
final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
// 首先能设置到 CommitLog 中的要么就是非事务消息,要么就是事务一阶段 Commit 的消息,
// 也就是消息本地执行逻辑没有问题,后续就可以把事务消息从事务投放到普通队列
if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE|| tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {// 下面是处理延迟消息的逻辑,延迟消息有一个延时等级,每个延时等级都对应一个延时时间if (msg.getDelayTimeLevel() > 0) {// 延时等级不能超过最大值if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());}// 延时队列 topictopic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;// 根据延时等级获取这个 topic 下面对应的队列,延时 topic 下每一个延时等级都对应第一个队列,延时消息首先就会存储在这些队列中// 等到执行时间了就会将这些消息提交到真实 topic 和下面的队列int queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());// 从消息的属性中根据 REAL_TOPIC 和 REAL_QID 来获取真实的 topic 和队列 idMessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));// 设置消息属性msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));// 设置真实 topic 和队列 id 到消息中,后续这个到期的消息就会被发送到真实队列里面供消费者消费了msg.setTopic(topic);msg.setQueueId(queueId);}
}
这里的逻辑其实就是设置真实的 topic
和 queueId
,对于延时消息,真实的 topic 会存储在 properties 中,key 是 REAL_TOPIC
;而真实的 queueId 也会被存在 properties 中,key 是 PROPERTY_REAL_QUEUE_ID
。
下面继续设置消息标记,也就是消息的 broker 存储端和 producer 产生端是 IPV
4 还是 IPV6,为什么要设置这个标记呢,因为 IPV4 和 IPV6 的字节数不同,计算出来的消息长度也不同,所以这里设置一个标记是为了后面动态计算消息长度的。
// 消息产生的地址,也就是 Producer 发送端的地址
InetSocketAddress bornSocketAddress = (InetSocketAddress) msg.getBornHost();
if (bornSocketAddress.getAddress() instanceof Inet6Address) {// 如果是 IPV6,就设置下消息的发送端标记msg.setBornHostV6Flag();
}// 消息存储的地址,也就是 Broker 存储端的地址
InetSocketAddress storeSocketAddress = (InetSocketAddress) msg.getStoreHost();
if (storeSocketAddress.getAddress() instanceof Inet6Address) {// 如果是 IPV6,就设置下消息的存储端标记msg.setStoreHostAddressV6Flag();
}
好了,接着往下看,下面是从 PutMessageThreadLocal 本地变量中拿到 encoder 对消息进行编码,其实说是编码,不如说是把消息 MessageExtBrokerInner 的内容转移到 ByteBuffer 中,这个 ByteBuffer 是一个 HeapByteBuffer,是一个临时变量,临时存储消息的。
// 4.从线程本地变量获取 PutMessageThreadLocal
PutMessageThreadLocal putMessageThreadLocal = this.putMessageThreadLocal.get();
// 5.使用本地变量里面的 MessageExtEncoder 对消息编码并存储到 encoderBuffer 中,并且调用 flip 切换读模式,后面就可以开始读取了
PutMessageResult encodeResult = putMessageThreadLocal.getEncoder().encode(msg);
if (encodeResult != null) {// 如果不为空就是上面编码的时候对于一些属性的校验失败了,直接返回return CompletableFuture.completedFuture(encodeResult);
}
// 6.设置消息编码结果
msg.setEncodedBuff(putMessageThreadLocal.getEncoder().encoderBuffer);
来看下面里面的 encode 逻辑,这个方法会对消息进行编码,这里面的逻辑我就不多介绍了,看下面这张图,这里面的方法就是按照这个图的字段顺序进行编码。
/*** 对消息进行编码* @param msgInner 需要编码的消息对象* @return*/
protected PutMessageResult encode(MessageExtBrokerInner msgInner) {/*** 序列化消息的属性*/// 将消息的属性字符串转成数组final byte[] propertiesData =msgInner.getPropertiesString() == null ? null : msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8);// 计算属性数据的长度final int propertiesLength = propertiesData == null ? 0 : propertiesData.length;// 如果属性长度超过 Short 的最大值if (propertiesLength > Short.MAX_VALUE) {// 返回结果log.warn("putMessage message properties length too long. length={}", propertiesData.length);return new PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, null);}// topicfinal byte[] topicData = msgInner.getTopic().getBytes(MessageDecoder.CHARSET_UTF8);// topic 长度final int topicLength = topicData.length;// 计算消息体长度final int bodyLength = msgInner.getBody() == null ? 0 : msgInner.getBody().length;// 计算消息长度,这里传入 bodyLength、topicLength、propertiesLength 是因为这几个都是不定长的,消息里面会存储这部分数据,所以// 需要计算这部分数据的长度。而 sysFlag 可以用来判断里面的生产者地址或者 broker 地址是不是 IPV6 的。final int msgLen = calMsgLength(msgInner.getSysFlag(), bodyLength, topicLength, propertiesLength);// 如果消息总长度超过最大允许的消息大小,返回消息非法的结果if (msgLen > this.maxMessageSize) {CommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLength+ ", maxMessageSize: " + this.maxMessageSize);return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);}// 初始化存储空间的 ByteBuffer,大小为消息的总长度this.resetByteBuffer(encoderBuffer, msgLen);// 1.写入消息的总长度(TOTALSIZE)this.encoderBuffer.putInt(msgLen);// 2.写入魔法数(MAGICCODE),用于标识消息的版本或类型this.encoderBuffer.putInt(CommitLog.MESSAGE_MAGIC_CODE);// 3.写入消息体的 CRC(BODYCRC),用于校验消息体的完整性this.encoderBuffer.putInt(msgInner.getBodyCRC());// 4.写入队列 ID(QUEUEID),标识消息所属的队列this.encoderBuffer.putInt(msgInner.getQueueId());// 5.写入消息的标志(FLAG),用于标识消息的一些特性this.encoderBuffer.putInt(msgInner.getFlag());// 6.写入队列偏移量(QUEUEOFFSET),初始值为 0,后续需要更新this.encoderBuffer.putLong(0);// 7.写入物理偏移量(PHYSICALOFFSET),初始值为 0,后续需要更新this.encoderBuffer.putLong(0);// 8.写入系统标志(SYSFLAG),标识消息的系统特性this.encoderBuffer.putInt(msgInner.getSysFlag());// 9.写入消息的生成时间戳(BORNTIMESTAMP),表示 Producer 消息生成的时间this.encoderBuffer.putLong(msgInner.getBornTimestamp());// 10.Producer 端的 IP + 端口socketAddress2ByteBuffer(msgInner.getBornHost() ,this.encoderBuffer);// 11.写入消息的存储时间戳(STORETIMESTAMP),表示消息在 Broker 存储的时间this.encoderBuffer.putLong(msgInner.getStoreTimestamp());// 12.Broker 端的 IP + 端口socketAddress2ByteBuffer(msgInner.getStoreHost() ,this.encoderBuffer);// 13.消息消费重试次数this.encoderBuffer.putInt(msgInner.getReconsumeTimes());// 14.写入预处理事务的偏移量(Prepared Transaction Offset),用于事务消息this.encoderBuffer.putLong(msgInner.getPreparedTransactionOffset());// 15.写入消息体的长度(BODY),并写入消息体数据this.encoderBuffer.putInt(bodyLength);if (bodyLength > 0)this.encoderBuffer.put(msgInner.getBody());// 16.topic 的长度和具体数据this.encoderBuffer.put((byte) topicLength);this.encoderBuffer.put(topicData);// 17.写入属性的长度和消息的属性值this.encoderBuffer.putShort((short) propertiesLength);if (propertiesLength > 0)this.encoderBuffer.put(propertiesData);// 18.切换读模式,切换后就可以开始读取这条消息了encoderBuffer.flip();return null;
}
这个 encode 方法中还有一个 calMsgLength
,这个方法就是通过 sysFlag
、bodyLength
、topicLength
、propertiesLength
来计算消息长度,之所以传入这几个参数,就是因为 CommitLog 消息中这些变量是不定长的,比如消息体。
/*** 计算消息长度* @param sysFlag 是否是 IPV6* @param bodyLength 消息体长度* @param topicLength topic 长度* @param propertiesLength 消息属性长度* @return*/
protected static int calMsgLength(int sysFlag, int bodyLength, int topicLength, int propertiesLength) {// 如果是 IPV6 就是 20,否则是 8,这是因为 bornhost 是包括 IP + port 的,所以会多上 4 字节int bornhostLength = (sysFlag & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 8 : 20;// 这里也是同理int storehostAddressLength = (sysFlag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 8 : 20;// 计算一条 CommitLog 消息的长度final int msgLen = 4 //TOTALSIZE+ 4 //MAGICCODE+ 4 //BODYCRC+ 4 //QUEUEID+ 4 //FLAG+ 8 //QUEUEOFFSET+ 8 //PHYSICALOFFSET+ 4 //SYSFLAG+ 8 //BORNTIMESTAMP+ bornhostLength //BORNHOST+ 8 //STORETIMESTAMP+ storehostAddressLength //STOREHOSTADDRESS+ 4 //RECONSUMETIMES+ 8 //Prepared Transaction Offset+ 4 + (bodyLength > 0 ? bodyLength : 0) //BODY+ 1 + topicLength //TOPIC+ 2 + (propertiesLength > 0 ? propertiesLength : 0) //propertiesLength+ 0;return msgLen;
}
继续回到 asyncPutMessage 方法,上面对消息进行了编码,下面创建一个消息存储的上下文,这个上下文里面存储了编码后的消息,同时存储了一个 key,这个 key 是 “topic-queueId”,专门用于 CommitLog#topicQueueTable 。
// CommitLog 的属性
protected HashMap<String/* topic-queueid */, Long/* offset */> topicQueueTable = new HashMap<String, Long>(1024);// CommitLog#asyncPutMessage
// 7.下面是存储消息上下文,这里的 key 是 CommitLog#topicQueueTable 集合中的 key,用来存储 [队列 ID -> 偏移量] 的映射关系
PutMessageContext putMessageContext = new PutMessageContext(generateKey(putMessageThreadLocal.getKeyBuilder(), msg));/*** 根据 topic 和 queueId 生成 key* @param keyBuilder* @param messageExt* @return*/
private String generateKey(StringBuilder keyBuilder, MessageExt messageExt) {// topic-queueIdkeyBuilder.setLength(0);keyBuilder.append(messageExt.getTopic());keyBuilder.append('-');keyBuilder.append(messageExt.getQueueId());// 返回结果return keyBuilder.toString();
}
下面继续,这里会加锁,因为 broker 中所有消息都会发送到一个 CommitLog 中,这里就需要加锁了,加锁有两种实现,自旋或者 ReentrantLock 互斥锁,看 StoreConfig 具体实现。
putMessageLock.lock();
try {...
}
加锁之后就是添加消息前的处理鲁珀及,涉及到设置存储消息、获取要存储的 MappedFile 等。
// 获取 CommitLog 下面的文件列表中的最后一个 MappedFile 文件
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
// 加锁时间
long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
// 设置加锁时间,这个加锁时间可以用来计算持有锁的时间从而来判断刷盘时间久不久,操作系统刷盘是不是繁忙
this.beginTimeInLock = beginLockTimestamp;// 设置消息的存储时间为加锁的时间,确保全局有序
msg.setStoreTimestamp(beginLockTimestamp);// 这里就是说如果获取不到 MappedFile 或者获取到的 MappedFile 已经写满了,这时候会获取或者创建下一个 MappedFile
if (null == mappedFile || mappedFile.isFull()) {mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
}
// 如果还是获取不到,那么说明创建失败了
if (null == mappedFile) {// 返回结果 CREATE_MAPEDFILE_FAILEDlog.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null));
}
上面的方法中的 getLastMappedFile
方法就是用于获取 CommitLog 下面的文件列表中的最后一个 MappedFile 文件,就是最新的 MappedFile 文件,而 beginTimeInLock
还记得这个属性吗,在 2.1 小结 checkStoreStatus
判断页缓存繁忙就是用这个属性来判断的,这个属性的初始化就是在这里,下面来看下如何获取 MappedFile 的。
/**
* 获取文件列表末尾的 MappedFile,就是最后一个 MappedFile,也是正在写入的 MappedFile
* @return
*/
public MappedFile getLastMappedFile() {
MappedFile mappedFileLast = null;while (!this.mappedFiles.isEmpty()) {try {// 获取最后一个 MapperFilemappedFileLast = this.mappedFiles.get(this.mappedFiles.size() - 1);break;} catch (IndexOutOfBoundsException e) {//continue;} catch (Exception e) {log.error("getLastMappedFile has exception.", e);break;}
}
这个方法逻辑很简单,就是获取 mappedFileLast 的最后一个 MappedFile,当然还有一个 this.mappedFileQueue.getLastMappedFile(0)
方法,这个方法和上面的 getLastMappedFile()
不一样,这个方法如果获取不到 MappedFile 或者获取到的 MappedFile 已经写满了,这时候会获取或者创建下一个 MappedFile,当然这里的逻辑我先不说,在后面会写文章介绍,因为里面涉及到创建 MappedFile 的服务。
继续回到 CommitLog#asyncPutMessage 方法,下面就是核心逻辑追加消息到 CommitLog 中。
// 核心逻辑,追加消息到 CommitLog 中
result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);
追加完了之后,会处理追加消息的结果,注意如果是 END_OF_FILE
就是说上一个文件已经满了,这时候会创建一个新的文件,并且赋值 unlockMappedFile = mappedFile
,也就是说会将 unlockMappedFile
设置为已经写满了的那个文件,记住这个变量,下面有大用。
// 下面判断下追加消息的结果
switch (result.getStatus()) {case PUT_OK:// 添加成功,直接退出break;case END_OF_FILE:// 文件剩余空间不足,那么初始化新的文件并尝试再次存储unlockMappedFile = mappedFile;// 创建一个新的文件mappedFile = this.mappedFileQueue.getLastMappedFile(0);if (null == mappedFile) {// 创建失败,直接返回错误结果 CREATE_MAPEDFILE_FAILEDlog.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result));}// 创建成功后再次添加消息result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);// 退出break;case MESSAGE_SIZE_EXCEEDED:case PROPERTIES_SIZE_EXCEEDED:// 消息长度错误return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result));case UNKNOWN_ERROR:// 位置错误return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));default:// 其他错误return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
}// 这里就是加锁的时间,也是添加消息所耗费的时间
elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
最后在 finally 中进行解锁并重置加锁时间。
finally {// 最后解锁并且重置 beginTimeInLockbeginTimeInLock = 0;putMessageLock.unlock();
}
上面就已经将消息追加到 CommitLog 中的 ByteBuffer 了,这里的 ByteBuffer 有可能是 MappedByteBuffer,也有可能是读写分离创建出来的 writeBuffer,最后来看下添加之后的后续逻辑。
// 如果加锁时间超过了 500ms
if (elapsedTimeInLock > 500) {// 记录日志log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", elapsedTimeInLock, msg.getBody().length, result);
}
上面就是判断如果加锁时间超过了 500ms,就记录下日志。
// 如果文件写满了并且启用了文件预热
if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {// 对 unlockMappedFile 解锁,这里解锁是使用 munlock 解除这片内存背后锁定的 page cache,这下就能够交换到 swap 空间了// 因为这边 unlockMappedFile 已经写满了,所以这片空间可以解锁方便 swap 交换到磁盘 swap 空间,读写的重点在新的 MappedFile// 文件上,对于旧的 unlockMappedFile 读写就没有那么多了this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
}
然后上面判断下如果文件写满了并且开启了文件预热,因为开启文件预热,那么在创建 MappedFile 的时候会调用 warmMappedFIle 方法每隔 4K 就写入一个 0 触发中断建立虚拟内存到物理内存的页表项,同时调用 mappedFile.mlock 锁定这片内存,防止交换到 swap 磁盘中。
那么在这里就需要将 unlockMappedFile
解锁,unlockMappedFile
还记得吗,上面将写满的文件赋值给 unlockMappedFile,因为文件已经写满了,所以会对这个文件锁定的空间进行解锁,至于为什么解锁,是因为一般来说都是最后一个文件操作最频繁,这种情况下就需要确保最后一个文件不被交换到 swap 空间防止 mmap 映射出来的虚拟内存和物理内存页表项被删掉。但是总得有不需要用的物理页交换到这里 swap 空间以腾出空间给热点访问页,所以对于写满的 MappedFile,既然后续也不会频繁访问,干脆就解锁内存,让这些物理页能交换到 swap 空间了。
好了,继续看下面的逻辑,是一些数据统计。
// 返回结果
PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);// 下面是一些数据统计,如写入 topic 的消息数 + 1
storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).add(1);
// 存储的消息总字节 + result.getWroteBytes()
storeStatsService.getSinglePutMessageTopicSizeTotal(topic).add(result.getWroteBytes());
接下来就是提交刷盘请求,因为上面已经把数据写入 CommitLog 的 ByteBuffer 了,所以下面会提交刷盘请求和从节点赋值请求,这些请求的处理都是由其他服务异步处理的。
// 9.上面添加消息到 CommitLog 只是添加到背后的 ByteBuffer,接下来需要提交刷盘请求
CompletableFuture<PutMessageStatus> flushResultFuture = submitFlushRequest(result, msg);
// 10.RocketMQ 主节点写入数据之后,向从节点提交消息复制请求,让 slave 节点去同步 master 节点新写入的消息
CompletableFuture<PutMessageStatus> replicaResultFuture = submitReplicaRequest(result, msg);
最后返回结果。
// 处理刷盘请求
return flushResultFuture.thenCombine(replicaResultFuture, (flushStatus, replicaStatus) -> {// 外层会阻塞等待刷盘请求和从节点复制请求的结果if (flushStatus != PutMessageStatus.PUT_OK) {putMessageResult.setPutMessageStatus(flushStatus);}if (replicaStatus != PutMessageStatus.PUT_OK) {putMessageResult.setPutMessageStatus(replicaStatus);}return putMessageResult;
});
好了,上面就是 CommitLog#asyncPutMessage 的核心逻辑,下面我贴下整体代码。
/*** broker 异步存储消息* @param msg* @return*/
public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {// 1.设置消息的存储时间msg.setStoreTimestamp(System.currentTimeMillis());// Set the message body BODY CRC (consider the most appropriate setting on the client)// 2.设置消息体 CRC 校验码msg.setBodyCRC(UtilAll.crc32(msg.getBody()));// Back to ResultsAppendMessageResult result = null;StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();// 消息 topicString topic = msg.getTopic();// int queueId = msg.getQueueId();// 3.这里就是获取消息的事务状态,处理延时消息的逻辑final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());// 首先能设置到 CommitLog 中的要么就是非事务消息,要么就是事务一阶段 Commit 的消息,// 也就是消息本地执行逻辑没有问题,后续就可以把事务消息从事务投放到普通队列if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE|| tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {// 下面是处理延迟消息的逻辑,延迟消息有一个延时等级,每个延时等级都对应一个延时时间if (msg.getDelayTimeLevel() > 0) {// 延时等级不能超过最大值if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());}// 延时队列 topictopic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;// 根据延时等级获取这个 topic 下面对应的队列,延时 topic 下每一个延时等级都对应第一个队列,延时消息首先就会存储在这些队列中// 等到执行时间了就会将这些消息提交到真实 topic 和下面的队列int queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());// 从消息的属性中根据 REAL_TOPIC 和 REAL_QID 来获取真实的 topic 和队列 idMessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));// 设置消息属性msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));// 设置真实 topic 和队列 id 到消息中,后续这个到期的消息就会被发送到真实队列里面供消费者消费了msg.setTopic(topic);msg.setQueueId(queueId);}}// 消息产生的地址,也就是 Producer 发送端的地址InetSocketAddress bornSocketAddress = (InetSocketAddress) msg.getBornHost();if (bornSocketAddress.getAddress() instanceof Inet6Address) {// 如果是 IPV6,就设置下消息的发送端标记msg.setBornHostV6Flag();}// 消息存储的地址,也就是 Broker 存储端的地址InetSocketAddress storeSocketAddress = (InetSocketAddress) msg.getStoreHost();if (storeSocketAddress.getAddress() instanceof Inet6Address) {// 如果是 IPV6,就设置下消息的存储端标记msg.setStoreHostAddressV6Flag();}// 4.从线程本地变量获取 PutMessageThreadLocalPutMessageThreadLocal putMessageThreadLocal = this.putMessageThreadLocal.get();// 5.使用本地变量里面的 MessageExtEncoder 对消息编码并存储到 encoderBuffer 中,并且调用 flip 切换读模式,后面就可以开始读取了PutMessageResult encodeResult = putMessageThreadLocal.getEncoder().encode(msg);if (encodeResult != null) {// 如果不为空就是上面编码的时候对于一些属性的校验失败了,直接返回return CompletableFuture.completedFuture(encodeResult);}// 6.设置消息编码结果msg.setEncodedBuff(putMessageThreadLocal.getEncoder().encoderBuffer);// 7.下面是存储消息上下文,这里的 key 是 CommitLog#topicQueueTable 集合中的 key,用来存储 [队列 ID -> 偏移量] 的映射关系PutMessageContext putMessageContext = new PutMessageContext(generateKey(putMessageThreadLocal.getKeyBuilder(), msg));long elapsedTimeInLock = 0;MappedFile unlockMappedFile = null;// 8.加锁,因为 broker 中所有消息都会发送到一个 CommitLog 中,这里就需要加锁了// 这里的加锁有两种实现,自旋或者 ReentrantLock 互斥锁,看 StoreConfig 具体实现putMessageLock.lock();try {// 获取 CommitLog 下面的文件列表中的最后一个 MappedFile 文件MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();// 加锁时间long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();// 设置加锁时间,这个加锁时间可以用来计算持有锁的时间从而来判断刷盘时间久不久,操作系统刷盘是不是繁忙this.beginTimeInLock = beginLockTimestamp;// 设置消息的存储时间为加锁的时间,确保全局有序msg.setStoreTimestamp(beginLockTimestamp);// 这里就是说如果获取不到 MappedFile 或者获取到的 MappedFile 已经写满了,这时候会获取或者创建下一个 MappedFileif (null == mappedFile || mappedFile.isFull()) {mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise}// 如果还是获取不到,那么说明创建失败了if (null == mappedFile) {// 返回结果 CREATE_MAPEDFILE_FAILEDlog.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null));}// 核心逻辑,追加消息到 CommitLog 中result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);// 下面判断下追加消息的结果switch (result.getStatus()) {case PUT_OK:// 添加成功,直接退出break;case END_OF_FILE:// 文件剩余空间不足,那么初始化新的文件并尝试再次存储unlockMappedFile = mappedFile;// 创建一个新的文件mappedFile = this.mappedFileQueue.getLastMappedFile(0);if (null == mappedFile) {// 创建失败,直接返回错误结果 CREATE_MAPEDFILE_FAILEDlog.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result));}// 创建成功后再次添加消息result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);// 退出break;case MESSAGE_SIZE_EXCEEDED:case PROPERTIES_SIZE_EXCEEDED:// 消息长度错误return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result));case UNKNOWN_ERROR:// 位置错误return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));default:// 其他错误return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));}// 这里就是加锁的时间,也是添加消息所耗费的时间elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;} finally {// 最后解锁并且重置 beginTimeInLockbeginTimeInLock = 0;putMessageLock.unlock();}// 如果加锁时间超过了 500msif (elapsedTimeInLock > 500) {// 记录日志log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", elapsedTimeInLock, msg.getBody().length, result);}// 如果文件写满了并且启用了文件预热if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {// 对 unlockMappedFile 解锁,这里解锁是使用 munlock 解除这片内存背后锁定的 page cache,这下就能够交换到 swap 空间了// 因为这边 unlockMappedFile 已经写满了,所以这片空间可以解锁方便 swap 交换到磁盘 swap 空间,读写的重点在新的 MappedFile// 文件上,对于旧的 unlockMappedFile 读写就没有那么多了this.defaultMessageStore.unlockMappedFile(unlockMappedFile);}// 返回结果PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);// 下面是一些数据统计,如写入 topic 的消息数 + 1storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).add(1);// 存储的消息总字节 + result.getWroteBytes()storeStatsService.getSinglePutMessageTopicSizeTotal(topic).add(result.getWroteBytes());// 9.上面添加消息到 CommitLog 只是添加到背后的 ByteBuffer,接下来需要提交刷盘请求CompletableFuture<PutMessageStatus> flushResultFuture = submitFlushRequest(result, msg);// 10.RocketMQ 主节点写入数据之后,向从节点提交消息复制请求,让 slave 节点去同步 master 节点新写入的消息CompletableFuture<PutMessageStatus> replicaResultFuture = submitReplicaRequest(result, msg);// 处理刷盘请求return flushResultFuture.thenCombine(replicaResultFuture, (flushStatus, replicaStatus) -> {// 外层会阻塞等待刷盘请求和从节点复制请求的结果if (flushStatus != PutMessageStatus.PUT_OK) {putMessageResult.setPutMessageStatus(flushStatus);}if (replicaStatus != PutMessageStatus.PUT_OK) {putMessageResult.setPutMessageStatus(replicaStatus);}return putMessageResult;});
}
2.4 MappedFile#appendMessage
这个方法就是上面 CommitLog#asyncPutMessage 调用的核心逻辑,就是在这里面追加消息的。
/*** 追加消息到 MappedFile 的 ByteBuffer* @param msg 消息* @param cb 回调函数,实现是 DefaultAppendMessageCallback* @param putMessageContext 消息存放的上下文* @return*/
public AppendMessageResult appendMessage(final MessageExtBrokerInner msg, final AppendMessageCallback cb,PutMessageContext putMessageContext) {return appendMessagesInner(msg, cb, putMessageContext);
}
再来看下 appendMessagesInner
方法的实现。
/*** 把消息追加到 CommitLog 文件结尾,这个方法是 CommitLog 调用的* @param messageExt* @param cb* @param putMessageContext* @return*/
public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb,PutMessageContext putMessageContext) {assert messageExt != null;assert cb != null;// 获取写指针的位置int currentPos = this.wrotePosition.get();// 写指针小于文件大小,可以写入if (currentPos < this.fileSize) {// 是否开启堆外缓存,如果开启了就使用 writeBuffer 进行写入(读写分离)ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();// 标记写入位置byteBuffer.position(currentPos);// 消息写入结果AppendMessageResult result;if (messageExt instanceof MessageExtBrokerInner) {// 单条消息写入result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos,(MessageExtBrokerInner) messageExt, putMessageContext);} else if (messageExt instanceof MessageExtBatch) {// 批量消息写入result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos,(MessageExtBatch) messageExt, putMessageContext);} else {// 不知道是什么return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);}// 更新写入位置this.wrotePosition.addAndGet(result.getWroteBytes());// 设置消息存入的时间(最新)this.storeTimestamp = result.getStoreTimestamp();return result;}log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}", currentPos, this.fileSize);return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
}
这个方法就是把消息追加到 CommitLog 文件结尾,这个方法是 CommitLog 调用的。wrotePosition
是写入的 MappedFile 的指针,一个 MappedFile 一个 wrotePosition 变量。同样的在写入之前会调用 slice 方法获取文件视图。
通过这个视图的获取方法就能看出,如果开启堆外缓存,如果开启了就使用 writeBuffer 进行写入(读写分离),这时候写入的数据是直接写入的 writeBuffer,但是由于不是 mmap 映射出来的 MappedByteBuffer,所以写入 writeBuffer 还得 commit 提交到 Page Cache 才能 flush 刷盘。
而如果没有开启读写分离,消息写入 MappedByteBuffer 就相当于写入 Page Cache,这时候就不需要 commit,可以直接 flush 刷盘。
2.5 CommitLog#doAppend 追加消息到 CommitLog
/**
* 追加消息
* @param fileFromOffset 文件起始索引
* @param byteBuffer CommitLog 的 ByteBuffer(没有开启读写分离就是 mappedByteBuffer,开启了就是 writeBuffer)
* @param maxBlank 文件还有多少空位可以写入数据
* @param msgInner 要写入的消息
* @param putMessageContext 写入消息上下文
* @return
*/
public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank,final MessageExtBrokerInner msgInner, PutMessageContext putMessageContext) {...
}
来看下里面的逻辑,首先获取写指针的位置,就是从当前 CommitLog 的哪个下标开始写入数据。
// 写指针的位置,物理偏移量
long wroteOffset = fileFromOffset + byteBuffer.position();
然后构建消息 ID,在消息发送的时候会在 broker 端生成一个唯一 ID 进行标识,构建出来的 ID 格式是:ip + port + wroteOffset
。
// 构建消息 ID,在消息发送的时候会在 broker 端生成一个唯一 ID 进行标识
Supplier<String> msgIdSupplier = () -> {// 消息类型int sysflag = msgInner.getSysFlag();// 如果是 IPV4,那么长度就是 16,IPV6 长度就是 28,默认就是 16// 这个消息 ID 组成是: ip + port + wroteOffsetint msgIdLen = (sysflag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 + 8 : 16 + 4 + 8;// 分配存储消息 ID 的 ByteBuffer,这里是一个 HeapByteBufferByteBuffer msgIdBuffer = ByteBuffer.allocate(msgIdLen);// 首先把 ip + port 设置进去,然后切换读模式MessageExt.socketAddress2ByteBuffer(msgInner.getStoreHost(), msgIdBuffer);// 这里 clear 把 ByteBuffer 里面的属性重置了msgIdBuffer.clear();// 设置 wroteOffsetmsgIdBuffer.putLong(msgIdLen - 8, wroteOffset);// 转成 16 进制字符串返回return UtilAll.bytes2string(msgIdBuffer.array());
};
下面接着通过 “topic-queueId” 获取 topicQueueTable 中的 ComsumeQueue 队列偏移量,这里的偏移量其实是指第几条索引,真正偏移量需要通过 queueOffset * 20
求出来,20 是 ConsumeQueue 中一条索引长度。
// Record ConsumeQueue information
// 记录 ConsumeQueue 的信息
// key = topic-queueId
String key = putMessageContext.getTopicQueueTableKey();
Long queueOffset = CommitLog.this.topicQueueTable.get(key);
// queueOffset 是 ConsumeQueue 中的最大偏移量,当 broker 写入一条消息,就会设置对应的 queueOffset++,
// 所以实际的 ConsumeQueue 偏移量应该等于 queueOffset * 20
if (null == queueOffset) {// 没有记录这个 key 就初始化为 0queueOffset = 0L;CommitLog.this.topicQueueTable.put(key, queueOffset);
}
下面是 LMQ 的处理,这里先不细说了,这里直接看源码。
// LMQ 的处理,先不看
boolean multiDispatchWrapResult = CommitLog.this.multiDispatch.wrapMultiDispatch(msgInner);
if (!multiDispatchWrapResult) {return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
}
上面我们不是获取了 queueOffset
吗,对于事务 TRANSACTION_PREPARED_TYPE
和 TRANSACTION_ROLLBACK_TYPE
这两个类型的消息是不会添加 ConsumeQueue 索引的,所以这里的 queueOffset 会被设置为 0;
// 获取到之前编码过的 ByteBuffer
ByteBuffer preEncodeBuffer = msgInner.getEncodedBuff();
// 获取消息总长度
final int msgLen = preEncodeBuffer.getInt(0);// 需要特殊处理的事务消息
final int tranType = MessageSysFlag.getTransactionValue(msgInner.getSysFlag());
switch (tranType) {// Prepare 阶段和 rollback 阶段的消息是不会设置到消费队列的case MessageSysFlag.TRANSACTION_PREPARED_TYPE:case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:queueOffset = 0L;break;// 如果是普通消息和 commit 阶段的事务消息才会设置到消费队列case MessageSysFlag.TRANSACTION_NOT_TYPE:case MessageSysFlag.TRANSACTION_COMMIT_TYPE:default:break;
}
接着下面就是添加消息的逻辑了,但是在处理添加逻辑之前,需要看下 CommitLog 里面还有没有足够多的空间。
// 确认空间是否可用,注意一个 CommitLog 文件需要保留下 8 个空白空间
if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) {// 这里就是空间不够用了,需要返回一个错误的魔数,魔数是 BLANK_MAGIC_CODE// 首先重置 msgStoreItemMemory 的属性,这个是一个临时变量,存储要处理的消息this.msgStoreItemMemory.clear();// 1.存储消息总长度this.msgStoreItemMemory.putInt(maxBlank);// 2.魔术设置为 BLANK_MAGIC_CODEthis.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE);// 3.当前时间final long beginTimeMills = CommitLog.this.defaultMessageStore.now();// 把这部分数据设置到 byteBuffer 中byteBuffer.put(this.msgStoreItemMemory.array(), 0, 8);// 返回结果,外层会对 END_OF_FILE 做特殊处理return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset,maxBlank, /* only wrote 8 bytes, but declare wrote maxBlank for compute write position */msgIdSupplier, msgInner.getStoreTimestamp(),queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
}
这里面其实就是会预留 8 个字节的空间出来,如果不满足,那么就会把 存储消息总长度 + BLANK_MAGIC_CODE
存储到 ByteBuffer 的尾部,标识当前文件已满。
preEncodeBuffer
是之前 encode 方法编码过的消息,不过上面编码消息的时候 queueOffset(消息在 CommitLog 中的起始偏移量)、physical offset(消息在 CommitLog 中的起始偏移量)、storeTimestamp(消息存储到 CommitLog 的时间)这些是没有初始化的,因为要等到这个 doAppend 方法才能初始化。
// 下面要开始从第 6 个位置开始设置信息,所以 pos 从 20 开始
int pos = 4 + 4 + 4 + 4 + 4;
// 6.ConsumeQueue 中的偏移量,实际偏移量等于 queueOffset * 20
preEncodeBuffer.putLong(pos, queueOffset);
pos += 8;
// 7.这条消息在 CommitLog 中的起始偏移量
preEncodeBuffer.putLong(pos, fileFromOffset + byteBuffer.position());
int ipLen = (msgInner.getSysFlag() & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 4 + 4 : 16 + 4;
// 8.SYSFLAG, 9.BORNTIMESTAMP, 10.BORNHOST, 11.STORETIMESTAMP
pos += 8 + 4 + 8 + ipLen;
// 重新设置消息的存储时间,这里初始化的时候设置成 0,所以这里会设置
preEncodeBuffer.putLong(pos, msgInner.getStoreTimestamp());
设置完之后就需要添加到 CommitLog 的 ByteBuffer 中。
// 起始时间
final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
// 把消息写入 MappedFile 的 ByteBuffer,有可能是 mappedByteBuffer 或者 writeBuffer
byteBuffer.put(preEncodeBuffer);
// 设置完了就把编码消息清空
msgInner.setEncodedBuff(null);
// 设置返回结果
// (1) 消息设置的结果 AppendMessageStatus.PUT_OK
// (2) 写指针位置 wroteOffset
// (3) 消息长度 msgLen
// (4) msgIdSupplier 消息 ID 生成器
// (5) storeTimestamp 是消息存储时间
// (6) queueOffset 是消息在 ConsumeQueue 中存储的索引
// (7) pagecacheRT 是消息写入的耗费时间
AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgIdSupplier,msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
在添加到 CommitLog 的 ByteBuffer 后(writeBuffer 或者 mappedByteBuffer),需要更新下 topicQueueTable,就是 ConsumeQueue 的队列最大物理偏移量。
// 看一下消息的事务类型
switch (tranType) {case MessageSysFlag.TRANSACTION_PREPARED_TYPE:case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:// prepare 阶段或者 rollback 阶段类型的消息就直接退出break;case MessageSysFlag.TRANSACTION_NOT_TYPE:case MessageSysFlag.TRANSACTION_COMMIT_TYPE:// 如果是普通消息或者是 commit 阶段的事务消息,那么更新 topicQueueTable 索引CommitLog.this.topicQueueTable.put(key, ++queueOffset);// 下面是 LMQ 相关队列的更新逻辑CommitLog.this.multiDispatch.updateMultiQueueOffset(msgInner);break;default:break;
}
return result;
好了,到这里追加消息的逻辑就解析完了,下面是这个方法的整体流程。
/*** 追加消息* @param fileFromOffset 文件起始索引* @param byteBuffer CommitLog 的 ByteBuffer(没有开启读写分离就是 mappedByteBuffer,开启了就是 writeBuffer)* @param maxBlank 文件还有多少空位可以写入数据* @param msgInner 要写入的消息* @param putMessageContext 写入消息上下文* @return*/
public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank,final MessageExtBrokerInner msgInner, PutMessageContext putMessageContext) {// STORETIMESTAMP + STOREHOSTADDRESS + OFFSET <br>// 写指针的位置,物理偏移量long wroteOffset = fileFromOffset + byteBuffer.position();// 构建消息 ID,在消息发送的时候会在 broker 端生成一个唯一 ID 进行标识Supplier<String> msgIdSupplier = () -> {// 消息类型int sysflag = msgInner.getSysFlag();// 如果是 IPV4,那么长度就是 16,IPV6 长度就是 28,默认就是 16// 这个消息 ID 组成是: ip + port + wroteOffsetint msgIdLen = (sysflag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 + 8 : 16 + 4 + 8;// 分配存储消息 ID 的 ByteBuffer,这里是一个 HeapByteBufferByteBuffer msgIdBuffer = ByteBuffer.allocate(msgIdLen);// 首先把 ip + port 设置进去,然后切换读模式MessageExt.socketAddress2ByteBuffer(msgInner.getStoreHost(), msgIdBuffer);// 这里 clear 把 ByteBuffer 里面的属性重置了msgIdBuffer.clear();// 设置 wroteOffsetmsgIdBuffer.putLong(msgIdLen - 8, wroteOffset);// 转成 16 进制字符串返回return UtilAll.bytes2string(msgIdBuffer.array());};// Record ConsumeQueue information// 记录 ConsumeQueue 的信息// key = topic-queueIdString key = putMessageContext.getTopicQueueTableKey();Long queueOffset = CommitLog.this.topicQueueTable.get(key);// queueOffset 是 ConsumeQueue 中的最大偏移量,当 broker 写入一条消息,就会设置对应的 queueOffset++,// 所以实际的 ConsumeQueue 偏移量应该等于 queueOffset * 20if (null == queueOffset) {// 没有记录这个 key 就初始化为 0queueOffset = 0L;CommitLog.this.topicQueueTable.put(key, queueOffset);}// LMQ 的处理,先不看boolean multiDispatchWrapResult = CommitLog.this.multiDispatch.wrapMultiDispatch(msgInner);if (!multiDispatchWrapResult) {return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);}// 需要特殊处理的事务消息final int tranType = MessageSysFlag.getTransactionValue(msgInner.getSysFlag());switch (tranType) {// Prepare 阶段和 rollback 阶段的消息是不会设置到消费队列的case MessageSysFlag.TRANSACTION_PREPARED_TYPE:case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:queueOffset = 0L;break;// 如果是普通消息和 commit 阶段的事务消息才会设置到消费队列case MessageSysFlag.TRANSACTION_NOT_TYPE:case MessageSysFlag.TRANSACTION_COMMIT_TYPE:default:break;}// 获取到之前编码过的 ByteBufferByteBuffer preEncodeBuffer = msgInner.getEncodedBuff();// 获取消息总长度final int msgLen = preEncodeBuffer.getInt(0);// 确认空间是否可用,注意一个 CommitLog 文件需要保留下 8 个空白空间if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) {// 这里就是空间不够用了,需要返回一个错误的魔数,魔数是 BLANK_MAGIC_CODE// 首先重置 msgStoreItemMemory 的属性,这个是一个临时变量,存储要处理的消息this.msgStoreItemMemory.clear();// 1.存储消息总长度this.msgStoreItemMemory.putInt(maxBlank);// 2.魔术设置为 BLANK_MAGIC_CODEthis.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE);// 3.当前时间final long beginTimeMills = CommitLog.this.defaultMessageStore.now();// 把这部分数据设置到 byteBuffer 中byteBuffer.put(this.msgStoreItemMemory.array(), 0, 8);// 返回结果,外层会对 END_OF_FILE 做特殊处理return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset,maxBlank, /* only wrote 8 bytes, but declare wrote maxBlank for compute write position */msgIdSupplier, msgInner.getStoreTimestamp(),queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);}// 下面要开始从第 6 个位置开始设置信息,所以 pos 从 20 开始int pos = 4 + 4 + 4 + 4 + 4;// 6.ConsumeQueue 中的偏移量,实际偏移量等于 queueOffset * 20preEncodeBuffer.putLong(pos, queueOffset);pos += 8;// 7.这条消息在 CommitLog 中的起始偏移量preEncodeBuffer.putLong(pos, fileFromOffset + byteBuffer.position());int ipLen = (msgInner.getSysFlag() & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 4 + 4 : 16 + 4;// 8.SYSFLAG, 9.BORNTIMESTAMP, 10.BORNHOST, 11.STORETIMESTAMPpos += 8 + 4 + 8 + ipLen;// 重新设置消息的存储时间,这里初始化的时候设置成 0,所以这里会设置preEncodeBuffer.putLong(pos, msgInner.getStoreTimestamp());// 起始时间final long beginTimeMills = CommitLog.this.defaultMessageStore.now();// 把消息写入 MappedFile 的 ByteBuffer,有可能是 mappedByteBuffer 或者 writeBufferbyteBuffer.put(preEncodeBuffer);// 设置完了就把编码消息清空msgInner.setEncodedBuff(null);// 设置返回结果// (1) 消息设置的结果 AppendMessageStatus.PUT_OK// (2) 写指针位置 wroteOffset// (3) 消息长度 msgLen// (4) msgIdSupplier 消息 ID 生成器// (5) storeTimestamp 是消息存储时间// (6) queueOffset 是消息在 ConsumeQueue 中存储的索引// (7) pagecacheRT 是消息写入的耗费时间AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgIdSupplier,msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);// 看一下消息的事务类型switch (tranType) {case MessageSysFlag.TRANSACTION_PREPARED_TYPE:case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:// prepare 阶段或者 rollback 阶段类型的消息就直接退出break;case MessageSysFlag.TRANSACTION_NOT_TYPE:case MessageSysFlag.TRANSACTION_COMMIT_TYPE:// 如果是普通消息或者是 commit 阶段的事务消息,那么更新 topicQueueTable 索引CommitLog.this.topicQueueTable.put(key, ++queueOffset);// 下面是 LMQ 相关队列的更新逻辑CommitLog.this.multiDispatch.updateMultiQueueOffset(msgInner);break;default:break;}return result;
}
2.6 DefaultMessageStore#asyncPutMessage 结尾
上面 2.1 - 2.5 小结就是追加消息的逻辑,追加完消息之后,在入口 asyncPutMessage 方法的结尾会设置结果处理逻辑。
// 6.这里返回 putResultFuture,在上层会去通过 get 非法阻塞等待
putResultFuture.thenAccept((result) -> {// 当消息存储完成之后,lambda 表达式会被调用// 消息消耗的时间long elapsedTime = this.getSystemClock().now() - beginTime;if (elapsedTime > 500) {log.warn("putMessage not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, msg.getBody().length);}// 设置下存储消息的消耗时间和最大消耗时间this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime);// 这里就是判断下存储结果是不是成功的if (null == result || !result.isOk()) {// 这里就是存储失败了,新增存储失败的次数this.storeStatsService.getPutMessageFailedTimes().add(1);}
});return putResultFuture;
到这里存储单条消息的核心逻辑就解析完了,可以看到这里只是返回一个 CompletableFuture,这里并不会阻塞,但是上层方法调用这个方法之后通过 get 方法就可以阻塞等待了,比如还是 DefaultMessageStore 的 putMessage。
@Override
public PutMessageResult putMessage(MessageExtBrokerInner msg) {return waitForPutResult(asyncPutMessage(msg));
}
3. 小结
这篇文章中我们解析了 broker 端存储单条消息的逻辑,但是除了单条消息,broker 还支持消息的批量新增,所以下一篇文章我们会解析消息的批量新增逻辑,下一篇文章字数会少一点,因为很多逻辑跟单条消息的新增是一样的。
如有错误,欢迎指出!!!
相关文章:
【RocketMQ 存储】- broker 端存储单条消息的逻辑
文章目录 1. 前言2. DefaultMessageStore#asyncPutMessage 添加单条消息2.1 DefaultMessageStore#checkStoreStatus 检查存储服务的状态2.2 DefaultMessageStore#checkMessage 校验消息长度是否合法2.3 CommitLog#asyncPutMessage 核心存储逻辑2.4 MappedFile#appendMessage2.5…...
爬虫基础(四)线程 和 进程 及相关知识点
目录 一、线程和进程 (1)进程 (2)线程 (3)区别 二、串行、并发、并行 (1)串行 (2)并行 (3)并发 三、爬虫中的线程和进程 &am…...
29. C语言 可变参数详解
本章目录: 前言可变参数的基本概念可变参数的工作原理如何使用可变参数 示例:计算多个整数的平均值解析: 更复杂的可变参数示例:打印可变数量的字符串解析: 总结 前言 在C语言中,函数参数的数量通常是固定的ÿ…...
Java CAS操作
通过前面的学习认识到了CPU缓存,Java内存模型,以及线程安全的原子、可见、顺序三大特性。本文则重点认识CAS操作,这是Java并发编程常见的一个操作,AbstractQueuedSynchronizer基于此操作提供了丰富的同步器和各种锁。 CAS&#x…...
KNIME:开源 AI 数据科学
KNIME(Konstanz Information Miner)是一款开源且功能强大的数据科学平台,由德国康斯坦茨大学的软件工程师团队开发,自2004年推出以来,广泛应用于数据分析、数据挖掘、机器学习和可视化等领域。以下是对KNIME的深度介绍…...
超级强大的压缩和解压工具,免费解压
软件介绍 今天要给大家分享一款超厉害的软件 ——ZArchiver,在我心中,它堪称安卓平台目前最为强大的解压软件。 之前,我一直使用 MT 管理器来解压文件。然而,MT 管理器存在一些局限性。比如在处理解压分卷文件时,它有时…...
代码随想录_栈与队列
栈与队列 232.用栈实现队列 232. 用栈实现队列 使用栈实现队列的下列操作: push(x) – 将一个元素放入队列的尾部。 pop() – 从队列首部移除元素。 peek() – 返回队列首部的元素。 empty() – 返回队列是否为空。 思路: 定义两个栈: 入队栈, 出队栈, 控制出入…...
基于STM32的智能停车场管理系统设计
目录 引言系统设计 硬件设计软件设计 系统功能模块 车辆识别与进出管理模块车位检测与引导模块计费与支付模块数据存储与查询模块远程监控与异常报警模块 控制算法 车牌识别与车辆进出管理算法车位检测与引导算法计费与支付处理算法数据存储与远程反馈算法 代码实现 车辆检测与…...
告别重启!Vue CLI 动态代理配置实战:实现热更新与灵活配置
在前端开发中,代理配置是解决跨域问题的常见手段。尤其是在使用 Vue CLI 进行开发时,我们经常需要通过 devServer.proxy 来配置代理。然而,传统的代理配置通常是静态的,修改后需要重启开发服务器,这在频繁调整代理配置…...
Cocos Creator 3.8 2D 游戏开发知识点整理
目录 Cocos Creator 3.8 2D 游戏开发知识点整理 1. Cocos Creator 3.8 概述 2. 2D 游戏核心组件 (1) 节点(Node)与组件(Component) (2) 渲染组件 (3) UI 组件 3. 动画系统 (1) 传统帧动画 (2) 动画编辑器 (3) Spine 和 …...
【Numpy核心编程攻略:Python数据处理、分析详解与科学计算】1.28 存储之道:跨平台数据持久化方案
好的,我将按照您的要求生成一篇高质量的Python NumPy文章。以下是第28篇《存储之道:跨平台数据持久化方案》的完整内容,包括目录、正文和参考文献。 1.28 存储之道:跨平台数据持久化方案 目录 #mermaid-svg-n1z37AP8obEgptkD {f…...
chrome源码剖析—UI架构消息机制
Chrome 浏览器的 UI 架构是高度模块化且基于现代图形技术和用户界面设计理念构建的。它的 UI 架构涵盖了窗口、标签页、控件、通知、菜单等组件的管理和交互。Chrome 的 UI 基本上是通过 views 框架和 Aura(Chrome 自己的 UI 层)构建的,后者又…...
面试经典150题——图的广度优先搜索
文章目录 1、蛇梯棋1.1 题目链接1.2 题目描述1.3 解题代码1.4 解题思路 2、最小基因变化2.1 题目链接2.2 题目描述2.3 解题代码2.4 解题思路 3、单词接龙3.1 题目链接3.2 题目描述3.3 解题代码3.4 解题思路 1、蛇梯棋 1.1 题目链接 点击跳转到题目位置 1.2 题目描述 给你一…...
Day30-【AI思考】-错题分类进阶体系——12维错误定位模型
文章目录 错题分类进阶体系——12维错误定位模型**一、认知层错误(根源性缺陷)****二、操作层错误(执行过程偏差)****三、心理层错误(元认知障碍)****四、进阶错误(专业级陷阱)** 错…...
利用Edu邮箱解锁Notion Pro,提升学习与工作效率
摘要: 本文将详细介绍如何通过Edu教育邮箱申请教育订阅,从而免费获得Notion Pro版的所有高级功能。此外,我们还将简要提及Edu邮箱的其他福利,如申请Azure 100免费VPS和OpenAI。通过对比Notion免费版和Pro版的差异,你将…...
剑指 Offer II 010. 和为 k 的子数组
comments: true edit_url: https://github.com/doocs/leetcode/edit/main/lcof2/%E5%89%91%E6%8C%87%20Offer%20II%20010.%20%E5%92%8C%E4%B8%BA%20k%20%E7%9A%84%E5%AD%90%E6%95%B0%E7%BB%84/README.md 剑指 Offer II 010. 和为 k 的子数组 题目描述 给定一个正整数数组和一个…...
【外文原版书阅读】《机器学习前置知识》2.用看电影推荐的例子带你深入了解向量点积在机器学习的作用
目录 3.3 Where Are You Looking, Vector? The Dot Product 个人主页:Icomi 大家好,我是Icomi,本专栏是我阅读外文原版书《Before Machine Learning》对于文章中我认为能够增进线性代数与机器学习之间的理解的内容的一个输出,希望…...
Vue.js组件开发-实现全屏平滑移动、自适应图片全屏滑动切换
使用Vue实现全屏平滑移动、自适应图片全屏滑动切换的功能。使用Vue 3和Vue Router,并结合一些CSS样式来完成这个效果。 步骤 创建Vue项目:使用Vue CLI创建一个新的Vue项目。准备图片:将需要展示的图片放在项目的public目录下。创建组件&…...
openRv1126 AI算法部署实战之——ONNX模型部署实战
在RV1126开发板上部署ONNX算法,实时目标检测RTSP传输。视频演示地址 rv1126 yolov5 实时目标检测 rtsp传输_哔哩哔哩_bilibili 一、准备工作 1.从官网下载YOLOv5-v7.0工程(YOLOv5的第7个版本) 手动在线下载: Releases ultraly…...
实验作业管理系统的设计与实现
标题:实验作业管理系统的设计与实现 内容:1.摘要 本系统旨在解决当前实验作业管理中存在的问题,提高管理效率和质量。通过对现有系统的调研和分析,我们确定了系统的功能需求和性能要求,并采用了先进的技术和架构进行设计和实现。系统实现了实…...
【愚公系列】《循序渐进Vue.js 3.x前端开发实践》032-组件的Teleport功能
标题详情作者简介愚公搬代码头衔华为云特约编辑,华为云云享专家,华为开发者专家,华为产品云测专家,CSDN博客专家,CSDN商业化专家,阿里云专家博主,阿里云签约作者,腾讯云优秀博主&…...
leetcode——二叉树的最大深度(java)
给定一个二叉树 root ,返回其最大深度。 二叉树的 最大深度 是指从根节点到最远叶子节点的最长路径上的节点数。 示例 1: 输入:root [3,9,20,null,null,15,7] 输出:3 示例 2: 输入:root [1,null,2] 输…...
【PyTorch】3.张量类型转换
个人主页:Icomi 在深度学习蓬勃发展的当下,PyTorch 是不可或缺的工具。它作为强大的深度学习框架,为构建和训练神经网络提供了高效且灵活的平台。神经网络作为人工智能的核心技术,能够处理复杂的数据模式。通过 PyTorch࿰…...
自制一个入门STM32 四足机器人具体开发顺序
0 前期准备 1. 知识储备 学习 STM32 微控制器的基础知识,包括 GPIO、定时器、串口通信等外设的使用,可通过官方文档、教程和视频课程进行学习。了解舵机控制原理,因为四足机器人通常使用舵机来实现关节运动。掌握基本的机械结构设计知识&am…...
SpringCloud基础二(完结)
HTTP客户端Feign 在SpringCloud基础一中,我们利用RestTemplate结合服务注册与发现来发起远程调用的代码如下: String url "http://userservice/user/" order.getUserId(); User user restTemplate.getForObject(url, User.class);以上代码就…...
云原生时代,如何构建高效分布式监控系统
文章目录 一.监控现状二.Thanos原理分析SidecarQuerierStoreCompactor 三.Sidecar or ReceiverThanos Receiver工作原理 四.分布式运维架构 一.监控现状 Prometheus是CNCF基金会管理的一个开源监控项目,由于其良好的架构设计和完善的生态,迅速成为了监控…...
WordPress使用(1)
1. 概述 WordPress是一个开源博客框架,配合不同主题,可以有多种展现方式,博客、企业官网、CMS系统等,都可以很好的实现。 官网:博客工具、发布平台和内容管理系统 – WordPress.org China 简体中文,这里可…...
小白爬虫冒险之反“反爬”:无限debugger、禁用开发者工具、干扰控制台...(持续更新)
背景浅谈 小白踏足JS逆向领域也有一年了,对于逆向这个需求呢主要要求就是让我们去破解**“反爬机制”**,即反“反爬”,脚本处理层面一般都是decipher网站对request设置的cipher,比如破解一个DES/AES加密拿到key。这篇文章先不去谈…...
Time Constant | RC、RL 和 RLC 电路中的时间常数
注:本文为 “Time Constant” 相关文章合辑。 机翻,未校。 How To Find The Time Constant in RC and RL Circuits June 8, 2024 💡 Key learnings: 关键学习点: Time Constant Definition: The time constant (τ) is define…...
Python爬虫学习第三弹 —— Xpath 页面解析 实现无广百·度
早上好啊,大佬们。上回使用 Beautiful Soup 进行页面解析的内容是不是已经理解得十分透彻了~ 这回我们再来尝试使用另外一种页面解析,来重构上一期里写的那些代码。 讲完Xpath之后,小白兔会带大家解决上期里百度搜索的代码编写,保…...
JS 正则表达式 -- 分组【详解】含普通分组、命名分组、反向引用
普通分组 使用圆括号 () 来创建分组捕获匹配的内容,通过正则表达式匹配结果的数组来访问这些捕获的内容。 const str "Hello, World!"; const regex /(Hello), (World)!$/; const match str.match(regex);if (match) {console.log("完整匹配结果…...
Leetcode刷题-不定长滑动窗口
分享丨【题单】滑动窗口与双指针(定长/不定长/单序列/双序列/三指针/分组循环) - 力扣(LeetCode) 3090 class Solution:def maximumLengthSubstring(self, s: str) -> int:c Counter()res 0rk -1for i in range(len(s)):i…...
【Rust自学】15.6. RefCell与内部可变性:“摆脱”安全性限制
题外话,这篇文章一共4050字,是截止到目前为止最长的文章,如果你能坚持读完并理解,那真的很强! 喜欢的话别忘了点赞、收藏加关注哦(加关注即可阅读全文),对接下来的教程有兴趣的可以…...
护眼好帮手:Windows显示器调节工具
在长时间使用电脑的过程中,显示器的亮度和色温对眼睛的舒适度有着重要影响。传统的显示器调节方式不仅操作繁琐,而且在低亮度下容易导致色彩失真。因此,今天我想为大家介绍一款适用于Windows系统的护眼工具,它可以帮助你轻松调节显…...
使用 OpenResty 构建高效的动态图片水印代理服务20250127
使用 OpenResty 构建高效的动态图片水印代理服务 在当今数字化的时代,图片在各种业务场景中广泛应用。为了保护版权、统一品牌形象,动态图片水印功能显得尤为重要。然而,直接在后端服务中集成水印功能,往往会带来代码复杂度增加、…...
36、【OS】【Nuttx】OSTest分析(2):环境变量测试
背景 2025.1.29 蛇年快乐! 接之前wiki 35、【OS】【Nuttx】OSTest分析(1):stdio测试(五) 已经分析完了第一个测试项,输入输出端口测试,接下来分析下环境变量测试,也比较…...
C++并发编程指南04
文章目录 共享数据的问题3.1.1 条件竞争双链表的例子条件竞争示例恶性条件竞争的特点 3.1.2 避免恶性条件竞争1. 使用互斥量保护共享数据结构2. 无锁编程3. 软件事务内存(STM) 总结互斥量与共享数据保护3.2.1 互斥量使用互斥量保护共享数据示例代码&…...
Java实现LRU缓存策略实战
实现LRU模型选择LRU缓存回收算法集成Google Guava(LRU缓存策略)插件Google Guava(LRU策略)缓存示例总结LRU(Least Recently Used,最近最少使用)缓存是一种常见的缓存淘汰策略。它的基本思想是优先保留最近被访问过的数据,淘汰最久未被访问的数据。这种策略的目的是为了…...
三个不推荐使用的线程池
线程池的种类 其实看似这么多的线程池,都离不开ThreadPoolExecutor去创建,只不过他们是简化一些参数 newFixedThreadPool 里面全是核心线程 有资源耗尽的风险,任务队列最大长度为Integer.MAX_VALUE,可能会堆积大量的请求ÿ…...
Golang 并发机制-1:Golang并发特性概述
并发是现代软件开发中的一个基本概念,它使程序能够同时执行多个任务,从而提高效率和响应能力。在本文中,我们将探讨并发性在现代软件开发中的重要性,并深入研究Go处理并发任务的独特方法。 并发的重要性 增强性能 并发在提高软…...
Flink中的时间和窗口
在批处理统计中,我们可以等待一批数据都到齐后,统一处理。但是在实时处理统计中,我们是来一条就得处理一条,那么我们怎么统计最近一段时间内的数据呢?引入“窗口”。 所谓的“窗口”,一般就是划定的一段时…...
Alfresco Content Services dockerCompose自动化部署详尽操作
Alfresco Content Services docker社区部署文档 Alfresco Content Services简介 官方说明书 https://support.hyland.com/r/Alfresco/Alfresco-Content-Services-Community-Edition/23.4/Alfresco-Content-Services-Community-Edition/Using/Content/Folder-rules/Defining-…...
吴恩达深度学习——深层神经网络
来自https://www.bilibili.com/video/BV1FT4y1E74V,仅为本人学习所用。 符号约定 对于该深层网络,有四层,包含三个隐藏层和一个输出层。 隐藏层中,第一层有五个单元、第二层有五个单元,第三层有三个单元。标记 l l l…...
【算法设计与分析】实验1:字符串匹配问题的算法设计与求解
目录 一、实验目的 二、实验环境 三、实验内容 四、核心代码 五、记录与处理 六、思考与总结 七、完整报告和成果文件提取链接 一、实验目的 给定一个文本,在该文本中查找并定位任意给定字符串。 1、深刻理解并掌握蛮力法的设计思想; 2、提高应用…...
C语言二级题解:查找字母以及其他字符个数、数字字符串转双精度值、二维数组上下三角区域数据对调
目录 一、程序填空题 --- 查找字母以及其他字符个数 题目 分析 二、程序修改 --- 数字字符串转双精度值 题目 分析 小数位字符串转数字 三、程序设计 --- 二维数组上下三角区域数据对调 题目 分析 前言 本文来讲解: 查找字母以及其他字符个数、数字字符串…...
Git进阶之旅:Git 配置信息 Config
Git 配置级别: 仓库级别:local [ 优先级最高 ]用户级别:global [ 优先级次之 ]系统级别:system [ 优先级最低 ] 配置文件位置: git 仓库级别对应的配置文件是当前仓库下的 .git/configgit 用户级别对应的配置文件时用…...
Qwen2-VL:在任何分辨率下增强视觉语言模型对世界的感知 (大型视觉模型 核心技术 分享)
摘要 我们推出了Qwen2-VL系列,这是对之前Qwen-VL模型的高级升级,重新定义了视觉处理中的常规预设分辨率方法。Qwen2-VL引入了Naive Dynamic Resolution机制,使模型能够动态地将不同分辨率的图像转换为不同的视觉令牌数量。这种方法允许模型生成更高效和准确的视觉表示,紧密…...
【C语言】在Windows上为可执行文件.exe添加自定义图标
本文详细介绍了在 Windows 环境下,如何为使用 GCC 编译器编译的 C程序 添加自定义图标,从而生成带有图标的 .exe 可执行文件。通过本文的指导,读者可以了解到所需的条件以及具体的操作步骤,使生成的程序更具专业性和个性化。 目录 1. 准备条件2. 具体步骤步骤 1: 准备资源文…...
记录 | Docker的windows版安装
目录 前言一、1.1 打开“启用或关闭Windows功能”1.2 安装“WSL”方式1:命令行下载方式2:离线包下载 二、Docker Desktop更新时间 前言 参考文章:Windows Subsystem for Linux——解决WSL更新速度慢的方案 参考视频:一个视频解决D…...
FortiOS 存在身份验证绕过导致命令执行漏洞(CVE-2024-55591)
免责声明: 本文旨在提供有关特定漏洞的深入信息,帮助用户充分了解潜在的安全风险。发布此信息的目的在于提升网络安全意识和推动技术进步,未经授权访问系统、网络或应用程序,可能会导致法律责任或严重后果。因此,作者不对读者基于本文内容所采取的任何行为承担责任。读者在…...