当前位置: 首页 > news >正文

09-RocketMQ 深度解析:从原理到实战,构建可靠消息驱动微服务

RocketMQ 深度解析:从原理到实战,构建可靠消息驱动微服务

一、RocketMQ 核心定位与架构探秘

1.1 分布式消息领域的中流砥柱

在分布式系统中,消息队列是实现异步通信、解耦服务、削峰填谷的关键组件。RocketMQ 作为阿里巴巴开源的分布式消息中间件,凭借卓越的性能、高可靠性和丰富功能,在电商、金融、社交等众多领域广泛应用。它能够支撑海量消息的高并发处理,确保消息不丢失、不重复,为分布式系统的稳定运行提供坚实保障。

1.2 精妙架构剖析

消费者集群
Broker集群
NameServer集群
生产者集群
发送消息
发送消息
存储路由信息
存储路由信息
存储消息
存储消息
存储消息
存储消息
拉取消息
拉取消息
Consumer实例1
Consumer实例2
Topic 1
Topic 2
Broker节点1
Broker节点2
NameServer节点1
Producer实例1
NameServer节点2
Producer实例2
  1. NameServer:充当轻量级的服务发现与路由中心,无状态且易扩展。每个 NameServer 节点相互独立,存储所有 Broker 的路由信息。生产者和消费者启动时,会向所有 NameServer 节点注册,并定时拉取最新路由数据,以确保消息能准确发送和接收。
  2. Broker:负责消息的存储、转发和查询。支持主从架构,主 Broker 处理读写操作,从 Broker 通过同步主 Broker 数据实现热备。当主 Broker 故障时,从 Broker 可自动切换为主,保障消息服务的连续性。Broker 还对消息进行高效的存储管理,采用顺序写盘和零拷贝技术,极大提升消息读写性能。
  3. Producer:消息生产者,根据消息的 Topic,从 NameServer 获取 Broker 地址列表,通过负载均衡策略选择一个 Broker 发送消息。支持同步、异步和单向发送模式,满足不同业务场景对消息发送可靠性和性能的要求。
  4. Consumer:消息消费者,从 NameServer 获取订阅 Topic 的 Broker 地址,采用拉模式主动从 Broker 拉取消息。支持集群消费和广播消费两种模式,集群消费下,同一消费组内的消费者平均分摊消息;广播消费则是每个消费者都会收到全量消息。

1.3 核心组件协同机制

组件交互交互流程应用场景
Producer 与 NameServerProducer 启动时向所有 NameServer 注册自身信息,并定时拉取最新 Broker 路由数据电商下单场景中,订单服务(Producer)需实时知晓消息存储位置,确保订单消息准确发送
Producer 与 BrokerProducer 根据路由信息,选择 Broker 发送消息。发送成功后,Broker 返回确认响应在物流通知场景中,订单系统(Producer)向物流消息队列(Broker)发送发货消息,需确认消息已成功存储
Consumer 与 NameServerConsumer 启动时从 NameServer 获取订阅 Topic 的 Broker 地址列表,并定期更新用户评论系统(Consumer)需持续获取评论消息队列(Broker)地址,保证及时消费新评论消息
Consumer 与 BrokerConsumer 按一定策略从 Broker 拉取消息,处理成功后向 Broker 发送确认在积分系统中,消费订单支付成功消息(从 Broker 拉取),处理后确认消息已消费,防止重复处理

二、消息发送与接收:核心功能解析

2.1 消息发送:可靠与高效的多模式支持

RocketMQ 提供三种消息发送模式,满足不同业务对可靠性和性能的需求。以下是各模式的详细解析:

2.1.1 同步发送(Sync Send)

描述:发送消息后同步等待 Broker 确认响应,确保消息发送成功,适用于对可靠性要求极高的场景。
适用场景:金融交易、订单创建等不能容忍消息丢失的核心业务。
代码示例

DefaultMQProducer producer = new DefaultMQProducer("producerGroup");  
producer.setNamesrvAddr("nameserver1:9876;nameserver2:9876");  
producer.start();  Message message = new Message(  "OrderTopic",   // Topic名称  "CREATE",       // Tag(用于消息过滤)  "ORDER_1001",   // 消息键(唯一标识)  "订单创建成功,商品ID=1001".getBytes()  // 消息体  
);  // 同步发送,获取发送结果  
SendResult sendResult = producer.send(message);  
System.out.println("发送状态:" + sendResult.getSendStatus());  
System.out.println("消息队列:" + sendResult.getMessageQueue());  producer.shutdown();  
2.1.2 异步发送(Async Send)

描述:发送消息后不阻塞等待响应,通过回调函数处理结果,提升发送性能,适用于高并发非核心业务。
适用场景:日志采集、监控指标上报、通知类消息。
代码示例

DefaultMQProducer producer = new DefaultMQProducer("producerGroup");  
producer.setNamesrvAddr("nameserver1:9876;nameserver2:9876");  
producer.start();  Message message = new Message("LogTopic", "INFO", "LOG_001", "系统启动日志".getBytes());  // 异步发送,设置回调函数  
producer.send(message, new SendCallback() {  @Override  public void onSuccess(SendResult sendResult) {  System.out.println("异步发送成功:" + sendResult.getSendStatus());  }  @Override  public void onException(Throwable e) {  System.err.println("异步发送失败:" + e.getMessage());  }  
});  // 保持Producer运行,确保回调执行  
Thread.sleep(1000);  
producer.shutdown();  
2.1.3 单向发送(One-way Send)

描述:发送消息后忽略响应,追求极致性能,不保证消息一定到达 Broker。
适用场景:对可靠性要求极低的场景(如非核心指标上报)。
代码示例

