当前位置: 首页 > news >正文

【从0到1学RabbitMQ】RabbitMQ高级篇

学完基础篇之后我们对用户下单这个业务进行了改造,我们可以吧用户支付这个业务抽出来,放入队列当中去执行。如下图:
在这里插入图片描述

但是这里我们思考一下,如果MQ通知失败了,支付服务中支付流水显示支付成功,而交易服务中的订单状态却显示未支付,数据出现了不一致。此时前端发送请求查询支付状态时,肯定是查询交易服务状态,会发现业务订单未支付,而用户自己知道已经支付成功,这就导致用户体验不一致。
因此,这里我们必须尽可能确保MQ消息的可靠性,即:消息应该至少被消费者处理1次
那么问题来了:

  • 我们该如何确保MQ消息的可靠性?
  • 如果真的发送失败,有没有其它的兜底方案?

发送者的可靠性

首先,我们一起分析一下消息丢失的可能性有哪些。
消息从发送者发送消息,到消费者处理消息,需要经过的流程是这样的:
在这里插入图片描述

  • 发送消息时丢失:
    • 生产者发送消息时连接MQ失败
    • 生产者发送消息到达MQ后未找到Exchange
    • 生产者发送消息到达MQ的Exchange后,未找到合适的Queue
    • 消息到达MQ后,处理消息的进程发生异常
  • MQ导致消息丢失:
    • 消息到达MQ,保存到队列后,尚未消费就突然宕机
  • 消费者处理消息时:
    • 消息接收后尚未处理突然宕机
    • 消息接收后处理过程中抛出异常

综上所述,我们要解决消息丢失的问题,保证MQ消息的可靠度,可以从下面三个方面入手

  • 确保生产者一定把消息发送到MQ
  • 确保MQ不会将消息弄丢
  • 确保消费者一定要处理消息

生产者重试机制

首先第一种情况,就是生产者发送消息时,出现了网络故障,导致与MQ的连接中断。

为了解决这个问题,SpringAMQP提供的消息发送时的重试机制。即:当RabbitTemplate与MQ连接超时后,多次重试。

spring:rabbitmq:connection-timeout: 1s # 设置MQ的连接超时时间template:retry:enabled: true # 开启超时重试机制initial-interval: 1000ms # 失败后的初始等待时间multiplier: 1 # 失败后下次的等待时长倍数,下次等待时长 = initial-interval * multipliermax-attempts: 3 # 最大重试次数

注意:当网络不稳定的时候,利用重试机制可以有效提高消息发送的成功率。不过SpringAMQP提供的重试机制是阻塞式的重试,也就是说多次重试等待的过程中,当前线程是被阻塞的。
如果对于业务性能有要求,建议禁用重试机制。如果一定要使用,请合理配置等待时长和重试次数,当然也可以考虑使用异步线程来执行发送消息的代码。

生产者确认机制

一般情况下,只要生产者与MQ之间的网路连接顺畅,基本不会出现发送消息丢失的情况,因此大多数情况下我们无需考虑这种问题。
不过,在少数情况下,也会出现消息发送到MQ之后丢失的现象,比如:

  • MQ内部处理消息的进程发生了异常
  • 生产者发送消息到达MQ后未找到Exchange
  • 生产者发送消息到达MQ的Exchange后,未找到合适的Queue,因此无法路由
    针对上述情况,RabbitMQ提供了生产者消息确认机制,包括Publisher Confirm和Publisher Return两种。在开启确认机制的情况下,当生产者发送消息给MQ后,MQ会根据消息处理的情况返回不同的回执。
    在这里插入图片描述
    总结如下:
  • 当消息投递到MQ,但是路由失败时,通过Publisher Return返回异常信息,同时返回ack的确认信息,代表投递成功
  • 临时消息投递到了MQ,并且入队成功,返回ACK,告知投递成功
  • 持久消息投递到了MQ,并且入队完成持久化,返回ACK ,告知投递成功
  • 其它情况都会返回NACK,告知投递失败

其中ack和nack属于Publisher Confirm机制,ack是投递成功;nack是投递失败。而return则属于Publisher Return机制。

实现生产者确认

开启生产者确认
spring:rabbitmq:publisher-confirm-type: correlated # 开启publisher confirm机制,并设置confirm类型publisher-returns: true # 开启publisher return机制

这里publisher-confirm-type有三种模式可选:

  • none:关闭confirm机制
  • simple:同步阻塞等待MQ的回执
  • correlated:MQ异步回调返回回执
定义ReturnCallback

每个RabbitTemplate只能配置一个ReturnCallback,因此我们可以在配置类中统一设置。我们在publisher模块定义一个配置类:
我们在发送者中定义

@Slf4j
@AllArgsConstructor
@Configuration
public class MqConfig {private final RabbitTemplate rabbitTemplate;@PostConstructpublic void init(){rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {@Overridepublic void returnedMessage(ReturnedMessage returned) {log.error("触发return callback,");log.debug("exchange: {}", returned.getExchange());log.debug("routingKey: {}", returned.getRoutingKey());log.debug("message: {}", returned.getMessage());log.debug("replyCode: {}", returned.getReplyCode());log.debug("replyText: {}", returned.getReplyText());}});}
}
定义ConfirmCallback

由于每个消息发送时的处理逻辑不一定相同,因此ConfirmCallback需要在每次发消息时定义。具体来说,是在调用RabbitTemplate中的convertAndSend方法时,多传递一个参数:
在这里插入图片描述
这里的CorrelationData中包含两个核心的东西:

  • id:消息的唯一标示,MQ对不同的消息的回执以此做判断,避免混淆
  • SettableListenableFuture:回执结果的Future对象

