【RabbitMQ】 RabbitMQ高级特性(一)
文章目录
- 一、消息确认
- 1.1、消息确认机制
- 1.2、手动确认方法
- 1.2.1、AcknowledgeMode.NONE
- 1.2.2、AcknowledgeMode.AUTO
- 1.3.3、AcknowledgeMode.MANUAL
- 二、持久性
- 2.1、 交换机持久化
- 2.2、队列持久化
- 2.3、消息持久化
- 三、发送方确认
- 3.1、confirm确认模式
- 3.2、return退回模式
- 3.3、常见面试题
- 结语
一、消息确认
1.1、消息确认机制
生产者发送消息之后, 到达消费端之后, 可能会有以下情况:
a. 消息处理成功
b. 消息处理异常
RabbitMQ向消费者发送消息之后, 就会把这条消息删掉, 那么第两种情况, 就会造成消息丢失. 那么如何确保消费端已经成功接收了, 并正确处理了呢?
为了保证消息从队列可靠地到达消费者, RabbitMQ提供了消息确认机制(message acknowledgement)。 消费者在订阅队列时,可以指定 autoAck 参数, 根据这个参数设置, 消息确认机制分为以下两种:
• 自动确认: 当autoAck 等于true时, RabbitMQ 会⾃动把发送出去的消息置为确认, 然后从内存(或者磁盘)中删除, 而不管消费者是否真正地消费到了这些消息. 自动确认模式适合对于消息可靠性要求不高的场景.
• 手动确认: 当autoAck等于false时,RabbitMQ会等待消费者显式地调用Basic.Ack命令, 回复确认信号后才从内存(或者磁盘) 中移去消息. 这种模式适合对消息可靠性要求比较高的场景
代码示例:
DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到消息: " + new String(body));}};channel.basicConsume(Constants.TOPIC_QUEUE_NAME1,true,consumer);
当autoAck参数置为false, 对于RabbitMQ服务端额而言, 队列中的消息分成了两个部分: ⼀是等待投递给消费者的消息. ⼆是已经投递给消费者, 但是还没有收到消费者确认信号的消息. 如果RabbitMQ一直没有收到消费者的确认信号, 并且消费此消息的消费者已经断开连接, 则RabbitMQ会安排该消息重新进入队列,等待投递给下⼀个消费者,当然也有可能还是原来的那个消费者
从RabbitMQ的Web管理平台上, 也可以看到当前队列中Ready状态和Unacked状态的消息数:
Ready: 等待投递给消费者的消息数
Unacked: 已经投递给消费者, 但是未收到消费者确认信号的消息数
1.2、手动确认方法
消费者在收到消息之后, 可以选择确认, 也可以选择直接拒绝或者跳过, RabbitMQ也提供了不同的确认应答的方式, 消费者客户端可以调⽤与其对应的channel的相关⽅法, 共有以下三种:
1. 肯定确认: Channel.basicAck(long deliveryTag, boolean multiple)
RabbitMQ 已知道该消息并且成功的处理消息. 可以将其丢弃了.
参数说明:
- deliveryTag: 消息的唯⼀标识,它是⼀个单调递增的64 位的⻓整型值. deliveryTag 是每个通道(Channel)独⽴维护的, 所以在每个通道上都是唯⼀的. 当消费者确认(ack)⼀条消息时, 必须使⽤对应 的通道上进⾏确认.
2 ) multiple: 是否批量确认. 在某些情况下, 为了减少⽹络流量, 可以对⼀系列连续的 deliveryTag 进行批量确认. 值为 true 则会⼀次性 ack所有⼩于或等于指定 deliveryTag 的消息. 值为false, 则只确认当 前指定deliveryTag的消息
若deliverTag = 8时,如果此时multiple 为true ,那么此时8以前的消息都会被确认
反之,如果multiple为false ,那么此时只确认消息8。
deliveryTag 是RabbitMQ中消息确认机制的⼀个重要组成部分, 它确保了消息传递的可靠性和顺序性。
2. 否定确认: Channel.basicReject(long deliveryTag, boolean requeue)
RabbitMQ在2.0.0版本开始引⼊了 Basic.Reject 这个命令, 消费者客⼾端可以调用channel.basicReject方法来告诉RabbitMQ拒绝这个消息.
参数说明:
- deliveryTag: 参考channel.basicAck
- requeue: 表⽰拒绝后, 这条消息如何处理. 如果requeue 参数设置为true, 则RabbitMQ会重新将这条 消息存⼊队列,以便可以发送给下⼀个订阅的消费者.如果requeue参数设置为false, 则RabbitMQ会把 消息从队列中移除, ⽽不会把它发送给新的消费者
3. 否定确认: Channel.basicNack(long deliveryTag, boolean multiple, boolean requeue)
Basic.Reject命令⼀次只能拒绝⼀条消息,如果想要批量拒绝消息,则可以使⽤Basic.Nack这个命令. 消费者客户端可以调用channel.basicNack方法来实现.
参数介绍参考上面两个方法.
multiple参数设置为true则表示拒绝deliveryTag编号之前所有未被当前消费者确认的消息.
代码示例:
Spring-AMQP 对消息确认机制提供了三种策略
public enum AcknowledgeMode {NONE,MANUAL,AUTO;
}
- AcknowledgeMode.NONE
◦ 这种模式下, 消息⼀旦投递给消费者, 不管消费者是否成功处理了消息, RabbitMQ 就会自动确认 消息, 从RabbitMQ队列中移除消息. 如果消费者处理消息失败, 消息可能会丢失. - AcknowledgeMode.AUTO(默认)
◦ 这种模式下, 消费者在消息处理成功时会自动确认消息, 但如果处理过程中抛出了异常, 则不会确认消息. - AcknowledgeMode.MANUAL
◦ ⼿动确认模式下, 消费者必须在成功处理消息后显式调⽤ basicAck ⽅法来确认消息. 如果消息未被确认, RabbitMQ 会认为消息尚未被成功处理, 并且会在消费者可用时重新投递该消息, 这 种模式提⾼了消息处理的可靠性, 因为即使消费者处理消息后失败, 消息也不会丢失, 而是可以被重新处理.
主要流程:
- 配置确认机制(自动确认/手动机制)
- 生产者发送消息
- 消费端逻辑
- 测试
1.2.1、AcknowledgeMode.NONE
- 配置确认机制
spring:rabbitmq:addresses: amqp://study:study@110.41.51.65:15673/bitelistener:simple:acknowledge-mode: none
- 发送消息:
队列,交换机配置
public class Constant {public static final String ACK_EXCHANGE_NAME = "ack_exchange";public static final String ACK_QUEUE = "ack_queue";
}/*以下为消费端⼿动应答代码⽰例配置*/@Bean("ackExchange")public Exchange ackExchange() {returnExchangeBuilder.topicExchange(Constant.ACK_EXCHANGE_NAME).durable(true).build();}//2. 队列@Bean("ackQueue")public Queue ackQueue() {return QueueBuilder.durable(Constant.ACK_QUEUE).build();}//3. 队列和交换机绑定 Binding@Bean("ackBinding")public Binding ackBinding(@Qualifier("ackExchange") Exchange exchange,@Qualifier("ackQueue") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("ack").noargs();}
通过接口发送消息:
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;@RestController@RequestMapping("/producer")public class ProductController {@Autowiredprivate RabbitTemplate rabbitTemplate;@RequestMapping("/ack")public String ack() {rabbitTemplate.convertAndSend(Constant.ACK_EXCHANGE_NAME, "ack","consumer ack test...");return "发送成功!";}}
- 写消费端逻辑
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Componentpublic class AckQueueListener {//指定监听队列的名称@RabbitListener(queues = Constant.ACK_QUEUE)public void ListenerQueue(Message message, Channel channel) throwsException {System.out.printf("接收到消息: %s, deliveryTag: %d%n", newString(message.getBody(), "UTF-8"),message.getMessageProperties().getDeliveryTag());//模拟处理失败//int num = 3 / 0;System.out.println("处理完成");}
这个代码运行的结果是正常的, 运行后消息会被签收: Ready为0, unacked为0
- 运行程序
调用接口, 发送消息 可以看到队列中有⼀条消息, unacked的为0(需要先把消费者注掉)
开启消费者, 控制台输出:
接收到消息: consumer ack test..., deliveryTag: 1
2 2024-04-29T17:03:57.797+08:00 WARN 16952 --- [ntContainer#0-1]
s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message
listener failed.
org.springframework.amqp.rabbit.support.ListenerExecutionFailedException:
Listener method 'public void
com.bite.rabbitmq.listener.AckQueueListener.ListenerQueue(org.springframework.a
mqp.core.Message,com.rabbitmq.client.Channel) throws java.lang.Exception'
threw exception
//....
管理界面:
1.2.2、AcknowledgeMode.AUTO
1、配置确认机制
spring:rabbitmq:addresses: amqp://study:study@110.41.51.65:15673/bitelistener:simple:acknowledge-mode: auto
- 重新运行程序
调用接口, 发送消息 可以看到队列中有⼀条消息, unacked的为0(需要先把消费者注掉)
开启消费者, 控制台不断输出错误信息:
接收到消息: consumer ack test..., deliveryTag: 1
2024-04-29T17:07:06.114+08:00 WARN 16488 --- [ntContainer#0-1]
s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message
listener failed.
org.springframework.amqp.rabbit.support.ListenerExecutionFailedException:
Listener method 'public void
com.bite.rabbitmq.listener.AckQueueListener.ListenerQueue(org.springframework.a
mqp.core.Message,com.rabbitmq.client.Channel) throws java.lang.Exception'
threw exception接收到消息: consumer ack test..., deliveryTag: 2
2024-04-29T17:07:07.161+08:00 WARN 16488 --- [ntContainer#0-1]
s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message
listener failed.
org.springframework.amqp.rabbit.support.ListenerExecutionFailedException:
Listener method 'public void
com.bite.rabbitmq.listener.AckQueueListener.ListenerQueue(org.springframework.a
mqp.core.Message,com.rabbitmq.client.Channel) throws java.lang.Exception'
threw exception接收到消息: consumer ack test..., deliveryTag: 3
2024-04-29T17:07:08.208+08:00 WARN 16488 --- [ntContainer#0-1]
s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message
listener failed.
org.springframework.amqp.rabbit.support.ListenerExecutionFailedException:
Listener method 'public void
com.bite.rabbitmq.listener.AckQueueListener.ListenerQueue(org.springframework.a
mqp.core.Message,com.rabbitmq.client.Channel) throws java.lang.Exception'
threw exception
从日志上可以看出, 当消费者出现异常时, RabbitMQ会不断的重发. 由于异常,多次重试还是失败,消息没被确认,也无法nack,就⼀直是unacked状态,导致消息积压
1.3.3、AcknowledgeMode.MANUAL
- 配置确认机制
spring:rabbitmq:addresses: amqp://study:study@110.41.51.65:15673/bitelistener:simple:acknowledge-mode: manual
- 消费端⼿动确认逻辑
import com.bite.rabbitmq.constant.Constant;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Componentpublic class AckQueueListener {//指定监听队列的名称@RabbitListener(queues = Constant.ACK_QUEUE)public void ListenerQueue(Message message, Channel channel) throwsException {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {//1. 接收消息 System.out.printf("接收到消息: %s, deliveryTag: %d%n", newString(message.getBody(), "UTF-8"),message.getMessageProperties().getDeliveryTag());//2. 处理业务逻辑System.out.println("处理业务逻辑");//⼿动设置⼀个异常, 来测试异常拒绝机制// int num = 3/0;//3. ⼿动签收channel.basicAck(deliveryTag, true);} catch (Exception e) {//4. 异常了就拒绝签收//第三个参数requeue, 是否重新发送, 如果为true, 则会重新发送,,若为false, 则直接丢弃channel.basicNack(deliveryTag, true, true);}}}
这个代码运行的结果是正常的, 运行后消息会被签收: Ready为0, unacked为0
控制台输出:
接收到消息: consumer ack test..., deliveryTag: 1
处理业务逻辑.....
管理界面:
- 异常时拒绝签收
@Componentpublic class AckQueueListener {//指定监听队列的名称@RabbitListener(queues = Constant.ACK_QUEUE)public void ListenerQueue(Message message, Channel channel) throwsException {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {//1. 接收消息 System.out.printf("接收到消息: %s, deliveryTag: %d%n", newString(message.getBody(), "UTF-8"),message.getMessageProperties().getDeliveryTag());//2. 处理业务逻辑System.out.println("处理业务逻辑");//⼿动设置⼀个异常, 来测试异常拒绝机制int num = 3/0;//3. ⼿动签收channel.basicAck(deliveryTag, true);} catch (Exception e) {//4. 异常了就拒绝签收//第三个参数requeue, 是否重新发送, 如果为true, 则会重新发送,,若为false, 则直接丢弃channel.basicNack(deliveryTag, true, true);}}}
运行结果: 消费异常时不断重试, deliveryTag 从1递增控制台日志:
接收到消息: consumer ack test..., deliveryTag: 1
处理业务逻辑 接收到消息: consumer ack test..., deliveryTag: 2
处理业务逻辑 接收到消息: consumer ack test..., deliveryTag: 3
处理业务逻辑 接收到消息: consumer ack test..., deliveryTag: 4
处理业务逻辑 接收到消息: consumer ack test..., deliveryTag: 5
处理业务逻辑 接收到消息: consumer ack test..., deliveryTag: 6
处理业务逻辑.....
管理界⾯上unacked也变成了1
Unacked的状态变化很快, 为方便观察, 消费消息前增加⼀下休眠时间Thread.sleep(10000);
二、持久性
在前⾯讲了消费端处理消息时, 消息如何不丢失, 但是如何保证当RabbitMQ服务停掉以后, 生产者发送的消息不丢失呢. 默认情况下, RabbitMQ 退出或者由于某种原因崩溃时, 会忽视队列和消息, 除非告知他不要这么做.
RabbitMQ的持久化分为三个部分:交换器的持久化、队列的持久化和消息的持久化.
2.1、 交换机持久化
交换器的持久化是通过在声明交换机时是将durable参数置为true实现的.相当于将交换机的属性在服务器内部保存,当MQ的服务器发生意外或关闭之后,重启 RabbitMQ 时不需要重新去建立交换机, 交换 机会自动建立,相当于⼀直存在. 如果交换器不设置持久化, 那么在 RabbitMQ 服务重启之后, 相关的交换机元数据会丢失, 对⼀个⻓期 使用的交换器来说,建议将其置为持久化的.
ExchangeBuilder.topicExchange(Constant.ACK_EXCHANGE_NAME).durable(true).build();
2.2、队列持久化
队列的持久化是通过在声明队列时将 durable 参数置为 true实现的. 如果队列不设置持久化, 那么在RabbitMQ服务重启之后,该队列就会被删掉, 此时数据也会丢失. (队列没 有了, 消息也无处可存了)。 队列的持久化能保证该队列本身的元数据不会因异常情况而丢失, 但是并不能保证内部所存储的消息不会丢失. 要确保消息不会丢失, 需要将消息设置为持久化. 咱们前面用的创建队列的方式都是持久化的
QueueBuilder.durable(Constant.ACK_QUEUE).build();
点进去看源码会发现,该⽅法默认durable 是true
public static QueueBuilder durable(String name) {return (new QueueBuilder(name)).setDurable();
}
private QueueBuilder setDurable() {this.durable = true;return this;
}
通过下⾯代码,可以创建非持久化的队列
QueueBuilder.nonDurable(Constant.ACK_QUEUE).build();
2.3、消息持久化
消息实现持久化, 需要把消息的投递模式( MessageProperties 中的 deliveryMode )设置为2,也就是MessageDeliveryMode.PERSISTENT
NON_PERSISTENT,//⾮持久化
PERSISTENT;//持久化
设置了队列和消息的持久化, 当 RabbitMQ 服务重启之后, 消息依旧存在. 如果只设置队列持久化, 重启之后消息会丢失. 如果只设置消息的持久化, 重启之后队列消失, 继而消息也丢失. 所以单单设置消息 持久化而不设置队列的持久化显得毫无意义
//⾮持久化信息
channel.basicPublish(“”,QUEUE_NAME,null,msg.getBytes());
//持久化信息
channel.basicPublish(“”,QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes());
MessageProperties.PERSISTENT_TEXT_PLAIN 实际就是封装了这个属性
public static final BasicProperties PERSISTENT_TEXT_PLAIN =new BasicProperties("text/plain",null,null,2, //deliveryMode0, null, null, null,null, null, null, null,null, null);
如果使⽤RabbitTemplate 发送持久化消息, 代码如下:
// 要发送的消息内容
String message = "This is a persistent message";
// 创建⼀个Message对象,设置为持久化
Message messageObject = new Message(message.getBytes(), newMessageProperties());
messageObject.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
// 使⽤RabbitTemplate发送消息
rabbitTemplate.convertAndSend(Constant.ACK_EXCHANGE_NAME, "ack", messageObject);
注意:RabbitMQ默认情况下会将消息视为持久化的,除⾮队列被声明为非持久化,或者消息在发送时被标记为非持久化
将所有的消息都设置为持久化, 会严重影响RabbitMQ的性能(随机). 写入磁盘的速度比写入内 存的速度慢得不只一点点. 对于可靠性不是那么高的消息可以不采用持久化处理以提高整体的吞吐量. 在选择是否要将消息持久化时, 需要在可靠性和吐吞量之间做⼀个权衡
将交换器、队列、消息都设置了持久化之后就能百分之百保证数据不丢失了吗? 答案是否定的
- 从消费者来说, 如果在订阅消费队列时将autoAck参数设置为true, 那么当消费者接收到相关消息之 后, 还没来得及处理就宕机了, 这样也算数据居丢失. 这种情况很好解决, 将autoAck参数设置为false, 并进行手动确认。
2 . 在持久化的消息正确存⼊RabbitMQ之后,还需要有⼀段时间(虽然很短,但是不可忽视)才能存入磁盘 中.RabbitMQ并不会为每条消息都进行同步存盘(调用内核的fsync方法)的处理, 可能仅仅保存到操 作系统缓存之中而不是物理磁盘之中. 如果在这段时间内RabbitMQ服务节点发生了宕机、重启等异 常情况, 消息保存还没来得及落盘, 那么这些消息将会丢失
这个问题怎么解决呢?
可以在发送端引入事务机制或者发送⽅确认机制来保证消息已经正确地发送并存储至RabbitMQ中,即"发送方确认"
三、发送方确认
在使用 RabbitMQ的时候, 可以通过消息持久化来解决因为服务器的异常崩溃⽽导致的消息丢失, 但是还 有⼀个问题, 当消息的⽣产者将消息发送出去之后, 消息到底有没有正确地到达服务器呢? 如果在消息到 达服务器之前已经丢失(比如RabbitMQ重启, 那么RabbitMQ重启期间生产者消息投递失败), 持久化操作也解决不了这个问题,因为消息根本没有到达服务器,何谈持久化?
RabbitMQ为我们提供了两种解决⽅案:
a. 通过事务机制实现
b. 通过发送方确认(publisher confirm) 机制实现
事务机制⽐较消耗性能, 在实际工作中使用也不多, 咱们主要介绍confirm机制来实现发送方的确认.RabbitMQ为我们提供了两个方式来控制消息的可靠性投递
- confirm确认模式
- return退回模式
3.1、confirm确认模式
Producer 在发送消息的时候, 对发送端设置⼀个ConfirmCallback的监听, 无论消息是否到达Exchange, 这个监听都会被执行, 如果Exchange成功收到, ACK( Acknowledge character , 确认字符)为true, 如果没收到消息, ACK就为false.
步骤如下:
- 配置RabbitMQ
- 设置确认回调逻辑并发送消息
接下来看实现步骤
- 配置RabbitMQ
spring:rabbitmq:addresses: amqp://study:study@110.41.51.65:15673/numslistener:simple:acknowledge-mode: manual #消息接收确认publisher-confirm-type: correlated #消息发送确认
- 设置确认回调逻辑并发送消息
⽆论消息确认成功还是失败, 都会调⽤ConfirmCallback的confirm方法. 如果消息成功发送到Broker,ack为true.如果消息发送失败, ack为false, 并且cause提供失败的原因
@Bean("confirmRabbitTemplate")public RabbitTemplate confirmRabbitTemplate(ConnectionFactoryconnectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {@Overridepublic void confirm(CorrelationData correlationData, boolean ack,String cause) {System.out.printf("");if (ack) {System.out.printf("消息接收成功, id:%s \n",correlationData.getId());} else {System.out.printf("消息接收失败, id:%s, cause: %s",correlationData.getId(), cause);}}});return rabbitTemplate;}@Resource(name = "confirmRabbitTemplate")private RabbitTemplate confirmRabbitTemplate;@RequestMapping("/confirm")public String confirm() throws InterruptedExceptionCorrelationData correlationData1 = new CorrelationData("1");confirmRabbitTemplate.convertAndSend(Constant.CONFIRM_EXCHANGE_NAME,"confirm","confirm test...",correlationData1);return"确认成功";
}
⽅法说明:
public interface ConfirmCallback {/*** 确认回调* @param correlationData: 发送消息时的附加信息, 通常⽤于在确认回调中识别特定的消 息* @param ack: 交换机是否收到消息, 收到为true, 未收到为false* @param cause: 当消息确认失败时,这个字符串参数将提供失败的原因.这个原因可以⽤于调 试和错误处理.* 成功时, cause为null */void confirm(@Nullable CorrelationData correlationData, boolean ack, @Nullable String cause);
}
RabbitTemplate.ConfirmCallback 和 ConfirmListener 区别
在RabbitMQ中, ConfirmListener和ConfirmCallback都是用来处理消息确认的机制, 但它们属于不同的客户端库, 并且使用的场景和方式有所不同.
1 . ConfirmListener 是 RabbitMQ Java Client 库中的接口. 这个库是 RabbitMQ 官方提供的一个直接与RabbitMQ服务器交互的客户端库. ConfirmListener 接口提供了两个方法: handleAck 和handleNack, 用于处理消息确认和否定确认的事件.
2 . ConfirmCallback 是 Spring AMQP 框架中的⼀个接口. 专门为Spring环境设计. 用于简化与RabbitMQ交互的过程. 它只包含⼀个 confirm方法,⽤于处理消息确认的回调.
在 Spring Boot 应用中, 通常会使用 ConfirmCallback, 因为它与 Spring 框架的其他部分更加整合, 可 以利用 Spring 的配置和依赖注入功能. 而在使用 RabbitMQ Java Client 库时, 则可能会直接实现ConfirmListener 接口, 更直接的RabbitMQChannel交互
3.2、return退回模式
消息到达Exchange之后, 会根据路由规则匹配, 把消息放⼊Queue中. Exchange到Queue的过程, 如果一条消息无法被任何队列消费(即没有队列与消息的路由键匹配或队列不存在等), 可以选择把消息退回 给发送者. 消息退回给发送者时, 我们可以设置⼀个返回回调方法, 对消息进行处理.
步骤如下:
- 配置RabbitMQ
- 设置返回回调逻辑并发送消息
1 . 配置RabbitMQ
spring:rabbitmq:addresses: amqp://study:study@110.41.51.65:15673/numslistener:simple:acknowledge-mode: manual #消息接收确认publisher-confirm-type: correlated #消息发送确认
- 设置返回回调逻辑并发送消息
这里是引用消息无法被路由到任何队列, 它将返回给发送者,这时setReturnCallback设置的回调将被触发
@Bean("confirmRabbitTemplate")public RabbitTemplate confirmRabbitTemplate(CachingConnectionFactoryconnectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);rabbitTemplate.setMandatory(true);rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {@Overridepublic void returnedMessage(ReturnedMessage returned) {System.out.printf("消息被退回: %s", returned);}});return rabbitTemplate;}@RequestMapping("/msgReturn")public String msgReturn() {CorrelationData correlationData = new CorrelationData("2");confirmRabbitTemplate.convertAndSend(Constants.CONFIRM_EXCHANGE,"confirm11", "message return test...", correlationData);return "消息发送成功";}
使⽤RabbitTemplate的setMandatory方法设置消息的mandatory属性为true(默认为false). 这个属性的作用是告诉RabbitMQ, 如果⼀条消息无法被任何队列消费, RabbitMQ应该将消息返回给发送者, 此 时 ReturnCallback 就会被触发.
回调函数中有⼀个参数: ReturnedMessage, 包含以下属性:
public class ReturnedMessage {//返回的消息对象,包含了消息体和消息属性private final Message message;//由Broker提供的回复码, 表⽰消息⽆法路由的原因. 通常是⼀个数字代码,每个数字代表不同 的含义. private final int replyCode;//⼀个⽂本字符串, 提供了⽆法路由消息的额外信息或错误描述.private final String replyText;//消息被发送到的交换机名称private final String exchange;//消息的路由键,即发送消息时指定的键private final String routingKey;}
3.3、常见面试题
如何保证RabbitMQ消息的可靠传输?
先放⼀张RabbitMQ消息传递图:
从这个图中, 可以看出, 消息可能丢失的场景以及解决⽅案:
- ⽣产者将消息发送到 RabbitMQ失败
a. 可能原因: 网络问题等
b. 解决办法: 参考本章节[发送方确认-confirm确认模式] - 消息在交换机中无法路由到指定队列:
a. 可能原因: 代码或者配置层面错误, 导致消息路由失败
b. 解决办法: 参考本章节[发送方确认-return模式] - 消息队列自身数据丢失
a. 可能原因: 消息到达RabbitMQ之后, RabbitMQ Server 宕机导致消息丢失.
b. 解决办法: 参考本章节[持久性]. 开启 RabbitMQ持久化, 就是消息写入之后会持久化到磁盘, 如果RabbitMQ 挂了, 恢复之后会自动读取之前存储的数据. (极端情况下, RabbitMQ还未持久化就挂了, 可能导致少量数据丢失, 这个概率极低, 也可以通过集群的方式提高可靠性) - 消费者异常, 导致消息丢失
a. 可能原因: 消息到达消费者, 还没来得及消费, 消费者宕机. 消费者逻辑有问题.
b. 解决办法: RabbitMQ 提供了 消费者应答机制来使 RabbitMQ 能够感知 到消费者是否消费成功消息. 默认情况下消费者应答机制是自动应答的, 可以开启手动确认, 当消费者确认消费成功后才会删除消息, 从而避免消息丢失. 除此之外, 也可以配置重试机制(下篇文章会为大家介绍), 当消息消费异常时, 通过消息重试确保消息的可靠性
结语
本篇文章主要介绍了RAbbitMQ中的部分高级特性,主要从消息确认,持久化,发送方确认三个方面展开
以上就是本文全部内容,感谢各位能够看到最后,如有问题,欢迎各位大佬在评论区指正,希望大家可以有所收获!创作不易,希望大家多多支持!
最后,大家再见!祝好!我们下期见!
相关文章:
【RabbitMQ】 RabbitMQ高级特性(一)
文章目录 一、消息确认1.1、消息确认机制1.2、手动确认方法1.2.1、AcknowledgeMode.NONE1.2.2、AcknowledgeMode.AUTO1.3.3、AcknowledgeMode.MANUAL 二、持久性2.1、 交换机持久化2.2、队列持久化2.3、消息持久化 三、发送方确认3.1、confirm确认模式3.2、return退回模式3.3、…...
优化理赔数据同步机制:从4小时延迟降至15分钟
优化理赔数据同步机制:从4小时延迟降至15分钟 1. 分析当前同步瓶颈 首先诊断当前同步延迟原因: -- 检查主从复制状态(在主库执行) SHOW MASTER STATUS; SHOW SLAVE HOSTS;-- 在从库执行检查复制延迟 SHOW SLAVE STATUS\G -- 关…...
lampiao靶场渗透
lampiao https://www.vulnhub.com/entry/lampiao-1,249/ 1,将两台虚拟机网络连接都改为NAT模式 2,攻击机上做namp局域网扫描发现靶机 nmap -sn 192.168.23.0/24 那么攻击机IP为192.168.23.182,靶场IP192.168.23.245 3,对靶机进行端…...
云计算中的虚拟化:成本节省、可扩展性与灾难恢复的完美结合
云计算中虚拟化的 4 大优势 1. 成本效益 从本质上讲,虚拟化最大限度地减少了硬件蔓延。团队可以将多个虚拟机整合到单个物理主机上,而不是为每个工作负载部署单独的服务器。这大大减少了前期硬件投资和持续维护。 结果如何?更低的功耗、更低…...
jenkins built-in节点如何删除
1 概述 在 Jenkins 中,默认的 “Built-In” 节点(即主节点/master)无法直接删除,因为它是 Jenkins 的核心组件。它的存在,有时会造成困扰,因为部分作业调度到其上,由于 “Built-In” 节点的环境…...
QSS样式表的选择器
一个最简单的样式设置格式如下 QWidget {background-color: black; }将样式应用到对应的控件 QWidget* w new QWidget; w->setStyleSheet("QWidget {background-color: black;}");样式表中控件的设置有多种方式 通用选择器 /*匹配所有控件*/ *{}类型选择器 …...
Python多环境管理指南
Python/UV 多环境管理指南 在Python开发中,管理多个项目环境是一个常见需求。以下是使用Python内置工具和UV(一种新兴的Python包管理器)进行多环境管理的方法。 1. 使用Python内置venv管理多环境 创建虚拟环境 python -m venv /path/to/y…...
Java从入门到精通 - 数组
数组 此笔记参考黑马教程,仅学习使用,如有侵权,联系必删 文章目录 数组1. 认识数组2. 数组的定义和访问2.1 静态初始化数组2.1.1 数组的访问2.1.1 定义代码实现总结 2.1.2 数组的遍历2.1.2.1 定义代码演示总结 案例代码实现 2.2 动态初始化…...
《Vuejs 设计与实现》第 4 章(响应式系统)( 下 )
目录 4.6 避免无限递归循环 4.7 调度执行 4.8 计算属性 computed 与 lazy 4.9 watch 的实现原理 4.10 立即执行的 watch 与回调执行时机 4.11 过期副作用与竞态问题 总结 4.6 避免无限递归循环 在实现完善响应式系统时,需要注意避免无限递归循环。以以下代码…...
在 Windows 上为 Intel UHD Graphics 编译 OpenCL 程序
如果您使用的是 Intel UHD Graphics 集成显卡,以下是完整的 OpenCL 开发环境配置指南: 1. 准备工作 确认硬件支持 首先确认您的 Intel UHD Graphics 支持 OpenCL: 大多数第6代及以后的 Intel Core 处理器(Skylake 及更新架构)都支持 OpenCL 2.1+ 运行 clinfo 工具可以查…...
C++自学笔记 makefile
本博客参考南科大于仕琪教授的讲解视频和这位同学的学习笔记: 参考博客 感谢两位的分享。 makefile 的作用 用于组织大型项目的编译,是一个一键编译项目的脚本文件。 本博客通过四个版本的makefile逐步说明makefile的使用 使用说明 四个演示文件 …...
【PDF】使用Adobe Acrobat dc添加水印和加密
【PDF】使用Adobe Acrobat dc添加水印和加密 文章目录 [TOC](文章目录) 前言一、添加保护加密口令二、添加水印三、实验四、参考文章总结 实验工具: 1.Adobe Acrobat dc 前言 提示:以下是本篇文章正文内容,下面案例可供参考 一、添加保护加…...
客服系统重构详细计划
# 客服系统重构详细计划 ## 第一阶段:系统分析与准备工作 ### 1. 代码审查和分析 (1-2周) - 全面分析现有代码结构 - 识别代码中的问题和瓶颈 - 理解当前系统的业务逻辑 - 确定可重用的组件 - 制作系统功能清单 ### 2. 技术栈升级准备 (1周) - 升级PHP版本到7…...
基于VSCode + PlatformIO平台的ESP8266的DS1302实时时钟
基于ESP8266的DS1302实时时钟系统开发 一、项目概述 本实验通过ESP8266开发板实现: DS1302实时时钟模块的驱动系统时间同步与维护串口实时时间显示RTC模块状态监控 硬件组成: NodeMCU ESP8266开发板DS1302实时时钟模块CR2032纽扣电池(备…...
Flink 系列之十四 - Data Stream API的自定义数据类型
之前做过数据平台,对于实时数据采集,使用了Flink。现在想想,在数据开发平台中,Flink的身影几乎无处不在,由于之前是边用边学,总体有点混乱,借此空隙,整理一下Flink的内容,…...
【数据结构】线性表
目录 1.1 线性表的概念 1.1.1 线性表的抽象数据类型 1.1.2 线性表的存储结构 1.1.3 线性表运算分类 1.2 顺序表 1.2.1 顺序表的类定义 1.2.2 顺序表的运算实现 1. 顺序表的检索 2. 顺序表的插入 3. 顺序表的删除 1.3 链表 1.3.1 单链表 1. 链表的检索 2. 链表的插…...
大疆卓驭嵌入式面经及参考答案
FreeRTOS 有哪 5 种内存管理方式? heap_1.c:这种方式简单地在编译时分配一块固定大小的内存,在整个运行期间不会进行内存的动态分配和释放。它适用于那些对内存使用需求非常明确且固定,不需要动态分配内存的场景,优点是…...
【网络】:传输层协议 —— UDP、TCP协议
目录 UDP协议 UDP协议的核心特点 UDP协议格式 UDP的缓冲区 基于UDP的应用层协议 TCP协议 TCP协议的核心特点 TCP协议格式 确认应答机制 连接管理机制 三次握手 四次挥手 流量控制 滑动窗口 拥塞控制 基于字节流 粘包和拆包 可靠性和性能保障 基于TCP的应用层…...
每日c/c++题 备战蓝桥杯(洛谷P1115 最大子段和)
洛谷P1115 最大子段和 题解 题目描述 最大子段和是一道经典的动态规划问题。题目要求:给定一个包含n个整数的序列,找出其中和最大的连续子序列,并输出该最大和。若所有数均为负数,则取最大的那个数。 输入格式: 第…...
Python与矢量网络分析仪3671E:通道插损自动化校准(Vscode)
一、背景介绍 DUT集成了多个可调衰减的射频通道,可调衰减由高精度DAC和VVA构成,使用中电思仪的3671E矢量网络分析仪测试DUT的S参数,并自动化调整VVA的控制电压,以自动化获取指定衰减值对应的控制电平。 二、前期准备 Python环境&…...
设计模式系列(1):总览与引导
目录 前言 设计模式简介 UML与设计模式 术语解释 UML工具与PlantUML 面向对象设计原则(SOLID等) 设计模式分类与典型场景 设计模式的价值 学习与实践建议 常见面试题 推荐阅读 1. 前言 本篇为设计模式系列的第一篇,定位为总览和引导,旨在为后续各专题打下基础,帮助大家…...
Day21打卡—常见降维算法
知识点回顾: LDA线性判别PCA主成分分析t-sne降维 作业: 自由作业:探索下什么时候用到降维?降维的主要应用?或者让ai给你出题,群里的同学互相学习下。可以考虑对比下在某些特定数据集上t-sne的可视化和pca可…...
什么是人工智能(Artificial Intelligence,AI)? —— 机器学习 =》 深度学习 =》 新型技术
文章目录 什么是人工智能(Artificial Intelligence,AI)? —— 关系:AI >> ML >> DL一、机器学习(Machine Learning,ML)1、历史2、类型(1)监督学习…...
iVX 平台技术解析:图形化与组件化的融合创新
一、图形化逻辑编程:用流程图替代代码的革命 iVX 的核心突破在于可视化逻辑表达—— 开发者通过拖拽 “逻辑块”(如条件判断、循环控制、数据操作等)来搭建应用逻辑,彻底摒弃传统代码的字符输入模式。这种 “所见即所得” 的开发…...
【Diffusion】在华为云ModelArts上运行MindSpore扩散模型教程
目录 一、背景与目的 二、环境搭建 三、模型原理学习 1. 类定义与初始化 2. 初始卷积层 3. 时间嵌入模块 4. 下采样模块 5. 中间模块 6. 上采样模块 7. 最终卷积层 8. 前向传播 9. 关键点总结 四、代码实现与运行 五、遇到的问题及解决方法 六、总结与展望 一、…...
跟我学c++高级篇——模板元编程之十三处理逻辑
一、元编程处理逻辑 无论在普通编程还是在元编程中,逻辑的处理,都是一个编程开始的必然经过。开发者对普通编程中的逻辑处理一般都非常清楚,不外乎条件谈判和循环处理。而条件判断常见的基本就是if语句(switch如果不考虑效率等情…...
组合模式(Composite Pattern)详解
文章目录 1. 什么是组合模式?2. 为什么需要组合模式?3. 组合模式的核心概念4. 组合模式的结构5. 组合模式的基本实现5.1 基础示例:文件系统5.2 透明组合模式 vs 安全组合模式5.2.1 透明组合模式5.2.2 安全组合模式5.3 实例:公司组织结构5.4 实例:GUI组件树6. Java中组合模…...
最长字符串 / STL+BFS
题目 代码 #include <bits/stdc.h> using namespace std;int main() {map<vector<int>, vector<string>> a;set<vector<int>> c;vector<int> initial(26, 0);c.insert(initial);ifstream infile("words.txt");string s;w…...
C++ stl中的set、multiset、map、multimap的相关函数用法
文章目录 序列式容器和关联式容器树形结构和哈希结构树形结构哈希结构 键值对setset的相关介绍set定义方式set相关成员函数multiset mapmap的相关介绍map定义方式map的相关操作1.map的插入2.map的查找3.map的删除 序列式容器和关联式容器 CSTL中包含了序列式容器和关联式容器&…...
普通IT的股票交易成长史--20250511 美元与美股强相关性
声明:本文章的内容非原创。参考了yt博主Andy Lee的观点,为了加深自己的学习印象才做的复盘,不构成投资建议。感谢他的无私奉献! 送给自己的话: 仓位就是生命,绝对不能满仓!!&#x…...
系统架构设计(四):架构风格总结
黑板 概念 黑板体系架构是一种用于求解复杂问题的软件架构风格,尤其适合知识密集型、推理驱动、数据不确定性大的场景。 它模拟了人类专家协同解决问题的方式,通过一个共享的“黑板”协同多个模块(专家)逐步构建解决方案。 组…...
ElasticSearch进阶
一、文档批量操作 1.批量获取文档数据 批量获取文档数据是通过_mget的API来实现的 (1)在URL中不指定index和type 请求方式:GET请求地址:_mget功能说明 : 可以通过ID批量获取不同index和type的数据请求参数: docs : 文档数组参…...
0基础 | L298N电机驱动模块 | 使用指南
引言 在嵌入式系统开发中,电机驱动是一个常见且重要的功能。L298N是一款高电压、大电流电机驱动芯片,广泛应用于各种电机控制场景,如直流电机的正反转、调速,以及步进电机的驱动等。本文将详细介绍如何使用51单片机来控制L298N电…...
Synchronized与锁升级
一、面试题 1)谈谈你对Synchronized的理解 2)Sychronized的锁升级你聊聊 3)Synchronized实现原理,monitor对象什么时候生成的?知道monitor的monitorenter和monitorexit这两个是怎么保证同步的嘛&#…...
MNIST DDP 分布式数据并行
Distributed Data Parallel 转自我的个人博客:https://shar-pen.github.io/2025/05/04/torch-distributed-series/3.MNIST_DDP/ The difference between DistributedDataParallel and DataParallel is: DistributedDataParallel uses multiprocessing where a proc…...
语音合成之十三 中文文本归一化在现代语音合成系统中的应用与实践
中文文本归一化在现代语音合成系统中的应用与实践 引言理解中文文本归一化(TN)3 主流LLM驱动的TTS系统及其对中文文本归一化的需求分析A. SparkTTS(基于Qwen2.5)与文本归一化B. CosyVoice(基于Qwen)与文本归…...
9.1.领域驱动设计
目录 一、领域驱动设计核心哲学 战略设计与战术设计的分野 • 战略设计:限界上下文(Bounded Context)与上下文映射(Context Mapping) • 战术设计:实体、值对象、聚合根、领域服务的构建原则 统一语言&am…...
如何配置光猫+路由器实现外网IP访问内部网络?
文章目录 前言一、网络拓扑理解二、准备工作三、光猫配置3.1 光猫工作模式3.2 光猫端口转发配置(路由模式时) 四、路由器配置4.1 路由器WAN口配置4.2 端口转发配置4.3 动态DNS配置(可选) 五、防火墙设置六、测试配置七、安全注意事…...
C++题题题题题题题题题踢踢踢
后缀表达式求值 #include<bits/stdc.h> #include<algorithm> using namespace std; string a[100]; string b[100]; stack<string> op; int la0,lb0; int main(){while(true){cin>>a[la];if(a[la]".") break;la;}for(int i0;i<la;i){if(…...
M. Moving Both Hands(反向图+Dijkstra)
Problem - 1725M - Codeforces 题目大意:给你一个有向图,起始点在1,问起始点分别与另外n-1个 点相遇的最短时间,无法相遇输出-1。 思路:反向建图,第一层建原图,第二层建反向图,两层…...
11、参数化三维产品设计组件 - /设计与仿真组件/parametric-3d-product-design
76个工业组件库示例汇总 参数化三维产品设计组件 (注塑模具与公差分析) 概述 这是一个交互式的 Web 组件,旨在演示简单的三维零件(如带凸台的方块)的参数化设计过程,并结合注塑模具设计(如开模动画)与公…...
智能座舱开发工程师面试题
一、基础知识类 简述智能座舱的核心组成部分及其功能 要求从硬件(如显示屏、传感器、控制器)和软件(操作系统、中间件、应用程序)层面展开,阐述各部分如何协同实现座舱的智能化体验。 对比 Android Automotive、QNX…...
【连载14】基础智能体的进展与挑战综述-多智能体系统设计
基础智能体的进展与挑战综述 从类脑智能到具备可进化性、协作性和安全性的系统 【翻译团队】刘军(liujunbupt.edu.cn) 钱雨欣玥 冯梓哲 李正博 李冠谕 朱宇晗 张霄天 孙大壮 黄若溪 在基于大语言模型的多智能体系统(LLM-MAS)中,合作目标和合…...
06.three官方示例+编辑器+AI快速学习webgl_animation_skinning_additive_blending
本实例主要讲解内容 这个Three.js示例展示了**骨骼动画(Skinning)和变形动画(Morphing)**的结合应用。通过加载一个机器人模型,演示了如何同时控制角色的肢体动作和面部表情,实现更加丰富的角色动画效果。 核心技术包括: 多动画混合与淡入…...
【Java学习日记36】:javabeen学生系统
ideal快捷键...
.Net HttpClient 使用请求数据
HttpClient 使用请求数据 0、初始化及全局设置 //初始化:必须先执行一次 #!import ./ini.ipynb1、使用url 传参 参数放在Url里,形如:http://www.baidu.com?namezhangsan&age18, GET、Head请求用的比较多。优点是简单、方便࿰…...
详解 Java 并发编程 synchronized 关键字
synchronized 关键字的作用 synchronized 是 Java 中用于实现线程同步的关键字,主要用于解决多线程环境下的资源竞争问题。它可以修饰方法或代码块,确保同一时间只有一个线程可以执行被修饰的代码,从而避免数据不一致的问题。 synchronized…...
《Go小技巧易错点100例》第三十二篇
本期分享: 1.sync.Map的原理和使用方式 2.实现有序的Map sync.Map的原理和使用方式 sync.Map的底层结构是通过读写分离和无锁读设计实现高并发安全: 1)双存储结构: 包含原子化的 read(只读缓存,无锁快…...
时序约束高级进阶使用详解四:Set_False_Path
目录 一、背景 二、Set_False_Path 2.1 Set_false_path常用场景 2.2 Set_false_path的优势 2.3 Set_false_path设置项 2.4 细节区分 三、工程示例 3.1 工程代码 3.2 时序约束如下 3.3 时序报告 3.4 常规场景 3.4.1 设计代码 3.4.2 约束场景 3.4.3 约束对象总结…...
每日定投40刀BTC(16)20250428 - 20250511
定投 坚持 《恒道》 长河九曲本微流,岱岳摩云起累丘。 铁杵十年销作刃,寒窗五鼓淬成钩。已谙蜀栈盘空险,更蓄湘竹带泪遒。 莫问枯荣何日证,星霜满鬓亦从头。...