DefaultMQProducer producer = new DefaultMQProducer("producerGroup");  
producer.setNamesrvAddr("nameserver1:9876;nameserver2:9876");  
producer.start();  Message message = new Message("MetricTopic", "TPS", "METRIC_001", "当前TPS=1000".getBytes());  // 单向发送(无响应)  
producer.sendOneway(message);  producer.shutdown();  
2.1.4 消息重试机制

当发送失败(如网络波动)时,RocketMQ 自动重试发送,可通过参数调整重试策略:

DefaultMQProducer producer = new DefaultMQProducer("producerGroup");  
producer.setNamesrvAddr("nameserver1:9876;nameserver2:9876");  
producer.setRetryTimesWhenSendFailed(3); // 失败后重试3次  
producer.setSendMsgTimeout(5000); // 单次发送超时5秒  
producer.start();  
// 发送消息代码...  
2.2 消息接收:灵活消费模式与高效处理

RocketMQ 支持两种核心消费模式,配合消息过滤机制,满足多样化的消费需求。

2.2.1 集群消费(Cluster Consumption)

描述:同一消费组内的消费者平均分摊消息,适用于负载均衡场景,是最常用的消费模式。
核心特性

  • 消息只会被消费组内的一个消费者处理
  • 支持消息重试(消费失败时重新入队)
  • 需保证消费逻辑的幂等性

代码示例

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroup");  
consumer.setNamesrvAddr("nameserver1:9876;nameserver2:9876");  
consumer.setMessageModel(MessageModel.CLUSTERING); // 集群消费模式  
consumer.subscribe("OrderTopic", "CREATE"); // 订阅Topic及Tag  consumer.registerMessageListener(new MessageListenerConcurrently() {  @Override  public ConsumeConcurrentlyStatus consumeMessage(  List<MessageExt> msgs, ConsumeConcurrentlyContext context  ) {  for (MessageExt msg : msgs) {  String messageBody = new String(msg.getBody());  System.out.println("消费消息:" + messageBody);  // 业务处理逻辑  processOrder(msg);  }  // 批量确认消费成功  return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;  }  
});  consumer.start();  
2.2.2 广播消费(Broadcast Consumption)

描述:每个消费者实例都会收到全量消息,适用于需要所有消费者处理相同消息的场景。
核心特性

  • 消息会被消费组内的所有消费者处理
  • 不支持消息重试(消费失败需自行处理)
  • 无需考虑负载均衡

代码示例

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroup");  
consumer.setNamesrvAddr("nameserver1:9876;nameserver2:9876");  
consumer.setMessageModel(MessageModel.BROADCASTING); // 广播消费模式  
consumer.subscribe("NoticeTopic", "*"); // 订阅所有Tag  consumer.registerMessageListener(new MessageListenerConcurrently() {  @Override  public ConsumeConcurrentlyStatus consumeMessage(  List<MessageExt> msgs, ConsumeConcurrentlyContext context  ) {  for (MessageExt msg : msgs) {  String notice = new String(msg.getBody());  sendNotification(notice); // 发送短信、邮件等通知  }  return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;  }  
});  consumer.start();  
2.2.3 消息过滤机制

RocketMQ 支持两种过滤方式,精准筛选目标消息:

(1)Tag 过滤(简单快速)

原理:发送消息时设置 Tag,消费时通过 subscribe(topic, tag) 过滤。
示例

// 发送时设置Tag  
Message message = new Message("OrderTopic", "PAY", "ORDER_1002", "支付成功".getBytes());  
producer.send(message);  // 消费时过滤Tag为PAY的消息  
consumer.subscribe("OrderTopic", "PAY");  
(2)SQL 过滤(复杂条件)

原理:基于消息属性(User Property)进行 SQL92 表达式过滤(需在 Broker 配置 enablePropertyFilter=true)。
示例

// 发送时添加属性  
message.putUserProperty("orderAmount", "200");  
message.putUserProperty("region", "SH");  // 消费时过滤金额>100且地区为上海的消息  
consumer.subscribe("OrderTopic", "orderAmount > 100 AND region = 'SH'");  

核心对比:发送与消费模式选择

维度同步发送异步发送单向发送集群消费广播消费
可靠性负载均衡全量消费
吞吐量最高高并发处理广播通知
适用场景核心交易日志采集指标上报订单处理系统通知
代码复杂度简单中等(回调)简单中等(幂等性)简单

通过合理选择发送与消费模式,结合消息过滤机制,可在不同业务场景中发挥 RocketMQ 的最佳性能与可靠性。

三、高级特性:事务消息、顺序消息与消息存储优化

3.1 事务消息:确保分布式事务一致性

  1. 事务消息原理
    在分布式系统中,常涉及多个服务间的事务操作,如电商下单时需同时扣减库存和更新订单状态。RocketMQ 的事务消息通过两阶段提交协议,确保消息发送与本地事务的原子性。生产者先发送半事务消息(仅存储在 Broker 但不允许消费),本地事务执行完成后,根据执行结果向 Broker 提交或回滚事务消息。若本地事务执行状态未知(如网络异常),Broker 会回调生产者的事务状态查询接口,确认事务最终状态。
  2. 代码实现示例