将来MQ的回执就会通过这个Future来返回,我们可以提前给CorrelationData中的Future添加回调函数来处理消息回执:
在这里插入图片描述

@Test
void testPublisherConfirm() {// 1.创建CorrelationDataCorrelationData cd = new CorrelationData();// 2.给Future添加ConfirmCallbackcd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {@Overridepublic void onFailure(Throwable ex) {// 2.1.Future发生异常时的处理逻辑,基本不会触发log.error("send message fail", ex);}@Overridepublic void onSuccess(CorrelationData.Confirm result) {// 2.2.Future接收到回执的处理逻辑,参数中的result就是回执内容if(result.isAck()){ // result.isAck(),boolean类型,true代表ack回执,false 代表 nack回执log.debug("发送消息成功,收到 ack!");}else{ // result.getReason(),String类型,返回nack时的异常描述log.error("发送消息失败,收到 nack, reason : {}", result.getReason());}}});// 3.发送消息rabbitTemplate.convertAndSend("hmall.direct", "q", "hello", cd);
}

最后我们想一个交换机当中发送消息,然后指定一个错误的key
在这里插入图片描述
可以看到,由于传递的RoutingKey是错误的,路由失败后,触发了return callback,同时也收到了ack。
当我们修改为正确的RoutingKey以后,就不会触发return callback了,只收到ack。
而如果连交换机都是错误的,则只会收到nack。

我们收到ack是不需要重新发送的,因为我们的消息已经发到交换机当中,但是是nack我们需要从新发送,因为有可能是网络的问题,也有可能是没有改交换机。

MQ的可靠性

消息到达MQ以后,如果MQ不能及时保存,也会导致消息丢失,所以MQ的可靠性也非常重要。

数据持久化

为了提升性能,默认情况下MQ的数据都是在内存存储的临时数据,重启后就会消失。为了保证数据的可靠性,必须配置数据持久化,包括:

  • 交换机持久化
  • 队列持久化
  • 消息持久化
交换机持久化

在这里插入图片描述

队列持久化

在这里插入图片描述

消息持久化

在控制台发送消息的时候,可以设置
在这里插入图片描述
SpringAmQp发送消息的时候默认是持久化的,因为如果不设置消息持久化,Mq默认会把消息放到内存当中,这样就会导致一旦Mq宕机,消息就会全部丢失,内存空间有限,当消费者故障或者处理非常慢的到时候,消息就会积压,印发MQ阻塞。如果积压过多Mq就会把消息写入磁盘当中。
在这里插入图片描述
这里我们自定义发送的消息,改编为内存存储的方式
在这里插入图片描述
在这里插入图片描述
当我们一次性发送一千万消息的时候,消息会先存储到内存当中,但是内存满了之后,会把消息存储到磁盘当中,但是这时候消息的处理量会一下降低到0,写入磁盘完了以后,消息处理的能力会有到达峰值,我们可以看到,它的消息处理能力处于波动状态。

我们改为持久化模式看看
在这里插入图片描述
我们看到消息的处理能力一直处于一个高水平状态,并且消息刚被接受就写入到内存当中

LazyQueue

在默认情况下,RabbitMQ会将接收到的信息保存在内存中以降低消息收发的延迟。但在某些特殊情况下,这会导致消息积压,比如:

  • 消费者宕机或出现网络故障
  • 消息发送量激增,超过了消费者处理速度
  • 消费者处理业务发生阻塞

一旦出现消息堆积问题,RabbitMQ的内存占用就会越来越高,直到触发内存预警上限。此时RabbitMQ会将内存消息刷到磁盘上,这个行为成为PageOut. PageOut会耗费一段时间,并且会阻塞队列进程。因此在这个过程中RabbitMQ不会再处理新的消息,生产者的所有请求都会被阻塞。

为了解决这个问题,从RabbitMQ的3.6.0版本开始,就增加了Lazy Queues的模式,也就是惰性队列。惰性队列的特征如下:

  • 接收到消息后直接存入磁盘而非内存
  • 消费者要消费消息时才会从磁盘中读取并加载到内存(也就是懒加载)
  • 支持数百万条的消息存储
控制台配置Lazy模式

在这里插入图片描述

低代码方式

在利用SpringAMQP声明队列的时候,添加x-queue-mod=lazy参数也可设置队列为Lazy模式:

@Bean
public Queue lazyQueue(){return QueueBuilder.durable("lazy.queue").lazy() // 开启Lazy模式.build();
}

当然,我们也可以基于注解来声明队列并设置为Lazy模式:

@RabbitListener(queuesToDeclare = @Queue(name = "lazy.queue",durable = "true",arguments = @Argument(name = "x-queue-mode", value = "lazy")
))
public void listenLazyQueue(String msg){log.info("接收到 lazy.queue的消息:{}", msg);
}

消费者的可靠性

当RabbitMQ向消费者投递消息以后,需要知道消费者的处理状态如何。因为消息投递给消费者并不代表就一定被正确消费了,可能出现的故障有很多,比如:

  • 消息投递的过程中出现了网络故障
  • 消费者接收到消息后突然宕机
  • 消费者接收到消息后,因处理不当导致异常

一旦发生以上情况,Mq必须知道消费者的状态,然后重新投递给消费者,那么该如何知道呢?

消费者确认机制

为了确认消费者是否成功处理消息,RabbitMQ提供了消费者确认机制(Consumer Acknowledgement)。即:当消费者处理消息结束后,应该向RabbitMQ发送一个回执,告知RabbitMQ自己消息处理状态。回执有三种可选值:

  • ack:成功处理消息,RabbitMQ从队列中删除该消息
  • nack:消息处理失败,RabbitMQ需要再次投递消息
  • reject:消息处理失败并拒绝该消息,RabbitMQ从队列中删除该消息

