【RabbitMQ】 RabbitMQ高级特性(二)
文章目录
- 一、重试机制
- 1.1、重试配置
- 1.2、配置交换机&队列
- 1.3、发送消息
- 1.4、消费消息
- 1.5、运行程序
- 1.6、 手动确认
- 二、TTL
- 2.1、设置消息的TTL
- 2.2、设置队列的TTL
- 2.3、两者区别
- 三 、死信队列
- 6.1 死信的概念
- 3.2 代码示例
- 3.2.1、声明队列和交换机
- 3.2.2、正常队列绑定死信交换机
- 3.2.3 制造死信产生的条件
- 3.2.4、发送消息
- 3.2.5、测试死信
- 3.3、常见面试题
- 四、延迟队列
- 4.1、概念
- 4.2、应用场景
- 4.3、TTL+死信队列实现
- 4.4、常见面试题
- 五、事务
- 5.1、配置事务管理器
- 5.2、声明队列
- 5.3、生产者
- 结语
本文延续上文RabbitMQ高级特性(一)为大家继续讲解RabbitMQ其他高级特性
一、重试机制
在消息传递过程中, 可能会遇到各种问题, 如网络故障, 服务不可用, 资源不足等, 这些问题可能导致消息处理失败. 为了解决这些问题, RabbitMQ 提供了重试机制, 允许消息在处理失败后重新发送. 但如果是程序逻辑引起的错误, 那么多次重试也是没有⽤的, 可以设置重试次数
1.1、重试配置
spring:rabbitmq:addresses: amqp://study:study@110.41.51.65:15673/jiaohuanlistener:simple:acknowledge-mode: auto #消息接收确认retry:enabled: true # 开启消费者失败重试initial-interval: 5000ms # 初始失败等待时⻓为5秒max-attempts: 5 # 最⼤重试次数(包括⾃⾝消费的⼀次)
1.2、配置交换机&队列
//重试机制public static final String RETRY_QUEUE = "retry_queue";public static final String RETRY_EXCHANGE_NAME = "retry_exchange";//重试机制 发布订阅模式//1. 交换机@Bean("retryExchange")public Exchange retryExchange() {return ExchangeBuilder.fanoutExchange(Constant.RETRY_EXCHANGE_NAME).durable(true).build();}//2. 队列@Bean("retryQueue")public Queue retryQueue() {return QueueBuilder.durable(Constant.RETRY_QUEUE).build();}//3. 队列和交换机绑定 Binding@Bean("retryBinding")public Binding retryBinding(@Qualifier("retryExchange") FanoutExchangeexchange, @Qualifier("retryQueue") Queue queue) {return BindingBuilder.bind(queue).to(exchange);}
1.3、发送消息
@RequestMapping("/retry")
public String retry(){rabbitTemplate.convertAndSend(Constant.RETRY_EXCHANGE_NAME, "", "retry test...");return "发送成功!"; }
1.4、消费消息
@Component
public class RetryQueueListener {//指定监听队列的名称@RabbitListener(queues = Constant.RETRY_QUEUE)public void ListenerQueue(Message message) throws Exception {System.out.printf("接收到消息: %s, deliveryTag: %d%n", new String(message.getBody(), "UTF-8"),message.getMessageProperties().getDeliveryTag());//模拟处理失败int num = 3 / 0;System.out.println("处理完成");}
}
1.5、运行程序
我们可以观察到结果
接收到消息: retry test..., deliveryTag: 1
接收到消息: retry test..., deliveryTag: 1
接收到消息: retry test..., deliveryTag: 1
接收到消息: retry test..., deliveryTag: 1
接收到消息: retry test..., deliveryTag: 1
o.s.a.r.r.RejectAndDontRequeueRecoverer : Retries exhausted for message (Body:'consumer ack test...' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=ack_exchange, receivedRoutingKey=ack, deliveryTag=1, consumerTag=amq.ctag-vYckQBt9_0-5v2oG9oBnFw, consumerQueue=ack_queue])
org.springframework.amqp.rabbit.support.ListenerExecutionFailedException:
Listener method 'public void com.jiaohuan.rabbitmq.listener.AckQueueListener.ListenerQueue(org.springframework.amqp.core.Message,com.rabbitmq.client.Channel) throws java.lang.Exception'threw exception
如果对异常进行捕获, 那么就不会进行重试 代码修改如下:
System.out.printf("接收到消息: %s, deliveryTag: %d%n", new String(message.getBody(),"UTF-8"),
message.getMessageProperties().getDeliveryTag());
//模拟处理失败
try {int num = 3/0;System.out.println("处理完成");
}catch (Exception e){System.out.println("处理失败");
}
重新运行程序, 结果如下:
接收到消息: consumer ack test..., deliveryTag: 1
处理失败
1.6、 手动确认
改为手动确认
@RabbitListener(queues = Constant.RETRY_QUEUE)public void ListenerQueue(Message message, Channel channel) throws Exception {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {System.out.printf("接收到消息: %s, deliveryTag: %d%n", new String(message.getBody(), "UTF-8"),message.getMessageProperties().getDeliveryTag());//模拟处理失败int num = 3 / 0;System.out.println("处理完成");//3. ⼿动签收channel.basicAck(deliveryTag, true);} catch (Exception e) {//4. 异常了就拒绝签收Thread.sleep(1000);//第三个参数requeue, 是否重新发送, 如果为true, 则会重新发送,,若为false, 则直接丢弃channel.basicNack(deliveryTag, true, true);}}
运⾏结果:
接收到消息: retry test..., deliveryTag: 1
接收到消息: retry test..., deliveryTag: 2
接收到消息: retry test..., deliveryTag: 3
接收到消息: retry test..., deliveryTag: 4
接收到消息: retry test..., deliveryTag: 5
接收到消息: retry test..., deliveryTag: 6
接收到消息: retry test..., deliveryTag: 7
接收到消息: retry test..., deliveryTag: 8
接收到消息: retry test..., deliveryTag: 9
接收到消息: retry test..., deliveryTag: 10
接收到消息: retry test..., deliveryTag: 11
可以看到, 手动确认模式时, 重试次数的限制不会像在自动确认模式下那样直接生效, 因为是否重试以及何时重试更多地取决于应⽤程序的逻辑和消费者的实现. ⾃动确认模式下, RabbitMQ 会在消息被投递给消费者后自动确认消息. 如果消费者处理消息时抛出异 常, RabbitMQ 根据配置的重试参数自动将消息重新⼊队, 从而实现重试. 重试次数和重试间隔等参数可以直接在RabbitMQ的配置中设定,并且RabbitMQ会负责执行这些重试策略.
⼿动确认模式下, 消费者需要显式地对消息进行确认. 如果消费者在处理消息时遇到异常, 可以选择不确认消息使消息可以重新⼊队. 重试的控制权在于应用程序本身, 而不是RabbitMQ的内部机制. 应用程序 可以通过自己的逻辑和利用RabbitMQ的⾼级特性来实现有效的重试策略。
使⽤重试机制时需要注意:
1 . ⾃动确认模式下: 程序逻辑异常, 多次重试还是失败, 消息就会被自动确认, 那么消息就丢失 了
2 . ⼿动确认模式下: 程序逻辑异常, 多次重试消息依然处理失败, 无法被确认, 就⼀直是unacked的状态, 导致消息积压
二、TTL
TTL(Time to Live, 过期时间), 即过期时间. RabbitMQ可以对消息和队列设置TTL.当消息到达存活时间之后, 还没有被消费, 就会被⾃动清除。
咱们在⽹上购物, 经常会遇到⼀个场景, 当下单超过24⼩时还未付款, 订单会被⾃动取消 还有类似的, 申请退款之后, 超过7天未被处理, 则⾃动退款
2.1、设置消息的TTL
目前有两种方法可以设置消息的TTL.
⼀是设置队列的TTL, 队列中所有消息都有相同的过期时间. ⼆是对消息本身进行单独设置, 每条消息的TTL可以不同. 如果两种方法⼀起使用, 则消息的TTL以两者之间较小的那个数值为准. 先看针对每条消息设置TTL。针对每条消息设置TTL的方法是在发送消息的方法中加入expiration的属性参数,单位为毫秒.
配置交换机&队列:
//TTLpublic static final String TTL_QUEUE = "ttl_queue";public static final String TTL_EXCHANGE_NAME = "ttl_exchange";//ttl//1. 交换机@Bean("ttlExchange")public Exchange ttlExchange() {return ExchangeBuilder.fanoutExchange(Constant.TTL_EXCHANGE_NAME).durable(true).build();}//2. 队列@Bean("ttlQueue")public Queue ttlQueue() {return QueueBuilder.durable(Constant.TTL_QUEUE).build();}//3. 队列和交换机绑定 Binding@Bean("ttlBinding")public Binding ttlBinding(@Qualifier("ttlExchange") FanoutExchange exchange,@Qualifier("ttlQueue") Queue queue) {return BindingBuilder.bind(queue).to(exchange);}
发送消息:
@RequestMapping("/ttl")public String ttl() {String ttlTime = "10000";//10srabbitTemplate.convertAndSend(Constant.TTL_EXCHANGE_NAME, "", "ttl test...", messagePostProcessor -> {messagePostProcessor.getMessageProperties().setExpiration(ttlTime);return messagePostProcessor;});return"发送成功!";}
观看结果:
发送消息后, 可以看到, Ready消息为1:
10秒钟之后, 刷新页面, 发现消息已被删除:
如果不设置TTL,则表⽰此消息不会过期;如果将TTL设置为0,则表示除非此时可以直接将消息投递到 消费者,否则该消息会被立即丢弃
2.2、设置队列的TTL
设置队列TTL的方法是在创建队列时, 加⼊ x-message-ttl 参数实现的, 单位是毫秒。
配置队列和绑定关系:
public static final String TTL_QUEUE2 = "ttl_queue2";//设置ttl@Bean("ttlQueue2")public Queue ttlQueue2() {//设置20秒过期return QueueBuilder.durable(Constant.TTL_QUEUE2).ttl(20 * 1000).build();}//3. 队列和交换机绑定 Binding@Bean("ttlBinding2")public Binding ttlBinding2(@Qualifier("ttlExchange") FanoutExchange exchange,@Qualifier("ttlQueue2") Queue queue) {return BindingBuilder.bind(queue).to(exchange);}
设置过期时间, 也可以采⽤以下方式:
@Bean("ttlQueue2")public Queue ttlQueue2() {Map<String, Object> arguments = new HashMap<>();arguments.put("x-message-ttl", 20000);//20秒过期return QueueBuilder.durable(Constant.TTL_QUEUE2).withArguments(arguments).build();}
发送消息:
@RequestMapping("/ttl")public String ttl() {// String ttlTime = "30000";//10s// //发送带ttl的消息// rabbitTemplate.convertAndSend(Constant.TTL_EXCHANGE_NAME, "", "ttl test...", messagePostProcessor -> {// messagePostProcessor.getMessageProperties().setExpiration(ttlTime);// return messagePostProcessor;//});//发送不带ttl的消息rabbitTemplate.convertAndSend(Constant.TTL_EXCHANGE_NAME, "", "ttl test...");return "发送成功!";}
运行程序:
运行之后发现,新增了⼀个队列, 队列Features有⼀个TTL标识:
调用接口, 发送消息:
发送消息后, 可以看到, Ready消息为1:
采⽤发布订阅模式, 所有与该交换机绑定的队列(ttl_queue和ttl_queue2)都会收到消息
20秒钟之后, 刷新页面, 发现消息已被删除
由于ttl_queue队列, 未设置过期时间, 所以ttl_queue的消息未删除。
2.3、两者区别
设置队列TTL属性的方法, ⼀旦消息过期, 就会从队列中删除 设置消息TTL的方法, 即使消息过期, 也不会马上从队列中删除, 而是在即将投递到消费者之前进行判定的.
为什么这两种方法处理的方式不⼀样?
因为设置队列过期时间, 队列中已过期的消息肯定在队列头部, RabbitMQ只要定期从队头开始扫描是否 有过期的消息即可. ⽽设置消息TTL的方式, 每条消息的过期时间不同, 如果要删除所有过期消息需要扫描整个队列, 所以不 如等到此消息即将被消费时再判定是否过期, 如果过期再进⾏删除即可.
三 、死信队列
6.1 死信的概念
死信(dead message) 简单理解就是因为种种原因, ⽆法被消费的信息, 就是死信. 有死信, ⾃然就有死信队列. 当消息在⼀个队列中变成死信之后,它能被重新被发送到另⼀个交换器 中,这个交换器就是DLX( Dead Letter Exchange ), 绑定DLX的队列, 就称为死信队列(Dead Letter Queue,简称DLQ)
消息变成死信⼀般是由于以下几种情况:
- 消息被拒绝( Basic.Reject/Basic.Nack ),并且设置 requeue 参数为 false.
- 消息过期.
- 队列达到最大长度
3.2 代码示例
3.2.1、声明队列和交换机
包含两部分:
• 声明正常的队列和正常的交换机
• 声明死信队列和死信交换机
死信交换机和死信队列和普通的交换机, 队列没有区别
//死信队列
public static final String DLX_EXCHANGE_NAME = "dlx_exchange";
public static final String DLX_QUEUE = "dlx_queue";
public static final String NORMAL_EXCHANGE_NAME = "normal_exchange";
public static final String NORMAL_QUEUE = "normal_queue";import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import rabbitmq.Constant;/*** 死信队列相关配置*/
@Configuration
public class DLXConfig {//死信交换机@Bean("dlxExchange")public Exchange dlxExchange() {returnExchangeBuilder.topicExchange(Constant.DLX_EXCHANGE_NAME).durable(true).build();}//2. 死信队列@Bean("dlxQueue")public Queue dlxQueue() {return QueueBuilder.durable(Constant.DLX_QUEUE).build();}//3. 死信队列和交换机绑定 Binding@Bean("dlxBinding")public Binding dlxBinding(@Qualifier("dlxExchange") Exchange exchange,@Qualifier("dlxQueue") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("dlx").noargs();}//正常交换机@Bean("normalExchange")public Exchange normalExchange() {return ExchangeBuilder.topicExchange(Constant.NORMAL_EXCHANGE_NAME).durable(true).build();}//正常队列@Bean("normalQueue")public Queue normalQueue() {return QueueBuilder.durable(Constant.NORMAL_QUEUE).build();}//正常队列和交换机绑定 Binding@Bean("normalBinding")public Binding normalBinding(@Qualifier("normalExchange") Exchangeexchange, @Qualifier("normalQueue") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("normal").noargs();}
}
3.2.2、正常队列绑定死信交换机
当这个队列中存在死信时, RabbitMQ会自动的把这个消息重新发布到设置的DLX上, 进而被路由到另一个队列, 即死信队列.可以监听这个死信队列中的消息以进⾏相应的处理
@Bean("normalQueue")public Queue normalQueue() {Map<String, Object> arguments = new HashMap<>();arguments.put("x-dead-letter-exchange", Constant.DLX_EXCHANGE_NAME);//绑定死 信队列arguments.put("x-dead-letter-routing-key", "dlx");//设置发送给死信队列的RoutingKeyreturn QueueBuilder.durable(Constant.NORMAL_QUEUE).withArguments(arguments).build();}
3.2.3 制造死信产生的条件
@Bean("normalQueue")public Queue normalQueue() {Map<String, Object> arguments = new HashMap<>();arguments.put("x-dead-letter-exchange", Constant.DLX_EXCHANGE_NAME);//绑定死 信队列arguments.put("x-dead-letter-routing-key", "dlx");//设置发送给死信队列的RoutingKey//制造死信产⽣的条件arguments.put("x-message-ttl", 10000);//10秒过期arguments.put("x-max-length", 10);//队列⻓度return QueueBuilder.durable(Constant.NORMAL_QUEUE).withArguments(arguments).build();}
3.2.4、发送消息
@RequestMapping("/dlx")public void dlx() {//测试过期时间, 当时间达到TTL, 消息⾃动进⼊到死信队列rabbitTemplate.convertAndSend(Constant.NORMAL_EXCHANGE_NAME, "normal", "dlx test...");//测试队列⻓度// for (int i = 0; i < 20; i++) {// rabbitTemplate.convertAndSend(Constant.NORMAL_EXCHANGE_NAME, "normal", "dlx test...");// }//测试消息拒收// rabbitTemplate.convertAndSend(Constant.NORMAL_EXCHANGE_NAME, "normal", "dlx test...");}
3.2.5、测试死信
程序启动之后, 观察队列:
队列Features说明:
D: durable的缩写, 设置持久化
TTL: Time to Live, 队列设置了TTL
Lim: 队列设置了长度(x-max-length)
DLX: 队列设置了死信交换机(x-dead-letter-exchange)
DLK: 队列设置了死信RoutingKey(x-dead-letter-routing-key)
- 测试过期时间, 到达过期时间之后, 进⼊死信队列
发送之后:
10秒后, 消息进入到死信队列:
生产者首先发送⼀条消息,然后经过交换器(normal_exchange)顺利地存储到队列(normal_queue)中. 由于队列normal_queue设置了过期时间为10s, 在这10s内没有消费者消费这条消息, 那么判定这条消息过期. 由于设置了DLX, 过期之时, 消息会被丢给交换器(dlx_exchange)中, 这时根据RoutingKey匹配, 找到匹配的队列(dlx_queue), 最后消息被存储在queue.dlx这个死信队列中.
- 测试达到队列长度, 消息进入死信队列
队列⻓度设置为10, 我们发送20条数据, 会有10条数据直接进⼊到死信队列 发送前, 死信队列只有⼀条数据
发送20条消息:
//测试队列⻓度
for (int i = 0; i < 20; i++) {rabbitTemplate.convertAndSend(Constant.NORMAL_EXCHANGE_NAME, "normal", "dlx test...");
}
运⾏后, 可以看到死信队列变成了11条
过期之后, 正常队列的10条也会进入到死信队列
3.3、常见面试题
死信队列作为RabbitMQ的高级特性,也是面试的一大重点。
- 死信队列的概念 死信(Dead Letter)是消息队列中的⼀种特殊消息, 它指的是那些无法被正常消费或处理的消息. 在消息队列系统中, 如RabbitMQ, 死信队列用于存储这些死信消息
- 死信的来源
1)消息过期: 消息在队列中存活的时间超过了设定的TTL
2)消息被拒绝: 消费者在处理消息时, 可能因为消息内容错误, 处理逻辑异常等原因拒绝处理该消息. 如果拒绝时指定不重新入队(requeue=false), 消息也会成为死信.
3)队列满了: 当队列达到最大长度, 无法再容纳新的消息时, 新来的消息会被处理为死信. - 死信队列的应用场景 对于RabbitMQ来说, 死信队列是⼀个非常有用的特性. 它可以处理异常情况下,消息不能够被消费者正 确消费而被置⼊死信队列中的情况, 应用程序可以通过消费这个死信队列中的内容来分析当时所遇到的 异常情况, 进而可以改善和优化系统. 比如: 用户支付订单之后, 支付系统会给订单系统返回当前订单的⽀付状态。为了保证支付信息不丢失, 需要使用到死信队列机制. 当消息消费异常时, 将消息投入到死信队列中, 由订单系统的其他消费者来监听这个队列, 并对数据进行处理(比如发送工单等,进行人工确认).
场景的应用场景还有:
• 消息重试:将死信消息重新发送到原队列或另⼀个队列进行重试处理.
• 消息丢弃:直接丢弃这些无法处理的消息,以避免它们占⽤系统资源.
• ⽇志收集:将死信消息作为日志收集起来,用于后续分析和问题定位.
四、延迟队列
4.1、概念
延迟队列(Delayed Queue),即消息被发送以后, 并不想让消费者⽴刻拿到消息, 而是等待特定时间后, 消费者才能拿到这个消息进行消费.
4.2、应用场景
延迟队列的使用场景有很多, 比如:
- 智能家居: 用户希望通过手机远程遥控家⾥的智能设备在指定的时间进行⼯作. 这时候就可以将用户指令发送到延迟队列, 当指令设定的时间到了再将指令推送到智能设备.
- ⽇常管理: 预定会议后,需要在会议开始前十五分钟提醒参会⼈参加会议
- ⽤⼾注册成功后, 7天后发送短信, 提高用户活跃度等
- …
RabbitMQ本身没有直接支持延迟队列的的功能, 但是可以通过前面所介绍的TTL+死信队列的⽅式组合模拟出延迟队列的功能. 假设⼀个应用中需要将每条消息都设置为10秒的延迟, 生产者通过 normal_exchange 这个交换器将 发送的消息存储在 normal_queue 这个队列中. 消费者订阅的并非是 normal_queue 这个队列, 而是 dlx_queue 这个队列. 当消息从normal_queue 这个队列中过期之后被存入 dlx_queue 这个 队列中,消费者就恰巧消费到了延迟10秒的这条消息.
所以死信队列展⽰的也是延迟队列的使用.
4.3、TTL+死信队列实现
代码实现:
先看TTL+死信队列实现延迟队列
继续沿用死信队列的代码即可
声明队列
//正常队列@Bean("normalQueue")public Queue normalQueue() {Map<String, Object> arguments = new HashMap<>();arguments.put("x-dead-letter-exchange", Constant.DLX_EXCHANGE_NAME);//绑定死 信队列arguments.put("x-dead-letter-routing-key", "dlx");//设置发送给死信队列的RoutingKeyreturn QueueBuilder.durable(Constant.NORMAL_QUEUE).withArguments(arguments).build();}
⽣产者:
发送两条消息, ⼀条消息10s后过期, 第二条20s后过期
@RequestMapping("/delay")public String delay() {//发送带ttl的消息rabbitTemplate.convertAndSend(Constant.NORMAL_EXCHANGE_NAME, "normal","ttl test 10s..." + new Date(), messagePostProcessor -> {messagePostProcessor.getMessageProperties().setExpiration("10000");//10s过期return messagePostProcessor;});rabbitTemplate.convertAndSend(Constant.NORMAL_EXCHANGE_NAME, "normal","ttl test 20s..." + new Date(), messagePostProcessor -> {messagePostProcessor.getMessageProperties().setExpiration("20000");//20s过期return messagePostProcessor;});return "发送成功!";}
消费者:
//指定监听队列的名称
@RabbitListener(queues = Constant.DLX_QUEUE)
public void ListenerDLXQueue(Message message, Channel channel) throws Exception {long deliveryTag = message.getMessageProperties().getDeliveryTag();System.out.printf("%tc 死信队列接收到消息: %s, deliveryTag: %d%n", newDate(),newString(message.getBody(),"UTF-8"), message.getMessageProperties().getDeliveryTag());
}
运行程序:
通过控制台观察死信队列消费情况:
死信队列接收到消息: ttl test 10s…Wed May 22 11:58:50 CST , deliveryTag: 1
死信队列接收到消息: ttl test 20s…Wed May 22 11:58:50 CST , deliveryTag: 2
可以看到, 两条消息按照过期时间依次进入了死信队列. 延迟队列, 就是希望等待特定的时间之后, 消费者才能拿到这个消息. TTL刚好可以让消息延迟⼀段时间 成为死信, 成为死信的消息会被投递到死信队列⾥, 这样消费者⼀直消费死信队列里的消息就可以了.
存在问题
接下来把⽣产消息的顺序修改⼀下 先发送20s过期数据, 再发送10s过期数据
@RequestMapping("/delay")public String delay() {//发送带ttl的消息rabbitTemplate.convertAndSend(Constant.NORMAL_EXCHANGE_NAME, "normal","ttl test 20s..." + new Date(), messagePostProcessor -> {messagePostProcessor.getMessageProperties().setExpiration("20000");//20s过期return messagePostProcessor;});rabbitTemplate.convertAndSend(Constant.NORMAL_EXCHANGE_NAME, "normal","ttl test 10s..." + new Date(), messagePostProcessor -> {messagePostProcessor.getMessageProperties().setExpiration("10000");//10s过期return messagePostProcessor;});return "发送成功!";}
这时会发现: 10s过期的消息,,也是在20s后才进入到死信队列.
消息过期之后, 不⼀定会被马上丢弃. 因为RabbitMQ只会检查队首消息是否过期, 如果过期则丢到死信队列. 此时就会造成⼀个问题, 如果第⼀个消息的延时时间很长, 第二个消息的延时时间很短, 那第二个 消息并不会优先得到执行.
所以在考虑使用TTL+死信队列实现延迟任务队列的时候, 需要确认业务上每个任务的延迟时间是⼀致 的, 如果遇到不同的任务类型需要不同的延迟的话, 需要为每⼀种不同延迟时间的消息建⽴单独的消息队列。
另外注意,同样可以使用插件使得消息按照延迟时间到达消费者
4.4、常见面试题
延迟队列作为RabbitMQ的高级特性,也是面试的一大重点. 介绍下RabbitMQ的延迟队列。延迟队列是⼀个特殊的队列, 消息发送之后, 并不立即给消费者, 而是等待特定的时间, 才发送给消费者. 延迟队列的应用场景有很多, 比如:
- 订单在十分钟内未支付自动取消
- 用户注册成功后, 3天后发调查问卷
- 用户发起退款, 24小时后商家未处理, 则默认同意, 自动退款
- …
但RabbitMQ本身并没直接实现延迟队列, 通常有两种方法:
1 . TTL+死信队列组合的方式
2 . 使用官方提供的延迟插件实现延迟功能
⼆者对比:
- 基于死信实现的延迟队列
a. 优点: 1) 灵活不需要额外的插件支持
b. 缺点: 1) 存在消息顺序问题 2) 需要额外的逻辑来处理死信队列的消息, 增加了系统的复杂性 - 基于插件实现的延迟队列
a. 优点: 1) 通过插件可以直接创建延迟队列, 简化延迟消息的实现. 2) 避免了DLX的时序问题
b. 缺点: 1) 需要依赖特定的插件, 有运维工作 2) 只适用特定版本
五、事务
RabbitMQ是基于AMQP协议实现的, 该协议实现了事务机制, 因此RabbitMQ也支持事务机制. SpringAMQP也提供了对事务相关的操作. RabbitMQ事务允许开发者确保消息的发送和接收是原子性的, 要么全部成功, 要么全部失败.
5.1、配置事务管理器
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.transaction.RabbitTransactionManager;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class TransactionConfig {@Beanpublic RabbitTransactionManagertransactionManager(CachingConnectionFactory connectionFactory) {return new RabbitTransactionManager(connectionFactory);}@Beanpublic RabbitTemplate rabbitTemplate(CachingConnectionFactoryconnectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);rabbitTemplate.setChannelTransacted(true);return rabbitTemplate;}
}
5.2、声明队列
@Bean("transQueue")public Queue transQueue() {return QueueBuilder.durable("trans_queue").build();}
5.3、生产者
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;@RequestMapping("/trans")
@RestController
public class TransactionProducer {@Autowiredprivate RabbitTemplate rabbitTemplate;@Transactional@RequestMapping("/send")public String send() {rabbitTemplate.convertAndSend("", "trans_queue", "trans test 1...");int a = 5 / 0;rabbitTemplate.convertAndSend("", "trans_queue", "trans test 2...");return "发送成功";}
}
通过测试发现:
- 不加 @Transactional , 会发现消息1发送成功
- 添加 @Transactional , 消息1和消息2全部发送失败
结语
本篇文章主要介绍了RAbbitMQ中的部分高级特性,主要从重试机制、有效时间TTL、死信队列、延迟队列和事务几方面展开。以上就是本文全部内容,感谢各位能够看到最后,如有问题,欢迎各位大佬在评论区指正,希望大家可以有所收获!创作不易,希望大家多多支持!
最后,大家再见!祝好!我们下期见!
相关文章:
【RabbitMQ】 RabbitMQ高级特性(二)
文章目录 一、重试机制1.1、重试配置1.2、配置交换机&队列1.3、发送消息1.4、消费消息1.5、运行程序1.6、 手动确认 二、TTL2.1、设置消息的TTL2.2、设置队列的TTL2.3、两者区别 三 、死信队列6.1 死信的概念3.2 代码示例3.2.1、声明队列和交换机3.2.2、正常队列绑定死信交…...
EMQX开源版安装指南:Linux/Windows全攻略
EMQX开源版安装教程-linux/windows 因最近自己需要使用MQTT,需要搭建一个MQTT服务器,所以想到了很久以前用到的EMQX。但是当时的EMQX使用的是开源版的,在官网可以直接下载。而现在再次打开官网时发现怎么也找不大开源版本了,所以…...
MySQL 数据库备份与还原
作者:IvanCodes 日期:2025年5月18日 专栏:MySQL教程 思维导图 备份 (Backup) 与 冗余 (Redundancy) 的核心区别: 🎯 备份是指创建数据的副本并将其存储在不同位置或介质,主要目的是在发生数据丢失、损坏或逻辑错误时进…...
【数据结构】2-3-4 单链表的建立
数据结构知识点合集 尾插法建立单链表 建立链表时总是将新节点插入到链表的尾部,将新插入的节点作为链表的尾节点 /*尾插法建立链表L*/ LinkList List_TailInsert(LinkList &L) { int x; /*建立头节点*/ L (LNode *)malloc(sizeof(LNode)); /*…...
JVM如何处理多线程内存抢占问题
目录 1、堆内存结构 2、运行时数据 3、内存分配机制 3.1、堆内存结构 3.2、内存分配方式 1、指针碰撞 2、空闲列表 4、jvm内存抢占方案 4.1、TLAB 4.2、CAS 4.3、锁优化 4.4、逃逸分析与栈上分配 5、问题 5.1、内存分配竞争导致性能下降 5.2、伪共享(…...
猫番阅读APP:丰富资源,优质体验,满足你的阅读需求
猫番阅读APP是一款专为书籍爱好者设计的移动阅读应用,致力于提供丰富的阅读体验和多样化的书籍资源。它不仅涵盖了小说、非虚构、杂志等多个领域的电子书,还提供了个性化推荐、书架管理、离线下载等功能,满足不同读者的阅读需求。无论是通勤路…...
Redis 学习笔记 4:优惠券秒杀
Redis 学习笔记 4:优惠券秒杀 本文基于前文的黑马点评项目进行学习。 Redis 生成全局唯一ID 整个全局唯一 ID 的结构如下: 这里的时间戳是当前时间基于某一个基准时间(项目开始前的某个时间点)的时间戳。序列号是依赖 Redis 生…...
C++学习:六个月从基础到就业——C++17:if/switch初始化语句
C学习:六个月从基础到就业——C17:if/switch初始化语句 本文是我C学习之旅系列的第四十六篇技术文章,也是第三阶段"现代C特性"的第八篇,主要介绍C17引入的if和switch语句的初始化表达式特性。查看完整系列目录了解更多内…...
C++跨平台开发经验与解决方案
在当今软件开发领域,跨平台开发已成为一个重要的需求。C作为一种强大的系统级编程语言,在跨平台开发中扮演着重要角色。本文将分享在实际项目中的跨平台开发经验和解决方案。 1. 构建系统选择 CMake的优势 跨平台兼容性好 支持多种编译器和IDE 强大…...
RabbitMQ 工作模式(上)
前言 在 RabbitMQ 中,一共有七种工作模式,我们也可以打开官网了解: 本章我们先介绍前三种工作模式 (Simple)简单模式 P:producer 生产者,负责发送消息 C:consumer 消费者&#x…...
为什么需要加密机服务?
前言 大家好,我是老马。 以前我自己在写工具的时候,都是直接自己实现就完事了。 但是在大公司,或者说随着合规监管的要求,自己随手写的加解密之类的,严格说是不合规的。 作为一家技术性公司,特别是金融…...
【Linux】利用多路转接epoll机制、ET模式,基于Reactor设计模式实现
📚 博主的专栏 🐧 Linux | 🖥️ C | 📊 数据结构 | 💡C 算法 | 🅒 C 语言 | 🌐 计算机网络 上篇文章:多路转接epoll,实现echoserver 至此,Linux与…...
c/c++的findcontours崩溃解决方案
解决 Windows 平台 OpenCV findContours 崩溃:一种更稳定的方法 许多在 Windows 平台上使用 OpenCV 的开发者可能会在使用 findContours 函数时,遇到令人头疼的程序崩溃问题。尽管网络上流传着多种解决方案,但它们并非总能根治此问题。 当时…...
机器学习 Day18 Support Vector Machine ——最优美的机器学习算法
1.问题导入: 2.SVM定义和一些最优化理论 2.1SVM中的定义 2.1.1 定义 SVM 定义:SVM(Support Vector Machine,支持向量机)核心是寻找超平面将样本分成两类且间隔最大 。它功能多样,可用于线性或非线性分类…...
npm与pnpm--为什么推荐pnpm
包管理器中 npm是最经典的,但大家都任意忽略一个更优质的管理器:pnpm 1. 核心区别 特性npmpnpm依赖存储方式扁平化结构(可能重复依赖)硬链接 符号链接(共享依赖,节省空间)安装速度较慢&#…...
ollama调用千问2.5-vl视频图片UI界面小程序分享
1、问题描述: ollama调用千问2.5-vl视频图片内容,通常用命令行工具不方便,于是做了一个python UI界面与大家分享。需要提前安装ollama,并下载千问qwen2.5vl:7b 模型,在ollama官网即可下载。 (8G-6G 显卡可…...
济南国网数字化培训班学习笔记-第三组-1-电力通信传输网认知
电力通信传输网认知 电力通信基本情况 传输介质 传输介质类型(导引与非导引) 导引传输介质,如电缆、光纤; 非导引传输介质,如无线电波; 传输介质的选择影响信号传输质量 信号传输模式(单工…...
Kubernetes控制平面组件:Kubelet详解(六):pod sandbox(pause)容器
云原生学习路线导航页(持续更新中) kubernetes学习系列快捷链接 Kubernetes架构原则和对象设计(一)Kubernetes架构原则和对象设计(二)Kubernetes架构原则和对象设计(三)Kubernetes控…...
51单片机,两路倒计时,LCD1602 ,Proteus仿真
初始上电 默认2路都是0分钟的倒计时 8个按键 4个一组 一组控制一路倒计时 4个 按键:加 减 开始或者暂停 复位到0分钟相当于停止 针对第一路倒计时 4个 按键2:加 减 开始或者暂停 复位到0分钟相当于停止 针对第2路倒计时 哪一路到了0后蜂鸣器响 对应LED点亮 main.c 文件实现了…...
MySQL之储存引擎和视图
一、储存引擎 基本介绍: 1、MySQL的表类型由储存引擎(Storage Engines)决定,主要包括MyISAM、innoDB、Memory等。 2、MySQL数据表主要支持六种类型,分别是:CSV、Memory、ARCHIVE、MRG_MYISAN、MYISAM、InnoBDB。 3、这六种又分…...
写spark程序数据计算( 数据库的计算,求和,汇总之类的)连接mysql数据库,写入计算结果
1. 添加依赖 在项目的 pom.xml(Maven)中添加以下依赖: xml <!-- Spark SQL --> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.12</artifactId> <version>3.3.0…...
一:操作系统之系统调用
系统调用:用户程序与操作系统交互的桥梁 在计算机的世界里,应用程序是我们日常接触最多的部分,比如浏览器、文本编辑器、游戏等等。然而,这些应用程序并不能直接控制硬件资源,比如读写硬盘、创建新进程、发送网络数据…...
【ROS2】 核心概念6——通信接口语法(Interfaces)
古月21讲/2.6_通信接口 官方文档:Interfaces — ROS 2 Documentation: Humble documentation 官方接口代码实战:https://docs.ros.org/en/humble/Tutorials/Beginner-Client-Libraries/Single-Package-Define-And-Use-Interface.html ROS 2使用简化的描…...
SmartETL函数式组件的设计与应用
SmartETL框架主要采用了面向对象的设计思想,将ETL过程中的处理逻辑抽象为Loader和Processor(对应loader模块和iterator模块),所有流程组件需要继承或实现DataProvider(iter方法)或JsonIterator(…...
Spring Security与SaToken的对比与优缺点分析
Spring Security与SaToken对比分析 一、框架定位 Spring Security 企业级安全解决方案,深度集成Spring生态提供完整的安全控制链(认证、授权、会话管理、攻击防护)适合中大型分布式系统 SaToken 轻量级权限认证框架,专注Token会…...
|从零开始的Pyside2界面编程| 环境搭建以及第一个ui界面
🐑 |从零开始的Pyside2界面编程| 环境搭建以及第一个ui界面🐑 文章目录 🐑 |从零开始的Pyside2界面编程| 环境搭建以及第一个ui界面🐑♈前言♈♈Pyside2环境搭建♈♈做个简单的UI界面♈♒代码实现♒♒QTdesigner设计UI界面♒ ♒总…...
【爬虫】DrissionPage-7
官方文档: https://www.drissionpage.cn/browser_control/get_page_info/ 1. 页面信息 📌 html 描述:返回当前页面的 HTML 文本。注意:不包含 <iframe> 元素的内容。返回类型:str 示例: html_co…...
系统架构设计(十二):统一过程模型(RUP)
简介 RUP 是由 IBM Rational 公司提出的一种 面向对象的软件工程过程模型,以 UML 为建模语言,是一种 以用例为驱动、以架构为中心、迭代式、增量开发的过程模型。 三大特征 特征说明以用例为驱动(Use Case Driven)需求分析和测…...
深入解析Java事件监听机制与应用
Java事件监听机制详解 一、事件监听模型组成 事件源(Event Source) 产生事件的对象(如按钮、文本框等组件) 事件对象(Event Object) 封装事件信息的对象(如ActionEvent包含事件源信息…...
QT聊天项目DAY11
1. 验证码服务 1.1 用npm安装redis npm install redis 1.2 修改config.json配置文件 1.3 新建redis.js const config_module require(./config) const Redis require("ioredis");// 创建Redis客户端实例 const RedisCli new Redis({host: config_module.redis_…...
Python训练营---Day29
知识点回顾 类的装饰器装饰器思想的进一步理解:外部修改、动态类方法的定义:内部定义和外部定义 作业:复习类和函数的知识点,写下自己过去29天的学习心得,如对函数和类的理解,对python这门工具的理解等&…...
Flask-SQLAlchemy_数据库配置
1、基本概念(SQLAlchemy与Flask-SQLAlchemy) SQLAlchemy 是 Python 生态中最具影响力的 ORM(对象关系映射)库,其设计理念强调 “框架无关性”,支持在各类 Python 项目中独立使用,包括 Flask、D…...
世界银行数字经济指标(1990-2022年)-社科数据
世界银行数字经济指标(1990-2022年)-社科数据https://download.csdn.net/download/paofuluolijiang/90623839 https://download.csdn.net/download/paofuluolijiang/90623839 此数据集涵盖了1990年至2022年间全球各国的数字经济核心指标,数据…...
Redis进阶知识
Redis 1.事务2. 主从复制2.1 如何启动多个Redis服务器2.2 监控主从节点的状态2.3 断开主从复制关系2.4 额外注意2.5拓扑结构2.6 复制过程2.6.1 数据同步 3.哨兵选举原理注意事项 4.集群4.1 数据分片算法4.2 故障检测 5. 缓存5.1 缓存问题 6. 分布式锁 1.事务 Redis的事务只能保…...
NY337NY340美光固态颗粒NC010NC012
NY337NY340美光固态颗粒NC010NC012 在存储技术的浩瀚星空中,美光的NY337、NY340、NC010、NC012等固态颗粒宛如璀璨星辰,闪耀着独特的光芒。它们承载着先进技术与无限潜力,正深刻影响着存储行业的格局与发展。 一、技术架构与核心优势 美光…...
DAY26 函数定义与参数
浙大疏锦行-CSDN博客 知识点回顾: 1.函数的定义 2.变量作用域:局部变量和全局变量 3.函数的参数类型:位置参数、默认参数、不定参数 4.传递参数的手段:关键词参数 5.传递参数的顺序:同时出现三种参数类型时 函数的定义…...
系统安全及应用
目录 一、账号安全控制 1.基本安全措施 (1)系统账号清理 (2)密码安全控制 (3)历史命令,自动注销 2.用户提权和切换命令 2.1 su命令用法 2.2 sudo命令提权 2.3通过是sudo执行特权命令 二、系统引导和登录控制…...
微信小程序 地图 使用 射线法 判断目标点是否在多边形内部(可用于判断当前位置是否在某个区域内部)
目录 射线法原理简要逻辑代码 小程序代码调试基础库小程序配置地图数据地图多边形点与多边形关系 射线法 原理 使用射线法来判断,目标点是否在多边形内部 这里简单说下,具体细节可以看这篇文章 平面几何:判断点是否在多边形内(…...
第三十七节:视频处理-视频读取与处理
引言:解码视觉世界的动态密码 在数字化浪潮席卷全球的今天,视频已成为信息传递的主要载体。从短视频平台的爆火到自动驾驶的视觉感知,视频处理技术正在重塑人类与数字世界的交互方式。本指南将深入探讨视频处理的核心技术,通过Python与OpenCV的实战演示,为您揭开动态影像…...
什么是 Flink Pattern
在 Apache Flink 中,Pattern 是 Flink CEP(Complex Event Processing)模块 的核心概念之一。它用于定义你希望从数据流中检测出的 事件序列模式(Event Sequence Pattern)。 🎯 一、什么是 Flink Pattern&am…...
ADB基本操作和命令
1.ADB的含义 adb 命令是 Android 官方提供,调试 Android 系统的工具。 adb 全称为 Android Debug Bridge(Android 调试桥),是 Android SDK 中提供的用于管理 Android 模拟器或真机的工具。 adb 是一种功能强大的命令行工具&#x…...
NSString的三种实现方式
oc里的NSString有三种实现方式,为_ _NSCFConstantString、__NSCFString、NSTaggedPointerString 1._ _NSCFConstantString(字面量字符串) 从字面意思上可以看出,_ _NSCFContantString可以理解为常量字符串,这种类型的字符串在编译期就确定了…...
2025年PMP 学习二十 第13章 项目相关方管理
第13章 项目相关方管理 序号过程过程组过程组1识别相关方启动2规划相关方管理规划3管理相关方参与与执行4监控相关方参与与监控 相关方管理,针对于团队之外的相关方的,核心目标是让对方为了支持项目,以达到项目目标。 文章目录 第13章 项目相…...
学习黑客Kerberos深入浅出:安全王国的门票系统
Kerberos深入浅出:安全王国的门票系统 🎫 作者: 海尔辛 | 发布时间: 2025-05-18 🔑 理解Kerberos:为什么它如此重要? Kerberos是现代网络环境中最广泛使用的身份验证协议之一,尤其在Windows Active Dire…...
蓝桥杯19681 01背包
问题描述 有 N 件物品和一个体积为 M 的背包。第 i 个物品的体积为 vi,价值为 wi。每件物品只能使用一次。 请问可以通过什么样的方式选择物品,使得物品总体积不超过 M 的情况下总价值最大,输出这个最大价值即可。 输入格式 第一行输…...
使用 Auto-Keras 进行自动化机器学习
使用 Auto-Keras 进行自动化机器学习 了解自动化机器学习以及如何使用 auto-keras 完成它。如今,机器学习并不是一个非常罕见的术语,因为像 DataCamp、Coursera、Udacity 等组织一直在努力提高他们的效率和灵活性,以便将机器学习的教育带给普…...
算法刷题Day9 5.18:leetcode定长滑动窗口3道题,结束定长滑动窗口,用时1h
12. 1852.每个子数组的数字种类数 1852. 每个子数组的数字种类数 - 力扣(LeetCode) 思想 找到nums 所有 长度为 k 的子数组中 不同 元素的数量。 返回一个数组 ans,其中 ans[i] 是对于每个索引 0 < i < n - k,nums[i..(i …...
Protect Your Digital Privacy: Obfuscate, Don’t Hide
Protect Your Digital Privacy: Obfuscate, Don’t Hide In today’s digital world, hiding completely online is nearly impossible. But you can protect yourself by deliberately obfuscating your personal information — making it harder for others to track, pro…...
Spark 的运行模式(--master) 和 部署方式(--deploy-mode)
Spark 的 运行模式(--master) 和 部署方式(--deploy-mode),两者的核心区别在于 资源调度范围 和 Driver 进程的位置。 一、核心概念对比 维度--master(运行模式)--deploy-mode(部署…...
从零开始实现大语言模型(十五):并行计算与分布式机器学习
1. 前言 并行计算与分布式机器学习是一种使用多机多卡加速大规模深度神经网络训练过程,以减少训练时间的方法。在工业界的训练大语言模型实践中,通常会使用并行计算与分布式机器学习方法来减少训练大语言模型所需的钟表时间。 本文介绍PyTorch中的一种…...