TransactionMQProducer producer = new TransactionMQProducer("producerGroup");
producer.setNamesrvAddr("nameserver1:9876;nameserver2:9876");
TransactionListener transactionListener = new TransactionListener() {@Overridepublic LocalTransactionState executeLocalTransaction(Message msg, Object arg) {// 执行本地事务,如扣减库存try {// 本地业务逻辑return LocalTransactionState.COMMIT_MESSAGE;} catch (Exception e) {return LocalTransactionState.ROLLBACK_MESSAGE;}}@Overridepublic LocalTransactionState checkLocalTransaction(MessageExt msg) {// 事务状态查询逻辑return LocalTransactionState.COMMIT_MESSAGE;}
};
producer.setTransactionListener(transactionListener);
producer.start();
Message message = new Message("TopicTest", "TagA", "Hello, RocketMQ!".getBytes());
TransactionSendResult sendResult = producer.sendMessageInTransaction(message, null);
producer.shutdown();

3.2 顺序消息:保证消息有序处理

  1. 顺序消息分类与应用场景
    顺序消息分为全局顺序和局部顺序。全局顺序指同一 Topic 下所有消息严格按照发送顺序消费,适用于对消息顺序要求极高且吞吐量要求不高的场景,如数据库 Binlog 同步。局部顺序是指在一个 Message Queue 内的消息按序消费,不同 Queue 间消息无序,在电商订单处理中,同一订单的创建、支付、发货等消息需按序处理,可通过将同一订单消息发送到同一 Queue 实现局部顺序。
  2. 代码实现要点
    生产者发送顺序消息时,需通过 MessageQueueSelector 选择特定 Queue。消费者则按 Queue 依次消费消息,确保顺序性。示例如下:
// 生产者发送顺序消息
DefaultMQProducer producer = new DefaultMQProducer("producerGroup");
producer.setNamesrvAddr("nameserver1:9876;nameserver2:9876");
producer.start();
Message message = new Message("TopicTest", "TagA", "Hello, RocketMQ!".getBytes());
SendResult sendResult = producer.send(message, new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {// 根据业务逻辑选择Queue,如根据订单ID取模Long orderId = (Long) arg;long index = orderId % mqs.size();return mqs.get((int) index);}
}, orderId);
producer.shutdown();
// 消费者接收顺序消息
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroup");
consumer.setNamesrvAddr("nameserver1:9876;nameserver2:9876");
consumer.subscribe("TopicTest", "TagA");
consumer.registerMessageListener(new MessageListenerOrderly() {@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {for (MessageExt msg : msgs) {System.out.println(new String(msg.getBody()));}return ConsumeOrderlyStatus.SUCCESS;}
});
consumer.start();

3.3 消息存储优化:高性能读写的奥秘

  1. 存储结构剖析:RocketMQ 采用基于 CommitLog 的存储结构,所有 Topic 的消息都顺序写入 CommitLog 文件,通过 ConsumeQueue 作为消息索引,加速消息的查询和消费。CommitLog 文件以固定大小(默认 1GB)分段存储,当一个文件写满后,创建新文件继续写入。ConsumeQueue 则存储每个 Topic 下每个 Queue 对应的消息在 CommitLog 中的物理偏移量、消息大小等信息。
  2. 读写性能优化技术
    • 顺序写盘:避免磁盘随机写的性能瓶颈,极大提升写性能。
    • 零拷贝技术:在消息发送和消费时,减少数据在用户空间和内核空间的拷贝次数,提高数据传输效率。例如,在消息发送时,直接将用户态内存中的消息数据通过 DMA(直接内存访问)技术传输到网卡,无需先拷贝到内核态。
    • 刷盘策略:支持同步刷盘和异步刷盘。同步刷盘确保消息写入磁盘后才返回确认,数据可靠性高,但性能略低;异步刷盘将消息先写入内存,定时批量刷盘,性能较高,但存在一定数据丢失风险。可根据业务场景选择合适的刷盘策略,如金融场景采用同步刷盘,日志记录场景采用异步刷盘。

四、Spring Cloud 集成 RocketMQ:实战指南

4.1 环境搭建与依赖引入

  1. 安装 RocketMQ:从 RocketMQ 官方网站下载安装包,解压后按照官方文档启动 NameServer 和 Broker。
  2. 创建 Spring Cloud 项目:使用 Spring Initializr 创建一个 Spring Boot 项目,并引入 RocketMQ 相关依赖。在pom.xml中添加以下依赖:
<dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
</dependency>

4.2 生产者配置与消息发送

  1. 配置文件设置:在application.yml中配置 RocketMQ 生产者相关参数,如 NameServer 地址、生产者组等:
spring:cloud:stream:rocketmq:binder:namesrv-addr: nameserver1:9876;nameserver2:9876bindings:orderOutput:destination: orderTopicgroup: orderProducerGroup
  1. 消息发送代码实现:创建一个服务类,使用RocketMQTemplate发送消息。例如,发送订单消息:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
@Service
public class OrderProducer {@Autowiredprivate RocketMQTemplate rocketMQTemplate;public void sendOrder(Order order) {rocketMQTemplate.send("orderOutput", MessageBuilder.withPayload(order).build());}
}

4.3 消费者配置与消息接收

  1. 配置文件调整:在application.yml中配置消费者相关参数,如订阅的 Topic、消费组等:
spring:cloud:stream:rocketmq:binder:namesrv-addr: nameserver1:9876;nameserver2:9876bindings:orderInput:destination: orderTopicgroup: orderConsumerGroup
  1. 消息接收代码实现:创建一个消息监听器类,处理接收到的订单消息:
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.stereotype.Service;
@Service
public class OrderConsumer {@StreamListener("orderInput")public void handleOrder(Order order) {// 处理订单逻辑,如更新订单状态、扣减库存等}
}

4.4 事务消息与顺序消息在 Spring Cloud 中的应用

4.4.1 事务消息完整实现

在 Spring Cloud 中实现事务消息,需结合RocketMQTransactionManager和自定义事务监听器,确保本地事务与消息发送的一致性。

(1)配置事务管理器与监听器
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;  
import org.apache.rocketmq.spring.core.RocketMQTemplate;  
import org.apache.rocketmq.spring.transaction.RocketMQTransactionListenerAdapter;  
import org.apache.rocketmq.spring.transaction.TransactionSendResult;  
import org.springframework.messaging.Message;  
import org.springframework.stereotype.Component;  
import org.springframework.transaction.PlatformTransactionManager;  @Configuration  
public class RocketMQConfig {  @Bean  public PlatformTransactionManager transactionManager() {  return new RocketMQTransactionManager(rocketMQTemplate());  }  @Bean  public RocketMQTemplate rocketMQTemplate(DefaultMQProducer producer) {  RocketMQTemplate template = new RocketMQTemplate();  template.setProducer(producer);  return template;  }  // 自定义事务监听器  @Component  @RocketMQTransactionListener(txProducerGroup = "order-producer-group")  public class OrderTransactionListener extends RocketMQTransactionListenerAdapter {  @Override  public LocalTransactionState executeLocalTransaction(Message message, Object arg) {  Order order = (Order) message.getPayload();  try {  // 执行本地事务(如创建订单、扣减库存)  orderService.createOrder(order);  inventoryService.deductStock(order.getProductId(), order.getQuantity());  return LocalTransactionState.COMMIT_MESSAGE;  } catch (Exception e) {  // 本地事务失败,回滚消息  return LocalTransactionState.ROLLBACK_MESSAGE;  }  }  @Override  public LocalTransactionState checkLocalTransaction(Message message) {  Order order = (Order) message.getPayload();  // 查询本地事务状态(如订单是否创建成功)  boolean orderExists = orderService.checkOrderExists(order.getId());  return orderExists ? LocalTransactionState.COMMIT_MESSAGE : LocalTransactionState.UNKNOWN;  }  }  
}  
(2)生产者发送事务消息
@Service  
public class OrderProducer {  @Autowired  private RocketMQTemplate rocketMQTemplate;  public void sendOrderWithTransaction(Order order) {  // 发送事务消息(半事务消息)  TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction(  "orderOutput",  MessageBuilder.withPayload(order).build(),  order  // 传递给事务监听器的参数(如订单对象)  );  // 处理发送结果  if (result.getLocalTransactionState() == LocalTransactionState.COMMIT_MESSAGE) {  log.info("事务消息提交成功");  }  }  
}  
4.4.2 顺序消息完整实现

在 Spring Cloud 中发送顺序消息,需通过RocketMQTemplatesendOrderly方法指定队列选择策略,消费者使用MessageListenerOrderly确保顺序消费。

(1)生产者发送顺序消息
@Service  
public class OrderProducer {  @Autowired  private RocketMQTemplate rocketMQTemplate;  public void sendOrderInOrder(Order order) {  // 按订单ID选择队列(同一订单ID发送到同一队列)  rocketMQTemplate.sendOrderly(  "orderTopic",  MessageBuilder.withPayload(order).build(),  (mqs, msg, arg) -> {  Long orderId = (Long) arg;  int index = (int) (orderId % mqs.size());  return mqs.get(index);  // 选择固定队列  },  order.getId()  // 作为队列选择的参数  );  }  
}  
(2)消费者接收顺序消息
@Service  
public class OrderConsumer {  @StreamListener("orderInput")  public void handleOrderInOrder(List<MessageExt> msgs, ConsumeOrderlyContext context) {  for (MessageExt msg : msgs) {  Order order = JSON.parseObject(msg.getBody(), Order.class);  // 按顺序处理订单(如订单创建→支付→发货)  processOrder(order);  // 手动确认消费(顺序消息需确保处理成功后再提交)  context.setStatus(ConsumeOrderlyStatus.SUCCESS);  }  }  private void processOrder(Order order) {  // 业务处理逻辑(需保证幂等性,避免重复处理)  if (!orderRepository.existsById(order.getId())) {  orderRepository.save(order);  }  }  
}  
4.4.3 最佳实践与注意事项
  1. 事务消息注意点

    • 幂等性设计:本地事务需支持重复执行(如根据订单 ID 幂等校验),避免事务状态查询时重复操作
    • 超时处理:设置合理的transactionTimeout(默认 60 秒),避免长时间占用事务资源
    • 异常处理:对checkLocalTransaction方法返回UNKNOWN的情况,需增加人工补偿接口
  2. 顺序消息优化

    • 队列数量:根据吞吐量需求设置 Queue 数量(建议 Queue 数 = 消费者线程数),避免单个 Queue 成为瓶颈
    • 消费重试:顺序消息消费失败时,默认会重试(可通过maxReconsumeTimes配置重试次数),需确保重试期间 Queue 被独占
    • 流量控制:对顺序消息队列单独配置流量控制策略(如限制单个 Queue 的消费速率)
  3. 生产环境配置

    spring:  cloud:  stream:  rocketmq:  binder:  namesrv-addr: nacos-nameserver:9876  # 集群地址  producer:  group: order-producer-group  send-message-timeout: 3000  # 发送超时3秒  retry-times-when-send-failed: 3  # 失败重试3次  consumer:  group: order-consumer-group  consume-thread-min: 16  # 最小消费线程数  consume-thread-max: 32  # 最大消费线程数  
    

五、生产环境运维与监控

5.1 集群高可用部署

  1. NameServer 集群:至少部署 3 个节点,通过 DNS 轮询或负载均衡器对外提供服务,确保无单点故障
  2. Broker 主从架构:
    • 配置brokerRole=ASYNC_MASTER(异步复制)或SYNC_MASTER(同步复制)
    • 从 Broker 定期同步主 Broker 的 CommitLog 和 ConsumeQueue 数据
  3. 生产者 / 消费者集群:
    • 生产者采用集群模式,通过负载均衡发送消息到不同 Broker
    • 消费者组内实例数不超过 Queue 数量,避免资源浪费

5.2 核心监控指标

指标名称监控目的采集方式阈值建议
rocketmq_producer_send_success生产者发送成功率RocketMQ 内置 Metrics≥99.9%
rocketmq_consumer_lag消费者消息堆积量(队列最小偏移量)Broker 暴露的 HTTP 接口<1000 条(根据业务吞吐量调整)
commitlog_disk_used_ratio磁盘使用率操作系统监控<80%
consumer_thread_pool_queue_size消费线程池等待任务数消费者 JVM 监控<100

5.3 故障排查工具

  1. RocketMQ Console:可视化管理平台,支持 Topic/Queue 状态查看、消息轨迹追踪

  2. mqadmin 命令:

    # 查看消费者状态  
    sh mqadmin consumerStatus -g consumerGroup -n namesrvAddr  
    # 查看消息堆积量  
    sh mqadmin clusterList -n namesrvAddr  
    
  3. 分布式链路追踪:结合 SkyWalking 或 Jaeger,通过消息 ID 关联生产者和消费者链路

六、总结与扩展

6.1 核心价值

RocketMQ 通过 “高性能存储 + 灵活消息模式 + 强一致性保障”,成为分布式系统异步通信的首选方案。在 Spring Cloud 生态中,其与 Nacos、Seata 等组件的深度整合,进一步提升了微服务架构的可靠性和可扩展性。

6.2 适用场景总结

场景RocketMQ 特性匹配典型案例
异步解耦高吞吐量、多语言支持电商订单 - 库存异步同步
最终一致事务事务消息 + 重试机制金融转账异步对账
顺序处理局部顺序消息 + 队列分区物流状态机顺序更新
流量削峰海量消息堆积能力秒杀活动流量缓冲

6.3 未来趋势

  1. 云原生适配:支持 Kubernetes 集群部署,集成 Operator 实现自动化运维
  2. 多协议支持:完善 gRPC、HTTP/2 协议适配,满足异构系统通信需求
  3. Serverless 化:推出 Serverless 消息队列服务,降低使用门槛

通过本文的实战解析,开发者可在 Spring Cloud 项目中快速落地 RocketMQ 的高级特性,构建可靠的消息驱动微服务。

相关文章:

09-RocketMQ 深度解析:从原理到实战,构建可靠消息驱动微服务

RocketMQ 深度解析&#xff1a;从原理到实战&#xff0c;构建可靠消息驱动微服务 一、RocketMQ 核心定位与架构探秘 1.1 分布式消息领域的中流砥柱 在分布式系统中&#xff0c;消息队列是实现异步通信、解耦服务、削峰填谷的关键组件。RocketMQ 作为阿里巴巴开源的分布式消息…...

MyBatis 如何使用

1. 环境准备 添加依赖&#xff08;Maven&#xff09; 在 pom.xml 中添加 MyBatis 和数据库驱动依赖&#xff1a; <dependencies><!-- MyBatis 核心库 --><dependency><groupId>org.mybatis</groupId><artifactId>mybatis</artifactId&g…...

AI日报 - 2025年04月17日

&#x1f31f; 今日概览(60秒速览) ▎&#x1f916; AGI突破 | OpenAI新模型或证人类未解定理&#xff0c;研究达Level 4 OpenAI安全博客暗示模型将创造新科学&#xff0c;能连接概念提新实验。CEO预测AI将证明人类未解定理&#xff0c;研究员称已达AGI第四层级。 ▎&#x1f4…...

【Leetcode-Hot100】缺失的第一个正数

题目 解答 有一处需要注意&#xff0c;我使用注释部分进行交换值&#xff0c;报错&#xff1a;超出时间限制。有人知道是为什么吗&#xff1f;难道是先给nums[i]赋值后&#xff0c;从而改变了后一项的索引&#xff1f; class Solution(object):def firstMissingPositive(sel…...

Servlet简单示例

Servlet简单示例 文章说明 Servlet 虽然是一门旧技术了&#xff0c;但是它的基础性和广泛性仍然不可忽视&#xff1b;我在实践中发现不少同学经常会被它的一些特性给困惑住&#xff1b;时常出现404等错误&#xff0c;这里我写下这篇文章&#xff0c;介绍Servlet的不同版本的特…...

spring:注解@Component、@Controller、@Service、@Reponsitory

背景 spring框架的一个核心功能是IOC&#xff0c;就是将Bean初始化加载到容器中&#xff0c;Bean是如何加载到容器的&#xff0c;可以使用spring注解方式或者spring XML配置方式。 spring注解方式直接对项目中的类进行注解&#xff0c;减少了配置文件内容&#xff0c;更加便于…...

LLM做逻辑推理题 - 野鸭蛋的故事

题目: 四个旅游家&#xff08;张虹、印玉、东晴、西雨&#xff09;去不同的岛屿去旅行&#xff0c;每个人都在岛上发现了野鸡蛋&#xff08;1个到3个&#xff09;。4人的年龄各不相同&#xff0c;是由18岁到21岁。已知&#xff1a; ①东晴是18岁。 ②印玉去了A岛。 ③21岁的女…...

Linux的目录结构(介绍,具体目录结构)

目录 介绍 具体目录结构 简洁的目录解释 详细的目录解释 介绍 Linux的文件系统是采用级层式的树状目录结构&#xff0c;在此结构的最上层是根目录“/”。Linux的世界中&#xff0c;一切皆文件&#xff08;比如&#xff1a;Linux会把硬件映射成文件来管理&#xff09; 具体目…...

C++Cherno 学习笔记day21 [86]-[90] 持续集成、静态分析、参数计算顺序、移动语义、stdmove与移动赋值操作符

b站Cherno的课[86]-[90] 一、C持续集成二、C静态分析三、C的参数计算顺序四、C移动语义五、stdmove与移动赋值操作符 一、C持续集成 Jenkins 商业软件 二、C静态分析 静态分析器会检查你的代码&#xff0c;并尝试检测各种错误&#xff0c;这些错误 可能是你无意中编写的&am…...

python学习 -- 综合案例1:设计一款基于python的飞机大战小游戏

本文目录 pygame模块介绍核心模块与功能开发流程 本文案例 - 飞机大战开发流程1. 导入必要的库2. 定义常量3. 创建精灵类4. 主程序 运行游戏 总结 pygame模块介绍 Pygame 是基于 Python 的开源、跨平台游戏开发库&#xff0c;依托 SDL&#xff08;Simple DirectMedia Layer&am…...

开启 Python 编程之旅:基础入门实战班全解析

重要的东西放前面 开启 Python 编程之旅&#xff1a;基础入门实战班全解析 开启Python编程之旅&#xff1a;基础入门实战班全解析 在当下热门的编程语言中&#xff0c;Python凭借简洁易读的语法、强大的功能和丰富的库&#xff0c;在数据科学、人工智能、Web开发等诸多领域大…...

Linux笔记---动静态库(原理篇)

1. ELF文件格式 动静态库文件的构成是什么样的呢&#xff1f;或者说二者的内容是什么&#xff1f; 实际上&#xff0c;可执行文件&#xff0c;目标文件&#xff0c;静态库文件&#xff0c;动态库文件都是使用ELF文件格式进行组织的。 ELF&#xff08;Executable and Linkable…...

SpringBoot整合Logback日志框架深度实践

一、依赖与默认集成机制 SpringBoot从2.x版本开始默认集成Logback日志框架&#xff0c;无需手动添加额外依赖。当项目引入spring-boot-starter-web时&#xff0c;该组件已包含spring-boot-starter-logging&#xff0c;其底层实现基于LogbackSLF4J组合。这种设计使得开发者只需…...

Spring Boot中接入DeepSeek的流式输出

第一步&#xff0c;添加依赖&#xff1a; <!-- WebFlux 响应式支持 --> <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-webflux</artifactId> </dependency> 第二步&#xff0c;配置We…...

路由交换网络专题 | 第四章 | 生成树 | VRRP | 边缘端口

拓扑图 &#xff08;1&#xff09;SW1、SW2、SW3 三台交换机之间存在环路问题&#xff0c;需要通过生成树协议破环&#xff0c;请简述二层环路可能导致的问题。 因为交换机在收到一个广播帧之后&#xff0c;会对非接收端口进行转发。每台交换机都转发的话&#xff0c;就行形成一…...

SFOS2:常用容器(布局)介绍

一、前言 最近在进行sailfish os的开发&#xff0c;由于在此之前并没有从事过QT开发的工作&#xff0c;所以对这一套颇为生疏&#xff0c;以此记录一下。以下内容不一定完全准确&#xff0c;开发所使用的是Qt Quick 2.6与Sailfish.Silica 1.0两个库。 二、布局 1.Qt Quick 2.…...

VS qt 联合开发环境下的多国语言翻译

添加Linguist 文件方法&#xff0c;如同添加类文件的方式&#xff0c;那样&#xff1a; 其他跟QT的一样的流程&#xff0c;另外在main函数里要注册一下&#xff0c; QTextCodec::setCodecForLocale(textCodec); QTranslator translator5; QString trans5 fi…...

基于 Python 的 ROS2 应用开发全解析

引言 在机器人操作系统&#xff08;ROS&#xff09;不断发展的进程中&#xff0c;ROS2 作为新一代的机器人框架&#xff0c;带来了诸多显著的改进与新特性。Python 作为一种简洁、高效且具有强大数据处理能力的编程语言&#xff0c;在 ROS2 应用开发中占据着重要地位。本文将深…...

AI分析师

01 实操 人工 公司需要开发了一个XX系统&#xff0c;在文件夹中包含了XX.csv&#xff0c;其中每一行表示一个XX样本&#xff0c;最后一列为每个样本的标签&#xff0c;现需要设计模型与系统&#xff0c;请按照以下要求完成算法测试。根据要求完成以下任务&#xff0c;将完成的…...

Redis核心数据类型在实际项目中的典型应用场景解析

精心整理了最新的面试资料和简历模板&#xff0c;有需要的可以自行获取 点击前往百度网盘获取 点击前往夸克网盘获取 Redis作为高性能的键值存储系统&#xff0c;在现代软件开发中扮演着重要角色。其多样化的数据结构为开发者提供了灵活的解决方案&#xff0c;本文将通过真实项…...

LLamaIndex中经常使用的三个模块

from aiostream import stream from fastapi import Request from fastapi.responses import StreamingResponse from llama_index.core.chat_engine.types import StreamingAgentChatResponse这四个模块每一个都很实用&#xff0c;在实际开发中经常用到&#xff0c;下面我就详…...

Idea集成AI:CodeGeeX开发

当入职新公司&#xff0c;或者调到新项目组进行开发时&#xff0c;需要快速熟悉项目代码 而新的项目代码&#xff0c;可能有很多模块&#xff0c;很多的接口&#xff0c;很复杂的业务逻辑&#xff0c;更加有与之前自己的代码风格不一致的现有复杂代码 更别提很多人写代码不喜…...

软考 中级软件设计师 考点知识点笔记总结 day12 计算机网络基础知识

文章目录 计算机网络基础5.1、计算机网络基础知识5.1.1 计算机网络分类5.1.2 七层网络体系结构5.1.3 网络标准5.1.4 TCP/IP协议族5.1.5 IP地址和IPv6简介5.1.6 Internet服务 计算机网络基础 要求掌握以下内容 5.1、计算机网络基础知识 网络体系结构 传输介质 传输技术 传输…...

【扩散模型(十三)】Break-A-Scene 可控生成,原理与代码详解(中)Cross Attn Loss 代码篇

系列文章目录 【扩散模型&#xff08;一&#xff09;】中介绍了 Stable Diffusion 可以被理解为重建分支&#xff08;reconstruction branch&#xff09;和条件分支&#xff08;condition branch&#xff09;【扩散模型&#xff08;二&#xff09;】IP-Adapter 从条件分支的视…...

C语言数字图像处理---2.31统计滤波器

本文介绍空域滤波器中的一种:统计滤波器 [定义与算法] 统计滤波(Statistic Filter)定义:基于图像处理中的邻域统计方法,对邻域内的像素信息进行统计,如基于均值和方差的信息,用于平滑或去噪图像,同时保留边缘信息。 算法步骤如下: 统计滤波器的优点和缺点主要包…...

流程设计实战:流程架构设计六步法

目录 简介 1、梳理业务模式及场景 2、甄别核心业务能力 3、搭建差异化的业务流程框架 4、定义L4流程能力 5、L4流程串联 6、展开L5业务流程 作者简介 简介 以往在设计流程的时候&#xff0c;我多数都是采用的自下而上的方式&#xff0c;从具体场景、具体问题出发去做流…...

SDK游戏盾如何接入?复杂吗?

接入SDK游戏盾&#xff08;通常指游戏安全防护类SDK&#xff0c;如防DDoS攻击、防作弊、防外挂等功能&#xff09;的流程和复杂度取决于具体的服务商&#xff08;如腾讯云、上海云盾等&#xff09;以及游戏类型和技术架构。以下是一般性的接入步骤、复杂度评估及注意事项&#…...

STM32F103C8T6 单片机入门基础知识及点亮第一个 LED 灯

目录 一、引言 二、STM32F103C8T6 基本特性 1. 内核与性能 2. 存储器 3. 时钟系统 4. GPIO&#xff08;通用输入输出&#xff09; 5. 外设 三、开发环境搭建 1. 硬件准备 2. 软件安装 四、点亮第一个 LED 灯 1. 硬件连接 2. 软件实现 &#xff08;1&#xff09;创…...

JavaScript Worker池实现教程

JavaScript Worker池实现教程 Worker池是一种管理和复用Web Workers的有效方法&#xff0c;可以在不频繁创建和销毁Worker的情况下&#xff0c;充分利用多线程能力提升应用性能。下面我将详细介绍如何在JavaScript中实现一个功能完善的Worker池。 为什么需要Worker池&#xf…...

【统信UOS操作系统】python3.11安装numpy库及导入问题解决

一、安装Python3.11.4 首先来安装Python3.11.4。所用操作系统&#xff1a;统信UOS 前提是准备好Python3.11.4的安装包&#xff08;可从官网下载&#xff08;链接&#xff09;&#xff09;&#xff0c;并解压到本地&#xff1a; 右键&#xff0c;选择“在终端中打开”&#xff…...

Navicat导入JSON数据到MySQL表

&#x1f91f;致敬读者 &#x1f7e9;感谢阅读&#x1f7e6;笑口常开&#x1f7ea;生日快乐⬛早点睡觉 &#x1f4d8;博主相关 &#x1f7e7;博主信息&#x1f7e8;博客首页&#x1f7eb;专栏推荐&#x1f7e5;活动信息 文章目录 Navicat导入JSON数据到MySQL表1. 导入入口2.…...

体育比分小程序怎么提示日活

要提高体育比分小程序的日活跃用户(DAU)&#xff0c;您可以考虑以下几个方面的策略&#xff1a; 一、核心功能优化 1.实时推送&#xff1a;确保比分更新真正实时&#xff0c;延迟不超过2秒&#xff0c;推荐接入熊猫比分API体育数据&#xff0c;比分实时更新 2.个性化订阅&am…...

【星海随笔】Python-JSON数据的处理

JSON 是一种轻量级的数据交换格式&#xff0c;主要用于在客户端和服务器之间传输数据。 JSON 在 python 里是一个标准库 https://www.jyshare.com/compile/9/ import json data {name: Alice, age: 30, city: New York} json_string json.dumps(data) print(json_string)js…...

Tomcat与Servlet

目录 1 Tomcat 1.1 目录结构 1.2 启动服务器 1.3 部署 2 Servlet 2.1 创建项目 &#xff08;1&#xff09;创建Maven项目 &#xff08;2&#xff09;目录结构 &#xff08;3&#xff09;引入依赖 &#xff08;4&#xff09;创建必要的目录结构 &#xff08;5&#xf…...

MySQL MVCC工作流程详解

MySQL MVCC工作流程详解 1. 基础概念 MVCC&#xff08;多版本并发控制&#xff09;是通过在每行记录后面保存多个版本来实现并发控制的技术&#xff0c;主要用于提供并发事务访问数据库时的读一致性。 2. 核心要素 2.1 事务ID&#xff08;DB_TRX_ID&#xff09; 每个事务都…...

unityTEngine 框架学习记录1

目前项目再用QF框架其中的UI部分&#xff0c;突然有天想学习一下其他好用的框架UI&#xff0c;根据我多年网友胖菊大佬的推荐TE映入眼帘,网上找了一下发现学习教程没有几个&#xff0c;不太适合啥都不会的小白&#xff0c;然后我就加入了ET官方群&#xff0c;里面人长得又帅又有…...

算法的时间复杂度

整理了下算法的时间复杂度&#xff0c;跟大家一起分享下。 时间复杂度O是表示算法运行时间与输入数据规模&#xff08;通常用 n 表示&#xff09;之间的关系。算法执行时间随输入数据规模增长的变化趋势。 1、O(1) — 常数时间 无论输入数据多大&#xff0c;执行时间固定不变…...

深度学习 从入门到精通 day_01

Pytorch安装 torch安装 python版本3.9.0 在官方文档里面找到适合你设备的PyTorch版本及对应的安装指令执行即可&#xff1a;https://pytorch.org/get-started/previous-versions/ 针对我的网络及设备情况&#xff0c;我复制了如下指令完成了Torch的安装&#xff1a; …...

AutoToM:让AI像人类一样“读心”的突破性方法

引言&#xff1a;AI如何理解人类的“内心世界”&#xff1f; 如何让AI像人类一样理解他人的意图、情感和动机&#xff1f;这一问题的核心是心智理论&#xff08;Theory of Mind, ToM&#xff09;&#xff0c;即通过观察行为推断心理状态的能力。近日&#xff0c;约翰霍普金斯大…...

Java实现Redis

String类型 代码 package com.whop.changyuan2.redisTest;import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.data.redis.cor…...

DAY09:【pytorch】nn网络层

1、卷积层 1.1 Convolution 1.1.1 卷积操作 卷积运算&#xff1a;卷积核在输入信号&#xff08;图像&#xff09;上滑动&#xff0c;相应位置上进行乘加卷积核&#xff1a;又称为滤波器、过滤器&#xff0c;可认为是某种模式、某种特征 1.1.2 卷积维度 一般情况下&#xf…...

河南普瑞维升企业案例:日事清SOP流程与目标模块实现客户自主简报功能落地

公司简介&#xff1a; 河南普瑞维升企业管理咨询有限公司成立于2017年&#xff0c;目前公司主营业务是为加油站提供全方面咨询管理服务&#xff0c;目前公司成功运营打造河南成品油&#xff0c;运营站点15座&#xff0c;会员数量已达几十万&#xff0c;在加油站周边辐射区域内…...

LeetCode面试热题150中19-22题学习笔记(用Java语言描述)

Day 04 19、最后一个单词的长度 需求&#xff1a;给你一个字符串 s&#xff0c;由若干单词组成&#xff0c;单词前后用一些空格字符隔开。返回字符串中 最后一个 单词的长度。 单词 是指仅由字母组成、不包含任何空格字符的最大子字符串。 代码表示 public class Q19_1 {p…...

车载刷写架构 --- 刷写流程中重复擦除同一地址的问题分析

我是穿拖鞋的汉子,魔都中坚持长期主义的汽车电子工程师。 老规矩,分享一段喜欢的文字,避免自己成为高知识低文化的工程师: 周末洗了一个澡,换了一身衣服,出了门却不知道去哪儿,不知道去找谁,漫无目的走着,大概这就是成年人最深的孤独吧! 旧人不知我近况,新人不知我过…...

一个测试GPU可用的测试实例

一个测试GPU可用的测试实例&#xff1a; import torch import torch.nn as nn import torch.optim as optim import time import gc import numpy as np from torch.cuda.amp import autocast, GradScalerclass LargeNN(nn.Module):def __init__(self, use_attentionTrue):sup…...

chili3d调试笔记2+添加web ui按钮

onclick 查找 打个断点看看 挺可疑的&#xff0c;打个断点看看 挺可疑的&#xff0c;打个断点看看 打到事件监听上了 加ui了 加入成功 新建弹窗-------------------------------------- 可以模仿这个文件&#xff0c;写弹窗 然后在这里注册一下&#xff0c;外部就能调用了 对了…...

Go-zero:JWT鉴权方式

1.简述 用于记录在go-zero的后端项目中如何添加jwt中间件鉴权 2.流程 配置api.yaml Auth:AccessSecret: "secret_key"AccessExpire: 604800config中添加Auth结构体 Auth struct {AccessSecret stringAccessExpire int64 }types定义jwt token的自定义数据结构&#…...

MySQL的MVCC机制详解

1. 什么是MVCC&#xff1f; MVCC&#xff08;Multi-Version Concurrency Control&#xff0c;多版本并发控制&#xff09;是数据库系统中用于实现并发控制的一种技术。它通过保存数据在某个时间点的快照来实现&#xff0c;使得在同一个数据行上可以同时存在多个版本&#xff0…...

Postman做自动化测试

Postman也可以实现接口自动化 1.在Scripts写断言&#xff0c;图中红框处。不会写可以偷懒使用蓝框处会自动填写 2.单个运行调试&#xff0c;结果显示在TestResults 3.多个接口都写好断言并调通后&#xff0c;在包揽这些接口的文件夹下运行&#xff0c;图示以两个接口为例&…...

Meltdown原理介绍:用户空间读取内核内存

摘要 计算机系统的安全性从根本上依赖内存隔离,如,内核地址范围被标记为不可访问并受到保护,以防用户非法访问。本文介绍了Meltdown。 利用现代处理器上乱序执行,来读取内核任意的内存位置,包括个人数据和密码。乱序执行是必不可少的用来提升性能的手段,并在现代处理器中…...