一般reject方式用的较少,除非是消息格式有问题,那就是开发问题了。因此大多数情况下我们需要将消息处理的代码通过try catch机制捕获,消息处理成功时返回ack,处理失败时返回nack.

由于消息回执的处理代码比较统一,因此SpringAMQP帮我们实现了消息确认。并允许我们通过配置文件设置ACK处理方式,有三种模式:

  • none:不处理。即消息投递给消费者后立刻ack,消息会立刻从MQ删除。非常不安全,不建议使用
  • manual:手动模式。需要自己在业务代码中调用api,发送ack或reject,存在业务入侵,但更灵活
  • auto:自动模式。SpringAMQP利用AOP对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回ack. 当业务出现异常时,根据异常判断返回不同结果:
    • 如果是业务异常,会自动返回nack;
    • 如果是消息处理或校验异常,自动返回reject;
spring:rabbitmq:listener:simple:acknowledge-mode: none # 不做处理
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueueMessage(String msg) throws InterruptedException {log.info("spring 消费者接收到消息:【" + msg + "】");if (true) {throw new MessageConversionException("故意的");}log.info("消息处理完成");
}

通过以上测试,我们会发现,当消息投递到消费者之后,Mq不管业务是否处理成功,直接把消息删除。当我们改为auto之后

spring:rabbitmq:listener:simple:acknowledge-mode: auto # 自动ack

在异常位置打断点,再次发送消息,程序卡在断点时,可以发现此时消息状态为unacked(未确定状态):
在这里插入图片描述
由于发生的事消息转换的异常,所以直接返回了reject,消息被删除
在这里插入图片描述
当我们吧异常改为运行时异常

@RabbitListener(queues = "simple.queue")
public void listenSimpleQueueMessage(String msg) throws InterruptedException {log.info("spring 消费者接收到消息:【" + msg + "】");if (true) {throw new RuntimeException("故意的");}log.info("消息处理完成");
}

还是在异常的地方打上断点,运行我们会发现,此时消息处于未确认状态
在这里插入图片描述
异常放行之后,我们会看到消息处于准备状态,然后重新投递给消费者

在这里插入图片描述

失败重试机制

当我们把类型改为auto的时候,如果在运行的过程中产生了业务的一些运行时异常,那么消息会不断地被投递给消费者进行重试,这样mq和我们项目的压力会飙升,最终可能会导致宕机。
当然,上述极端情况发生的概率还是非常低的,不过不怕一万就怕万一。为了应对上述情况Spring又提供了消费者失败重试机制:在消费者出现异常时利用本地重试,而不是无限制的requeue到mq队列。

spring:rabbitmq:listener:simple:retry:enabled: true # 开启消费者失败重试initial-interval: 1000ms # 初识的失败等待时长为1秒multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-intervalmax-attempts: 3 # 最大重试次数stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false

重启consumer服务,重复之前的测试。可以发现:

  • 消费者在失败后消息没有重新回到MQ无限重新投递,而是在本地重试了3次
  • 本地重试3次以后,抛出了AmqpRejectAndDontRequeueException异常。查看RabbitMQ控制台,发现消息被删除了,说明最后SpringAMQP返回的是reject

失败处理策略

在上面的失败重试机制当中,重试三次仍然失败以后,会返回reject然后消息就会被删除,在对消息可靠性较高的业务当中,这显然是不合适的。

因此Spring允许我们自定义重试次数耗尽后的消息处理策略,这个策略是由MessageRecovery接口来定义的,它有3个不同实现:

  • RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式
  • ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队
  • RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机

比较优雅的一种处理方案是RepublishMessageRecoverer,失败后将消息投递到一个指定的,专门存放异常消息的队列,后续由人工集中处理。

1)在consumer服务中定义处理失败消息的交换机和队列

@Bean
public DirectExchange errorMessageExchange(){return new DirectExchange("error.direct");
}
@Bean
public Queue errorQueue(){return new Queue("error.queue", true);
}
@Bean
public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
}

2)定义一个RepublishMessageRecoverer,关联队列和交换机

@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}

业务的幂等性

何为幂等性?
幂等是一个数学概念,用函数表达式来描述是这样的:f(x) = f(f(x)),例如求绝对值函数。
在程序开发中,则是指同一个业务,执行一次或多次对业务状态的影响是一致的。例如:

  • 根据id删除数据
  • 查询数据
  • 新增数据

但数据的更新往往不是幂等的,如果重复执行可能造成不一样的后果。比如:

  • 取消订单,恢复库存的业务。如果多次恢复就会出现库存重复增加的情况
  • 退款业务。重复退款对商家而言会有经济损失。

所以,我们要尽可能避免业务被重复执行。
然而在实际业务场景中,由于意外经常会出现业务被重复执行的情况,例如:

  • 页面卡顿时频繁刷新导致表单重复提交
  • 服务间调用的重试
  • MQ消息的重复投递

我们在用户支付成功后会发送MQ消息到交易服务,修改订单状态为已支付,就可能出现消息重复投递的情况。如果消费者不做判断,很有可能导致消息被消费多次,出现业务故障。
举例:

  1. 假如用户刚刚支付完成,并且投递消息到交易服务,交易服务更改订单为已支付状态。
  2. 由于某种原因,例如网络故障导致生产者没有得到确认,隔了一段时间后重新投递给交易服务。
  3. 但是,在新投递的消息被消费之前,用户选择了退款,将订单状态改为了已退款状态。
  4. 退款完成后,新投递的消息才被消费,那么订单状态会被再次改为已支付。业务异常。

因此,我们必须想办法保证消息处理的幂等性。这里给出两种方案:

  • 唯一消息ID
  • 业务状态判断
唯一消息ID

这个思路非常简单:

  1. 每一条消息都生成一个唯一的id,与消息一起投递给消费者。
  2. 消费者接收到消息后处理自己的业务,业务处理成功后将消息ID保存到数据库
  3. 如果下次又收到相同消息,去数据库查询判断是否存在,存在则为重复消息放弃处理。

我们该如何给消息添加唯一ID呢?
其实很简单,SpringAMQP的MessageConverter自带了MessageID的功能,我们只要开启这个功能即可。
以Jackson的消息转换器为例:

@Bean
public MessageConverter messageConverter(){// 1.定义消息转换器Jackson2JsonMessageConverter jjmc = new Jackson2JsonMessageConverter();// 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息jjmc.setCreateMessageIds(true);return jjmc;
}
@RabbitListener(queues = "simple.queue")
public void listenSimpleQuque(Message message) {log.info("消费者收到了消息{}" , new String(message.getBody()));log.info("获取消费者Id{}" , message.getMessageProperties().getMessageId());
}

缺点,对业务具有侵入性,并且有对数据库的操作,影响业务性能

业务判断

业务判断就是基于业务本身的逻辑或状态来判断是否是重复的请求或消息,不同的业务场景判断的思路也不一样。
例如我们当前案例中,处理消息的业务逻辑是把订单状态从未支付修改为已支付。因此我们就可以在执行业务时判断订单状态是否是未支付,如果不是则证明订单已经被处理过,无需重复处理。

相比较而言,消息ID的方案需要改造原有的数据库,所以我更推荐使用业务判断的方案。

以支付修改订单的业务为例,我们需要修改OrderServiceImpl中的markOrderPaySuccess方法:

@Override
public void markOrderPaySuccess(Long orderId) {// UPDATE `order` SET status = ? , pay_time = ? WHERE id = ? AND status = 1lambdaUpdate().set(Order::getStatus, 2).set(Order::getPayTime, LocalDateTime.now()).eq(Order::getId, orderId).eq(Order::getStatus, 1).update();
}

兜底方案

虽然我们利用各种机制尽可能增加了消息的可靠性,但也不好说能保证消息100%的可靠。万一真的MQ通知失败该怎么办呢?
有没有其它兜底方案,能够确保订单的支付状态一致呢?
其实思想很简单:既然MQ通知不一定发送到交易服务,那么交易服务就必须自己主动去查询支付状态。这样即便支付服务的MQ通知失败,我们依然能通过主动查询来保证订单状态的一致。
流程如下:
在这里插入图片描述
不过需要注意的是,交易服务并不知道用户会在什么时候支付,如果查询的时机不正确(比如查询的时候用户正在支付中),可能查询到的支付状态也不正确。
那么问题来了,我们到底该在什么时间主动查询支付状态呢?

这个时间是无法确定的,因此,通常我们采取的措施就是利用定时任务定期查询,例如每隔20秒就查询一次,并判断支付状态。如果发现订单已经支付,则立刻更新订单状态为已支付即可。
定时任务大家之前学习过,具体的实现这里就不再赘述了。

至此,消息可靠性的问题已经解决了。

综上,支付服务与交易服务之间的订单状态一致性是如何保证的?

  • 首先,支付服务会正在用户支付成功以后利用MQ消息通知交易服务,完成订单状态同步。
  • 其次,为了保证MQ消息的可靠性,我们采用了生产者确认机制、消费者确认、消费者失败重试等策略,确保消息投递的可靠性
  • 最后,我们还在交易服务设置了定时任务,定期查询订单支付状态。这样即便MQ通知失败,还可以利用定时任务作为兜底方案,确保订单支付状态的最终一致性。

延迟消息

在电商的支付业务中,对于一些库存有限的商品,为了更好的用户体验,通常都会在用户下单时立刻扣减商品库存。例如电影院购票、高铁购票,下单后就会锁定座位资源,其他人无法重复购买。

但是这样就存在一个问题,假如用户下单后一直不付款,就会一直占有库存资源,导致其他客户无法正常交易,最终导致商户利益受损!

因此,电商中通常的做法就是:对于超过一定时间未支付的订单,应该立刻取消订单并释放占用的库存。

例如,订单支付超时时间为30分钟,则我们应该在用户下单后的第30分钟检查订单支付状态,如果发现未支付,应该立刻取消订单,释放库存。

但问题来了:如何才能准确的实现在下单后第30分钟去检查支付状态呢?

像这种在一段时间以后才执行的任务,我们称之为延迟任务,而要实现延迟任务,最简单的方案就是利用MQ的延迟消息了。

在RabbitMQ中实现延迟消息也有两种方案:

  • 死信交换机+TTL
  • 延迟消息插件

这一章我们就一起研究下这两种方案的实现方式,以及优缺点。

死信交换机和延迟消息

死信交换机

当一个队列中的消息满足下列情况之一时,可以成为死信(dead letter):

  • 消费者使用basic.reject或 basic.nack声明消费失败,并且消息的requeue参数设置为false
  • 消息是一个过期消息,超时无人消费
  • 要投递的队列消息满了,无法投递

如果一个队列中的消息已经成为死信,并且这个队列通过dead-letter-exchange属性指定了一个交换机,那么队列中的死信就会投递到这个交换机中,而这个交换机就称为死信交换机(Dead Letter Exchange)。而此时加入有队列与死信交换机绑定,则最终死信就会被投递到这个队列中。

死信交换机有什么作用呢?

  1. 收集那些因处理失败而被拒绝的消息
  2. 收集那些因队列满了而被拒绝的消息
  3. 收集因TTL(有效期)到期的消息
延时消息

前面两种作用场景可以看做是把死信交换机当做一种消息处理的最终兜底方案,与消费者重试时讲的RepublishMessageRecoverer作用类似。

而最后一种场景,大家设想一下这样的场景:
如图,有一组绑定的交换机(ttl.fanout)和队列(ttl.queue)。但是ttl.queue没有消费者监听,而是设定了死信交换机hmall.direct,而队列direct.queue1则与死信交换机绑定,RoutingKey是blue:
在这里插入图片描述
假如我们现在发送一条消息到ttl.fanout,RoutingKey为blue,并设置消息的有效期为5000毫秒:
在这里插入图片描述
消息肯定会被投递到ttl.queue之后,由于没有消费者,因此消息无人消费。5秒之后,消息的有效期到期,成为死信:
在这里插入图片描述
死信被再次投递到死信交换机hmall.direct,并沿用之前的RoutingKey,也就是blue:
在这里插入图片描述
由于direct.queue1与hmall.direct绑定的key是blue,因此最终消息被成功路由到direct.queue1,如果此时有消费者与direct.queue1绑定, 也就能成功消费消息了。但此时已经是5秒钟以后了:
在这里插入图片描述
也就是说,publisher发送了一条消息,但最终consumer在5秒后才收到消息。我们成功实现了延迟消息。

在这里插入图片描述在这里插入图片描述

DelayExchange插件

基于死信队列虽然可以实现延迟消息,但是太麻烦了。因此RabbitMQ社区提供了一个延迟消息插件来实现相同的效果。

官方文档说明

下载安装
docker volume inspect mq-plugins
[{"CreatedAt": "2024-06-19T09:22:59+08:00","Driver": "local","Labels": null,"Mountpoint": "/var/lib/docker/volumes/mq-plugins/_data","Name": "mq-plugins","Options": null,"Scope": "local"}
]
docker exec -it mq rabbitmq-plugins enable rabbitmq_delayed_message_exchange
声明延迟交换机
@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "delay.queue", durable = "true"),exchange = @Exchange(name = "delay.direct", delayed = "true"),key = "delay"
))
public void listenDelayMessage(String msg){log.info("接收到delay.queue的延迟消息:{}", msg);
}
```
```java```

相关文章:

【从0到1学RabbitMQ】RabbitMQ高级篇

学完基础篇之后我们对用户下单这个业务进行了改造&#xff0c;我们可以吧用户支付这个业务抽出来&#xff0c;放入队列当中去执行。如下图&#xff1a; 但是这里我们思考一下&#xff0c;如果MQ通知失败了&#xff0c;支付服务中支付流水显示支付成功&#xff0c;而交易服务中…...

200 smart pid

PID整定控制面板-S7-200 SMART 跟我学/跟我做之PID功能-系列课程-西门子1847工业学习平台官网 使用西门子200SMART进行PID调节 PID自整定 PID调节技巧_哔哩哔哩_bilibili S7-200 SMART PID PID常见问题...

AI制作PPT,如何轻松打造高效演示文稿

AI制作PPT&#xff0c;如何轻松打造高效演示文稿&#xff01;随着信息化时代的到来&#xff0c;PPT已经成为了几乎所有职场人士、学生、讲师的必备工具。每个人都希望自己的PPT既有创意&#xff0c;又能高效展示信息。而在如今的科技背景下&#xff0c;AI的出现彻底改变了PPT的…...

如何用postman做接口自动化测试?

&#x1f345; 点击文末小卡片&#xff0c;免费获取软件测试全套资料&#xff0c;资料在手&#xff0c;涨薪更快 本文适合已经掌握 Postman 基本用法的读者&#xff0c;即对接口相关概念有一定了解、已经会使用 Postman 进行模拟请求等基本操作。 工作环境与版本&#xff1…...

day29-贪心__134. 加油站__135. 分发糖果__860.柠檬水找零__406.根据身高重建队列

134. 加油站 这道题的贪心方法相当的巧妙。 首先&#xff0c;我们可以通过gas[i] - cost[i]得到第i个站点的净加油量(耗油量)&#xff0c;那么如果我们现在考虑一个从某点a到某点b&#xff0c;那么如果a-》b范围之间的gas[i] - cost[i]存在负数&#xff0c;那么说明无法从a作…...

聊透多线程编程-线程基础-4.C# Thread 子线程执行完成后通知主线程执行特定动作

在多线程编程中&#xff0c;线程之间的同步和通信是一个常见的需求。例如&#xff0c;我们可能需要一个子线程完成某些任务后通知主线程&#xff0c;并由主线程执行特定的动作。本文将基于一个示例程序&#xff0c;详细讲解如何使用 AutoResetEvent 来实现这种场景。 示例代码…...

C# 组件的使用方法

类 Stopwatch 计算时间 Stopwatch sw new Stopwatch(); sw.Start(); // 要执行的代码块 Thread.Sleep(2000);sw.ElapsedMilliseconds // 消耗时间 Console.WriteLine(sw.ElapsedMilliseconds);组件 ListView 属性设置 外观 - View - Details 行为 - Columns -&#xff08;…...

Python常用排序算法

1. 冒泡排序 冒泡排序是一种简单的排序算法&#xff0c;它重复地遍历要排序的列表&#xff0c;比较相邻的元素&#xff0c;如果他们的顺序错误就交换他们。 def bubble_sort(arr):# 遍历所有数组元素for i in range(len(arr)):# 最后i个元素是已经排序好的for j in range(0, …...

HTML5 服务器发送事件(Server-Sent Events)

1. 引言 HTML5 服务器发送事件&#xff08;Server-Sent Events&#xff0c;SSE&#xff09;是一种基于 HTTP 的服务器推送技术&#xff0c;允许服务器主动向客户端&#xff08;如浏览器&#xff09;发送实时更新。SSE 适用于单向通信场景&#xff0c;如新闻推送、实时价格更新…...

【C++游戏引擎开发】第12篇:GLSL语法与基础渲染——从管线结构到动态着色器

一、OpenGL渲染管线解密 1.1 OpenGL渲染管线流程图 #mermaid-svg-GrAgLUat95CVZKm0 {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-GrAgLUat95CVZKm0 .error-icon{fill:#552222;}#mermaid-svg-GrAgLUat95CVZKm0 .e…...

阿里云负载均衡可以抗ddos吗

本文深度解析阿里云负载均衡的DDoS防护机制&#xff0c;通过实测数据验证其基础防御能力边界&#xff0c;揭示需结合云盾高防IP实现TB级流量清洗的工程实践。结合2023年Memcached反射攻击事件&#xff0c;提供混合云架构下的多层级防御方案设计指南。 云原生负载均衡的基础防护…...

动手学习:路径规划原理及常用算法

一、路径规划的基本原理 路径规划&#xff08;Path Planning&#xff09;是机器人导航的核心任务&#xff0c;目标是为机器人找到一条从起点到终点的无碰撞路径&#xff0c;同时满足约束条件&#xff08;如最短路径、最优能耗、安全性等&#xff09;。在人形机器人场景中&…...

Web前端性能指标Web3D性能优化

性能指标&评估方式 在Web3D性能优化之前,先了解性能指标&评估方式 前端性能指标评估与监测工具可分为以下几类,结合不同场景和需求,开发者可选择适合的工具进行性能优化: 一、浏览器内置工具 Chrome DevTools Performance 面板:记录运行时性能,分析CPU、内存使…...

Mujoco xml <option>

xml option option总起例子timestep(一般会用到)gravity(一般会用到)windmagneticdensityviscosityo_margino_solref, o_solimpo_frictionintegrator(一般会用到)cone(一般会用到)jacobian(一般会用到)solver(一般会用到)iterations(一般会用到)tolerance(一般会用到)noslip_it…...

如何用 nvm alias default 18.20.8 实现全局 Node.js 版本管理?一篇保姆级指南!!!

&#x1f4dd; 如何用 nvm alias default 18.20.8 实现全局 Node.js 版本管理&#xff1f;一篇保姆级指南 &#x1f680; 1. 核心命令解析 &#x1f50d; nvm alias default 18.20.8 是 nvm 管理工具中用于设置全局默认 Node.js 版本的核心命令。它的作用是将指定版本锁定为所…...

推荐一款Nginx图形化管理工具: NginxWebUI

Nginx Web UI是一款专为Nginx设计的图形化管理工具&#xff0c;旨在简化Nginx的配置与管理过程&#xff0c;提高开发者和系统管理的工作效率。项目地址&#xff1a;https://github.com/cym1102/nginxWebUI 。 一、Nginx WebUI的主要特点 简化配置&#xff1a;通过图形化的界…...

Pytest多环境切换实战:测试框架配置的最佳实践!

你是否也遇到过这种情况&#xff1a;本地测试通过&#xff0c;一到测试环境就翻车&#xff1f;环境变量错乱、接口地址混乱、数据源配置丢失……这些「环境切换」问题简直像定时炸弹&#xff0c;随时引爆你的测试流程&#xff01; 测试人员每天都跟不同的环境打交道&#xff0…...

大模型在网络安全领域的七大应用

1. 高级威胁检测与防御自动化 技术路径&#xff1a; 数据整合&#xff1a;聚合网络流量、终端日志、威胁情报等多源数据&#xff0c;构建多维特征库。行为建模&#xff1a;通过大模型的上下文理解能力&#xff0c;建立正常行为基线&#xff0c;识别偏离模式。动态策略生成&am…...

SpringBoot项目部署之启动脚本

一、启动脚本方案 1. 基础启动方式 1.1 直接运行JAR java -jar your-app.jar --spring.profiles.activeprod优点&#xff1a;简单直接&#xff0c;适合快速测试缺点&#xff1a;终端关闭即终止进程 1.2 后台运行 nohup java -jar your-app.jar > app.log 2>&1 &…...

【spark-submit】--提交任务

Spark-submit spark-submit 是 Apache Spark 提供的用于提交 Spark 应用程序到集群的命令行工具。 基本语法 spark-submit [options] <app-jar> [app-arguments]常用参数说明 应用程序配置 --class <class-name>: 指定应用程序的主类&#xff08;对于 Java/Sc…...

机器学习中的回归与分类模型:线性回归、逻辑回归与多分类

在机器学习领域&#xff0c;回归和分类是两类重要的任务&#xff0c;它们各自有着不同的应用场景和模型构建方式。本文将详细介绍线性回归、逻辑回归以及多分类任务的相关内容&#xff0c;包括数据预处理、模型定义、损失函数的选择以及评估指标的计算。 一、线性回归&#xf…...

spark-rdd

Spark-core RDD转换算子 RDD 根据数据处理方式的不同将算子整体上分为 Value 类型、双 Value 类型和 Key-Value 类型。 Value类型&#xff1a; 1.map 将处理的数据逐条进行映射转换&#xff0c;这里的转换可以是类型的转换&#xff0c;也可以是值的转换 mapPartitions map …...

Python 实现如何电商网站滚动翻页爬取

一、电商网站滚动翻页机制分析 电商网站如亚马逊和淘宝为了提升用户体验&#xff0c;通常采用滚动翻页加载数据的方式。当用户滚动页面到底部时&#xff0c;会触发新的数据加载&#xff0c;而不是一次性将所有数据展示在页面上。这种机制虽然对用户友好&#xff0c;但对爬虫来…...

pytorch TensorDataset与DataLoader类

读取数据 Dataset类 Dataset 是一个读取数据抽象类&#xff0c;所有自定义的数据集类需要继承该类。 该类主要实现以下三个功能 ①如何获取每一个数据及其label --> 抽象方法__getitem()__设置通过对象[索引]的方式获取每一个样本及其label ②告知一共有多少数据 -->…...

AI大模型与知识生态:重构认知的新时代引擎

📝个人主页🌹:慌ZHANG-CSDN博客 🌹🌹期待您的关注 🌹🌹 一、引言:我们如何获得知识,正在被AI彻底改写 从古代图书馆、百科全书,到搜索引擎、问答社区,人类获取知识的方式一直在进化。而随着 ChatGPT、DeepSeek、Grok 等 AI 大模型的到来,这一过程迎来了颠覆…...

Server-Sent Events一种允许服务器向客户端发送实时更新的 Web API

Server-Sent Events&#xff08;SSE&#xff09;是一种允许服务器向客户端发送实时更新的 Web API。它基于 HTTP 协议&#xff0c;提供了一种单向的、服务器到客户端的通信机制&#xff0c;客户端可以通过监听服务器发送的事件来接收实时数据。下面从原理、使用场景、代码示例等…...

电子电器架构 --- AI如何重构汽车产业

我是穿拖鞋的汉子&#xff0c;魔都中坚持长期主义的汽车电子工程师。 老规矩&#xff0c;分享一段喜欢的文字&#xff0c;避免自己成为高知识低文化的工程师&#xff1a; 周末洗了一个澡&#xff0c;换了一身衣服&#xff0c;出了门却不知道去哪儿&#xff0c;不知道去找谁&am…...

操作系统CPU调度

简介 当CPU有大量任务要处理,但由于资源有限,无法同时处理。所有就需要某种规则来决定任务处理的顺序,这就是调度。 调度层次 根据调度频率与层次,共分为三种 高级调度 也称为作业调度(Long-Trem Scheduling),频次很低,它决定哪些进程从外存(硬盘)加载到内存中级调度 也…...

icoding题解排序

数组合并 假设有 n 个长度为 k 的已排好序&#xff08;升序&#xff09;的数组&#xff0c;请设计数据结构和算法&#xff0c;将这 n 个数组合并到一个数组&#xff0c;且各元素按升序排列。即实现函数&#xff1a; void merge_arrays(const int* arr, int n, int k, int* out…...

xHCI 上 USB 读写分析

系列文章目录 xHCI 简单分析 USB Root Hub 分析 USB Hub 检测设备 usb host 驱动之 urb xHCI那些事儿 PCIe MMIO、DMA、TLP PCIe配置空间与CPU访问机制 PCIe总线协议基础实战 文章目录 系列文章目录一、xHCI 初始化二、xHCI 驱动识别根集线器&#xff08;RootHub&#xff09;…...

SpringCloud Alibaba 之分布式全局事务 Seata 原理分析

1. 什么是 Seata&#xff1f;为什么需要它&#xff1f; 想象一下&#xff0c;你去银行转账&#xff1a; 操作1&#xff1a;从你的账户扣款 1000 元操作2&#xff1a;向对方账户增加 1000 元 如果 操作1 成功&#xff0c;但 操作2 失败了&#xff0c;你的钱就凭空消失了&…...

《C语言中的“魔法盒子”:自定义函数的奇妙之旅》

&#x1f680;个人主页&#xff1a;BabyZZの秘密日记 &#x1f4d6;收入专栏&#xff1a;C语言 &#x1f30d;文章目入 一、引言二、自定义函数的创建&#xff08;一&#xff09;基本结构&#xff08;二&#xff09;一个简单的例子 三、自定义函数的使用&#xff08;一&#xf…...

【Spring】IoC 和 DI的关系、简单使用,从“硬编码“到“优雅解耦“:IoC与DI的Spring蜕变之旅

1.IoC 和 DI的关系 IoC&#xff08;Inversion of Control&#xff0c;控制反转&#xff09;和DI&#xff08;Dependency Injection&#xff0c;依赖注入&#xff09;是Spring框架中紧密相关但又有所区别的两个概念。理解它们的联系&#xff0c;可以帮助我们更深刻地掌握Spring…...

43、RESTful API 保姆教程

RESTful API 目录 RESTful API简介RESTful设计原则RESTful设计规范RESTful统一返回体JAX-RSJAX-RS与SpringBoot集成构建Restful服务实践总结一、RESTful API 简介 REST(Representational State Transfer)是一种基于HTTP的web服务架构风格,RESTful API则是遵循REST原则的网…...

ASP.NET Core 性能优化:客户端响应缓存

文章目录 前言一、什么是缓存二、客户端缓存核心机制&#xff1a;HTTP缓存头1&#xff09;使用[ResponseCache]属性&#xff08;推荐&#xff09;2&#xff09;预定义缓存配置&#xff08;CacheProfile&#xff09;3&#xff09;手动设置HTTP头4&#xff09;缓存验证机制&#…...

算法导论(递归回溯)——递归

算法思路&#xff08;21&#xff09; 递归函数的含义&#xff1a; 创建一个递归函数&#xff0c;该函数接受两个链表的头结点作为输入&#xff0c;返回合并后的链表的头结点。 选择较小的节点&#xff1a; 在函数体内&#xff0c;首先比较两个链表的头结点的值&#xff0c;选择…...

从接口400ms到20ms,记录一次JVM、MySQL、Redis的混合双打

​​1. 场景&#xff1a;促销活动的崩溃​​ 接到报警短信&#xff0c;核心接口响应时间突破​​5秒​​&#xff0c;DB CPU飙到100%。 用Arthas抓取线上火焰图后发现&#xff1a; ---[ 4763ms ] com.example.service.OrderService.createOrder() |---[ 98% ] com.example.m…...

Excel通过VBA脚本去除重复数据行并保存

一、方法1:使用字典动态去重并保存 适用场景&#xff1a;需要灵活控制去重逻辑&#xff08;如保留最后一次出现的重复项&#xff09;时 Sub 动态去重保存到新表()Dim srcSheet As Worksheet, destSheet As WorksheetDim dict As Object, lastRow As Long, i As LongDim key A…...

Mysql表的操作(2)

1.去重 select distinct 列名 from 表名 2.查询时排序 select 列名 from 表名 order by 列名 asc/desc; 不影响数据库里面的数据 错误样例 &#xff1a; 但结果却有点出乎意料了~为什么会失败呢&#xff1f; 其实这是因为书写的形式不对&#xff0c;如果带了引号&#xff0c;…...

#Linux内存管理# 在ARM32系统中,页表是如何映射的?在ARM64系统中,页表又是如何映射的?

一、ARM32系统页表映射 1. 层级结构与地址划分 默认实现&#xff1a;采用两层映射&#xff08;PGD→PTE&#xff09;&#xff0c;合并Linux标准三级模型中的PMD层。 虚拟地址解析&#xff08;以4KB页为例&#xff09;&#xff1a; Bits[31:20]&#xff1a;一级页表&#xff08;…...

prometheus整合jmx_exporter 使用jmx_exporter监控Kafka

docker-compose部署kafka集群&#xff1b;单节点单zk-CSDN博客 springboot整合kafka&#xff1b;docker部署kafka-CSDN博客 kafka使用SSL加密和认证--todo_ssl.truststore.location-CSDN博客 version: 3.8services:zookeeper1:image: zookeeper:3.9.1container_name: zook…...

深度学习实战:从零构建图像分类API(Flask/FastAPI版)

引言&#xff1a;AI时代的图像分类需求 在智能时代&#xff0c;图像分类技术已渗透到医疗影像分析、自动驾驶、工业质检等各个领域。作为开发者&#xff0c;掌握如何将深度学习模型封装为API服务&#xff0c;是实现技术落地的关键一步。本文将手把手教你使用Python生态中的Fla…...

Kafka实现延迟消息

Kafka 实现延迟消息 Kafka 本身不支持原生的延迟消息&#xff08;不像 RocketMQ 内置了延迟队列&#xff09;&#xff0c;但可以通过多种方式来实现延迟消息。常见的方案如下&#xff1a; 1. 使用不同的 Topic 分区&#xff08;最常见&#xff09; 思路&#xff1a; 创建多…...

大数据(7.2)Kafka万亿级数据洪流下的架构优化实战:从参数调优到集群治理

目录 一、海量数据场景下的性能之殇1.1 互联网企业的数据增长曲线1.2 典型性能瓶颈分析 二、生产者端极致优化2.1 批量发送黄金法则2.1.1 分区选择算法对比 2.2 序列化性能突破 三、消费者端并发艺术3.1 多线程消费模式演进3.1.1 消费组Rebalance优化 3.2 位移管理高阶技巧 四、…...

要查看 FAISS 使用的 OpenMP 版本,需根据安装方式和系统环境采用不同方法。以下是具体步骤和原理分析:

要查看 FAISS 使用的 OpenMP 版本&#xff0c;需根据安装方式和系统环境采用不同方法。以下是具体步骤和原理分析&#xff1a; 方法 1&#xff1a;通过库文件名称直接查看&#xff08;推荐&#xff09; FAISS 的 OpenMP 版本通常直接体现在其依赖的动态链接库&#xff08;DLL/…...

AI 大模型的标准化工具箱MCP (Model Context Protocol)

MCP简介 MCP &#xff08;Model Context Protocol&#xff0c;模型上下文协议&#xff09;定义了应用程序和 AI 模型之间交换上下文信息的方式。这使得开发者能够以一致的方式将各种数据源、工具和功能连接到 AI 模型&#xff08;一个中间协议层&#xff09;&#xff0c;就像 …...

哈希表的封装

目录 引入 哈希表封装 修改哈希表参数 修改哈希表成员 修改%时使用的变量 修改读取时获得的变量 迭代器的实现 迭代器的定义 迭代器 迭代器*解引用 迭代器->成员访问 迭代器重载和! 封装迭代器 HashTable迭代器封装 非const版本 const版本 unordered_set迭…...

2025年认证杯数模竞赛赛题浅析-快速选题

赛题浅析 认证杯作为国内最早的数学建模论坛、唯一一个全部公开参赛论文的竞赛、国内最大的数学建模竞赛之一、唯一一个对非学生群里开放的数学建模竞赛、国内唯二的支持高中生参赛的大学生数模竞赛。在数模界一直被视为国赛之前较好的练手赛&#xff0c;本文将初步简略得介绍…...

【网络安全】Linux 常见命令

未经许可,不得转载。 文章目录 正文系统信息查看用户与权限管理进程管理网络配置与检测文件操作日志查看与分析权限审计与安全检测正文 在网络安全工作中,熟练掌握 Linux 系统中的常用命令,对于日常运维、日志分析、安全排查等工作至关重要。 以下为常用命令汇总,供参考。…...

电脑卡顿严重怎么办 电脑卡顿的处理指南

电脑突然卡顿比较严重&#xff0c;这是很多用户都曾经遇到过的问题&#xff0c;鼠标一直转圈圈&#xff0c;无法进行任何操作。电脑卡顿&#xff0c;电脑卡顿不仅会降低工作效率&#xff0c;还可能导致数据丢失&#xff0c;数据无法保存。很多用户解决电脑卡顿的方法就是直接一…...