当前位置: 首页 > news >正文

RabbitMQ 高级特性:从 TTL 到消息分发的全面解析 (下)

RabbitMQ高级特性

 RabbitMQ 高级特性解析:RabbitMQ 消息可靠性保障 (上)-CSDN博客

RabbitMQ 高级特性:从 TTL 到消息分发的全面解析 (下)-CSDN博客


引言

RabbitMQ 作为一款强大的消息队列中间件,在分布式系统中发挥着至关重要的作用。除了基本的消息收发功能外,它还具备许多高级特性,如 TTL、死信队列、延迟队列、事务和消息分发等。本文将详细介绍这些高级特性。


1. TTL(Time to Live,过期时间)

1.1 概念

TTL 即过期时间,RabbitMQ 可以对消息和队列设置 TTL。当消息到达存活时间之后,还没有被消费,就会被自动清除。这在很多业务场景中都非常有用,比如网上购物时,下单超过 24 小时未付款,订单会被自动取消;申请退款之后,超过 7 天未被处理,则自动退款。

1.2 设置消息的 TTL

有两种方法可以设置消息的 TTL,一是设置队列的 TTL,队列中所有消息都有相同的过期时间;二是对消息本身进行单独设置,每条消息的 TTL 可以不同。如果两种方法一起使用,则消息的 TTL 以两者之间较小的那个数值为准。

1.2.1 针对每条消息设置 TTL

针对每条消息设置 TTL 的方法是在发送消息的方法中加入 expiration 的属性参数,单位为毫秒。

配置交换机和队列

// TTL
public static final String TTL_QUEUE = "ttl_queue";
public static final String TTL_EXCHANGE_NAME = "ttl_exchange";// 1. 交换机
@Bean("ttlExchange")
public Exchange ttlExchange() {return ExchangeBuilder.fanoutExchange(Constant.TTL_EXCHANGE_NAME).durable(true).build();
}// 2. 队列
@Bean("ttlQueue")
public Queue ttlQueue() {return QueueBuilder.durable(Constant.TTL_QUEUE).build();
}// 3. 队列和交换机绑定 Binding
@Bean("ttlBinding")
public Binding ttlBinding(@Qualifier("ttlExchange") FanoutExchange exchange, @Qualifier("ttlQueue") Queue queue) {return BindingBuilder.bind(queue).to(exchange);
}

发送消息

@RequestMapping("/ttl")
public String ttl() {String ttlTime = "10000"; // 10srabbitTemplate.convertAndSend(Constant.TTL_EXCHANGE_NAME, "", "ttl test...", messagePostProcessor -> {messagePostProcessor.getMessageProperties().setExpiration(ttlTime);return messagePostProcessor;});return "发送成功!";
}

运行结果
调用接口发送消息后,可以看到 Ready 消息为 1。10 秒钟之后,刷新页面,发现消息已被删除。如果不设置 TTL,则表示此消息不会过期;如果将 TTL 设置为 0,则表示除非此时可以直接将消息投递到消费者,否则该消息会被立即丢弃。

1.3 设置队列的 TTL

设置队列 TTL 的方法是在创建队列时,加入 x-message-ttl 参数实现的,单位是毫秒。

配置队列和绑定关系

public static final String TTL_QUEUE2 = "ttl_queue2";// 设置 ttl
@Bean("ttlQueue2")
public Queue ttlQueue2() {// 设置 20 秒过期return QueueBuilder.durable(Constant.TTL_QUEUE2).ttl(20 * 1000).build();
}// 3. 队列和交换机绑定 Binding
@Bean("ttlBinding2")
public Binding ttlBinding2(@Qualifier("ttlExchange") FanoutExchange exchange, @Qualifier("ttlQueue2") Queue queue) {return BindingBuilder.bind(queue).to(exchange);
}

发送消息

@RequestMapping("/ttl")
public String ttl() {// 发送不带 ttl 的消息rabbitTemplate.convertAndSend(Constant.TTL_EXCHANGE_NAME, "", "ttl test...");return "发送成功!";
}

运行结果
运行之后发现,新增了一个队列,队列 Features 有一个 TTL 标识。调用接口发送消息后,可以看到 Ready 消息为 1。采用发布订阅模式,所有与该交换机绑定的队列(ttl_queue 和 ttl_queue2)都会收到消息。20 秒钟之后,刷新页面,发现 ttl_queue2 中的消息已被删除,由于 ttl_queue 队列未设置过期时间,所以该队列的消息未删除。

1.4 两者区别

设置队列 TTL 属性的方法,一旦消息过期,就会从队列中删除;设置消息 TTL 的方法,即使消息过期,也不会马上从队列中删除,而是在即将投递到消费者之前进行判定的。

这是因为设置队列过期时间,队列中已过期的消息肯定在队列头部,RabbitMQ 只要定期从队头开始扫描是否有过期的消息即可。

而设置消息 TTL 的方式,每条消息的过期时间不同,如果要删除所有过期消息需要扫描整个队列,所以不如等到此消息即将被消费时再判定是否过期,如果过期再进行删除即可。


2. 死信队列

2.1 死信的概念

死信(dead message)简单理解就是因为种种原因,无法被消费的信息,就是死信。当消息在一个队列中变成死信之后,它能被重新被发送到另一个交换器中,这个交换器就是 DLX(Dead Letter Exchange),绑定 DLX 的队列,就称为死信队列(Dead Letter Queue,简称 DLQ)。

消息变成死信一般是由于以下几种情况:

  • 消息被拒绝(Basic.Reject/Basic.Nack),并且设置 requeue 参数为 false
  • 消息过期。
  • 队列达到最大长度。

2.2 代码示例

2.2.1 声明队列和交换机

包含两部分:声明正常的队列和正常的交换机;声明死信队列和死信交换机。死信交换机和死信队列和普通的交换机、队列没有区别。

// 死信队列
public static final String DLX_EXCHANGE_NAME = "dlx_exchange";
public static final String DLX_QUEUE = "dlx_queue";
public static final String NORMAL_EXCHANGE_NAME = "normal_exchange";
public static final String NORMAL_QUEUE = "normal_queue";@Configuration
public class DLXConfig {// 死信交换机@Bean("dlxExchange")public Exchange dlxExchange() {return ExchangeBuilder.topicExchange(Constant.DLX_EXCHANGE_NAME).durable(true).build();}// 2. 死信队列@Bean("dlxQueue")public Queue dlxQueue() {return QueueBuilder.durable(Constant.DLX_QUEUE).build();}// 3. 死信队列和交换机绑定 Binding@Bean("dlxBinding")public Binding dlxBinding(@Qualifier("dlxExchange") Exchange exchange, @Qualifier("dlxQueue") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("dlx").noargs();}// 正常交换机@Bean("normalExchange")public Exchange normalExchange() {return ExchangeBuilder.topicExchange(Constant.NORMAL_EXCHANGE_NAME).durable(true).build();}// 正常队列@Bean("normalQueue")public Queue normalQueue() {return QueueBuilder.durable(Constant.NORMAL_QUEUE).build();}// 正常队列和交换机绑定 Binding@Bean("normalBinding")public Binding normalBinding(@Qualifier("normalExchange") Exchange exchange, @Qualifier("normalQueue") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("normal").noargs();}
}
2.2.2 正常队列绑定死信交换机

当这个队列中存在死信时,RabbitMQ 会自动的把这个消息重新发布到设置的 DLX 上,进而被路由到另一个队列,即死信队列。可以监听这个死信队列中的消息以进行相应的处理。

@Bean("normalQueue")
public Queue normalQueue() {return QueueBuilder.durable(Constant.NORMAL_QUEUE).deadLetterExchange(Constant.DLX_EXCHANGE_NAME).deadLetterRoutingKey("dlx").build();
}
2.2.3 制造死信产生的条件
@Bean("normalQueue")
public Queue normalQueue() {return QueueBuilder.durable(Constant.NORMAL_QUEUE).deadLetterExchange(Constant.DLX_EXCHANGE_NAME).deadLetterRoutingKey("dlx").ttl(10 * 1000).maxLength(10L).build();
}
2.2.4 发送消息
@RequestMapping("/dlx")
public void dlx() {// 测试过期时间,当时间达到 TTL,消息自动进入到死信队列rabbitTemplate.convertAndSend(Constant.NORMAL_EXCHANGE_NAME, "normal", "dlx test...");// 测试队列长度// for (int i = 0; i < 20; i++) {//     rabbitTemplate.convertAndSend(Constant.NORMAL_EXCHANGE_NAME, "normal", "dlx test...");// }// 测试消息拒收// rabbitTemplate.convertAndSend(Constant.NORMAL_EXCHANGE_NAME, "normal", "dlx test...");
}
2.2.5 测试死信
  • 程序启动之后,观察队列:队列 Features 说明:D 是 durable 的缩写,表示设置持久化;TTL 表示 Time to Live,队列设置了 TTL;Lim 表示队列设置了长度(x-max-length);DLX 表示队列设置了死信交换机(x-dead-letter-exchange);DLK 表示队列设置了死信 RoutingKeyx-dead-letter-routing-key)。
  • 测试过期时间:调用接口发送消息,10 秒后,消息进入到死信队列。生产者首先发送一条消息,然后经过交换器(normal_exchange)顺利地存储到队列(normal_queue)中。由于队列 normal_queue 设置了过期时间为 10s,在这 10s 内没有消费者消费这条消息,那么判定这条消息过期。由于设置了 DLX,过期之时,消息会被丢给交换器(dlx_exchange)中,这时根据 RoutingKey 匹配,找到匹配的队列(dlx_queue),最后消息被存储在 queue.dlx 这个死信队列中。
  • 测试达到队列长度:队列长度设置为 10,我们发送 20 条数据,会有 10 条数据直接进入到死信队列。发送前,死信队列只有一条数据,发送 20 条消息后,运行后可以看到死信队列变成了 11 条。过期之后,正常队列的 10 条也会进入到死信队列。
  • 测试消息拒收:写消费者代码,并强制异常,测试拒绝签收。

@Component
public class DlxQueueListener {// 指定监听队列的名称@RabbitListener(queues = Constant.NORMAL_QUEUE)public void ListenerQueue(Message message, Channel channel) throws Exception {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {System.out.printf("接收到消息: %s, deliveryTag: %d%n", new String(message.getBody(), "UTF-8"), message.getMessageProperties().getDeliveryTag());// 模拟处理失败int num = 3 / 0;System.out.println("处理完成");// 3. 手动签收channel.basicAck(deliveryTag, true);} catch (Exception e) {// 4. 异常了就拒绝签收Thread.sleep(1000);// 第三个参数 requeue,是否重新发送,如果为 true,则会重新发送,若为 false,则直接丢弃,若设置死信,会进入到死信队列channel.basicNack(deliveryTag, true, false);}}// 指定监听队列的名称@RabbitListener(queues = Constant.DLX_QUEUE)public void ListenerDLXQueue(Message message, Channel channel) throws Exception {long deliveryTag = message.getMessageProperties().getDeliveryTag();System.out.printf("死信队列接收到消息: %s, deliveryTag: %d%n", new String(message.getBody(), "UTF-8"), message.getMessageProperties().getDeliveryTag());}
}

发送消息,观察运行结果:

接收到消息: dlx test..., deliveryTag: 1
死信队列接收到消息: dlx test..., deliveryTag: 1

2.3 常见问题

  • 死信队列的概念:死信(Dead Letter)是消息队列中的一种特殊消息,它指的是那些无法被正常消费或处理的消息。在消息队列系统中,如 RabbitMQ,死信队列用于存储这些死信消息。
  • 死信的来源
    • 消息过期:消息在队列中存活的时间超过了设定的 TTL。
    • 消息被拒绝:消费者在处理消息时,可能因为消息内容错误、处理逻辑异常等原因拒绝处理该消息。如果拒绝时指定不重新入队(requeue=false),消息也会成为死信。
    • 队列满了:当队列达到最大长度,无法再容纳新的消息时,新来的消息会被处理为死信。
  • 死信队列的应用场景:对于 RabbitMQ 来说,死信队列是一个非常有用的特性。它可以处理异常情况下,消息不能够被消费者正确消费而被置入死信队列中的情况,应用程序可以通过消费这个死信队列中的内容来分析当时所遇到的异常情况,进而可以改善和优化系统。比如:用户支付订单之后,支付系统会给订单系统返回当前订单的支付状态。为了保证支付信息不丢失,需要使用到死信队列机制。当消息消费异常时,将消息投入到死信队列中,由订单系统的其他消费者来监听这个队列,并对数据进行处理(比如发送工单等,进行人工确认)。其他应用场景还有消息重试、消息丢弃、日志收集等。

3. 延迟队列

3.1 概念

延迟队列(Delayed Queue),即消息被发送以后,并不想让消费者立刻拿到消息,而是等待特定时间后,消费者才能拿到这个消息进行消费。

3.2 应用场景

延迟队列的使用场景有很多,比如:

  • 智能家居:用户希望通过手机远程遥控家里的智能设备在指定的时间进行工作。这时候就可以将用户指令发送到延迟队列,当指令设定的时间到了再将指令推送到智能设备。
  • 日常管理:预定会议后,需要在会议开始前十五分钟提醒参会人参加会议。
  • 用户注册成功后,7 天后发送短信,提高用户活跃度等。

3.3 TTL + 死信队列实现

RabbitMQ 本身没有直接支持延迟队列的功能,但是可以通过前面所介绍的 TTL + 死信队列的方式组合模拟出延迟队列的功能。

代码实现

声明队列

// 正常队列
@Bean("normalQueue")
public Queue normalQueue() {Map<String, Object> arguments = new HashMap<>();arguments.put("x-dead-letter-exchange", Constant.DLX_EXCHANGE_NAME); // 绑定死信队列arguments.put("x-dead-letter-routing-key", "dlx"); // 设置发送给死信队列的 RoutingKeyreturn QueueBuilder.durable(Constant.NORMAL_QUEUE).withArguments(arguments).build();
}

生产者:发送两条消息,一条消息 10s 后过期,第二条 20s 后过期。

@RequestMapping("/delay")
public String delay() {// 发送带 ttl 的消息rabbitTemplate.convertAndSend(Constant.NORMAL_EXCHANGE_NAME, "normal", "ttl test 10s..." + new Date(), messagePostProcessor -> {messagePostProcessor.getMessageProperties().setExpiration("10000"); // 10s 过期return messagePostProcessor;});rabbitTemplate.convertAndSend(Constant.NORMAL_EXCHANGE_NAME, "normal", "ttl test 20s..." + new Date(), messagePostProcessor -> {messagePostProcessor.getMessageProperties().setExpiration("20000"); // 20s 过期return messagePostProcessor;});return "发送成功!";
}

消费者

// 指定监听队列的名称
@RabbitListener(queues = Constant.DLX_QUEUE)
public void ListenerDLXQueue(Message message, Channel channel) throws Exception {long deliveryTag = message.getMessageProperties().getDeliveryTag();System.out.printf ("% tc 死信队列接收到消息: % s, deliveryTag: % d% n", new Date (), new String (message.getBody (),"UTF-8"),
message.getMessageProperties ().getDeliveryTag ());
}

**运行程序**:调用接口发送数据`http://127.0.0.1:8080/product/delay`,通过控制台观察死信队列消费情况:
周三 5 月 22 11:59:00 CST 2024 死信队列接收到消息: ttl test 10s...Wed May 22 11:58:50 CST 2024, deliveryTag: 1
周三 5 月 22 11:59:10 CST 2024 死信队列接收到消息: ttl test 20s...Wed May 22 11:58:50 CST 2024, deliveryTag: 2
可以看到,两条消息按照过期时间依次进入了死信队列。延迟队列,就是希望等待特定的时间之后,消费者才能拿到这个消息。TTL刚好可以让消息延迟一段时间成为死信,成为死信的消息会被投递到死信队列里,这样消费者一直消费死信队列里的消息就可以了。**存在问题**:当把生产消息的顺序修改为先发送20s过期数据,再发送10s过期数据时:
```java
@RequestMapping("/delay")
public String delay() {// 发送带ttl的消息rabbitTemplate.convertAndSend(Constant.NORMAL_EXCHANGE_NAME, "normal", "ttl test 20s..."+new Date(), messagePostProcessor -> {messagePostProcessor.getMessageProperties().setExpiration("20000"); return messagePostProcessor;});rabbitTemplate.convertAndSend(Constant.NORMAL_EXCHANGE_NAME, "normal", "ttl test 10s..."+new Date(), messagePostProcessor -> {messagePostProcessor.getMessageProperties().setExpiration("10000"); return messagePostProcessor;});return "发送成功!";
}

通过控制台观察死信队列消费情况:

周三 5月 22 12:14:22 CST 2024 死信队列接收到消息: ttl test 20s...Wed May 22 12:14:02 CST 2024, deliveryTag: 3
周三 5月 22 12:14:22 CST 2024 死信队列接收到消息: ttl test 10s...Wed May 22 12:14:02 CST 2024, deliveryTag: 4

这时会发现:10s 过期的消息,也是在 20s 后才进入到死信队列。这是因为 RabbitMQ 只会检查队首消息是否过期,如果过期则丢到死信队列。如果第一个消息的延时时间很长,第二个消息的延时时间很短,那第二个消息并不会优先得到执行。所以在考虑使用 TTL + 死信队列实现延迟任务队列的时候,需要确认业务上每个任务的延迟时间是一致的,如果遇到不同的任务类型需要不同的延迟的话,需要为每一种不同延迟时间的消息建立单独的消息队列。

3.4 延迟队列插件

RabbitMQ 官方提供了一个延迟的插件来实现延迟的功能。
安装延迟队列插件

  1. 下载并上传插件:根据自己的 RabbitMQ 版本从插件下载地址选择相应版本的延迟插件,下载后上传到服务器。插件上传目录参考installing Additional Plugins | RabbitMQ,/usr/lib/rabbitmq/plugins是一个附加目录,RabbitMQ 包本身不会在此安装任何内容,如果没有这个路径,可以自己进行创建。如果为 docker 操作,使用docker cp命令复制文件到 docker 容器,例如:docker cp 宿主机文件 容器名称或ID:容器目录
  2. 启动插件:在服务器命令行中,使用rabbitmq - plugins list查看插件列表,使用rabbitmq - plugins enable rabbitmq_delayed_message_exchange启动插件,之后重启 RabbitMQ 服务。如果为 docker 操作,进入容器后同样执行这两个命令来查看和启动插件,最后重启 docker 容器。
  3. 验证插件:在 RabbitMQ 管理平台查看,新建交换机时是否有延迟消息选项,如果有就说明延迟消息插件已经正常运行了。

基于插件延迟队列实现

声明交换机、队列、绑定关系

import com.bite.rabbitmq.constant.Constant;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class DelayedConfig {@Bean("delayedExchange")public Exchange delayedExchange() {return ExchangeBuilder.directExchange(Constant.DELAYED_EXCHANGE_NAME).durable(true).delayed().build();}//2. 队列@Bean("delayedQueue")public Queue delayedQueue() {return QueueBuilder.durable(Constant.DELAYED_QUEUE).build();}//3. 队列和交换机绑定 Binding@Bean("delayedBinding")public Binding delayedBinding(@Qualifier("delayedExchange") Exchange exchange, @Qualifier("delayedQueue") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("delayed").noargs();}
}

生产者:发送两条消息,并设置延迟时间。

@RequestMapping("/delay2")
public String delay2() {// 发送带ttl的消息rabbitTemplate.convertAndSend(Constant.DELAYED_EXCHANGE_NAME, "delayed", "delayed test 20s..."+new Date(), messagePostProcessor -> {messagePostProcessor.getMessageProperties().setDelayLong(20000L); return messagePostProcessor;});rabbitTemplate.convertAndSend(Constant.DELAYED_EXCHANGE_NAME, "delayed", "delayed test 10s..."+new Date(), messagePostProcessor -> {messagePostProcessor.getMessageProperties().setDelayLong(10000L); return messagePostProcessor;});return "发送成功!";
}

消费者

import com.bite.rabbitmq.constant.Constant;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Date;@Component
public class DelayedQueueListener {//指定监听队列的名称@RabbitListener(queues = Constant.DELAYED_QUEUE)public void ListenerDLXQueue(Message message, Channel channel) throws Exception {System.out.printf("%tc 死信队列接收到消息: %s%n", new Date(), new String(message.getBody(),"UTF-8"));}
}

运行程序,并测试:程序启动后,调用接口发送消息http://127.0.0.1:8080/product/delay2,观察控制台:

周三 5月 22 15:42:02 CST 2024 死信队列接收到消息: delayed test 10s...Wed May 22 15:41:52 CST 2024
周三 5月 22 15:42:12 CST 2024 死信队列接收到消息: delayed test 20s...Wed May 22 15:41:52 CST 2024

从结果可以看出,使用延迟队列,可以保证消息按照延迟时间到达消费者。

介绍下 RabbitMQ 的延迟队列:延迟队列是一个特殊的队列,消息发送之后,并不立即给消费者,而是等待特定的时间,才发送给消费者。延迟队列的应用场景有很多,比如订单在十分钟内未支付自动取消、用户注册成功后 3 天后发调查问卷、用户发起退款 24 小时后商家未处理则默认同意自动退款等。但 RabbitMQ 本身并没直接实现延迟队列,通常有两种方法:

  1. TTL + 死信队列组合的方式:优点是灵活不需要额外的插件支持;缺点是存在消息顺序问题,需要额外的逻辑来处理死信队列的消息,增加了系统的复杂性。
  2. 使用官方提供的延迟插件实现延迟功能:优点是通过插件可以直接创建延迟队列,简化延迟消息的实现,避免了 DLX 的时序问题;缺点是需要依赖特定的插件,有运维工作,只适用于特定版本。

4. 事务

RabbitMQ 是基于 AMQP 协议实现的,该协议实现了事务机制,因此 RabbitMQ 也支持事务机制。Spring AMQP 也提供了对事务相关的操作。RabbitMQ 事务允许开发者确保消息的发送和接收是原子性的,要么全部成功,要么全部失败。

4.1 配置事务管理器

import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.transaction.RabbitTransactionManager;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class TransactionConfig {@Beanpublic RabbitTransactionManager transactionManager(CachingConnectionFactory connectionFactory) {return new RabbitTransactionManager(connectionFactory);}@Beanpublic RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);rabbitTemplate.setChannelTransacted(true);return rabbitTemplate;}
}

4.2 声明队列

@Bean("transQueue")
public Queue transQueue() {return QueueBuilder.durable("trans_queue").build();
}

4.3 生产者

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;@RequestMapping("/trans")
@RestController
public class TransactionProducer {@Autowiredprivate RabbitTemplate rabbitTemplate;@Transactional@RequestMapping("/send")public String send() {rabbitTemplate.convertAndSend("","trans_queue", "trans test 1...");int a = 5/0;rabbitTemplate.convertAndSend("","trans_queue", "trans test 2...");return "发送成功";}
}

4.4 测试

  1. 不加@Transactional,会发现消息 1 发送成功。
  2. 添加@Transactional,消息 1 和消息 2 全部发送失败,体现了事务的原子性。

5. 消息分发

5.1 概念

RabbitMQ 队列拥有多个消费者时,队列会把收到的消息分派给不同的消费者。每条消息只会发送给订阅列表里的一个消费者。默认情况下,RabbitMQ 是以轮询的方法进行分发的,而不管消费者是否已经消费并已经确认了消息。这种方式不太合理,比如某些消费者消费速度慢,而某些消费者消费速度快,就可能会导致某些消费者消息积压,某些消费者空闲,进而应用整体的吞吐量下降。

我们可以使用channel.basicQos(int prefetchCount)方法,来限制当前信道上的消费者所能保持的最大未确认消息的数量。比如消费端调用了channel.basicQos(5),RabbitMQ 会为该消费者计数,发送一条消息计数 + 1,消费一条消息计数 - 1,当达到了设定的上限,RabbitMQ 就不会再向它发送消息了,直到消费者确认了某条消息,类似 TCP/IP 中的 “滑动窗口”。prefetchCount设置为 0 时表示没有上限,basicQos对拉模式的消费无效。

5.2 应用场景

5.2.1 限流

在订单系统中,正常情况下每秒可处理 5000 请求,但在秒杀时请求瞬间达每秒 1 万个,若全部通过 MQ 发送会压垮订单系统。通过设置prefetchCount参数并将消息应答方式设为手动应答,可实现限流。

配置 prefetch 参数和应答方式

listener:simple:acknowledge-mode: manualprefetch: 5

配置交换机和队列

import com.bite.rabbitmq.constant.Constant;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class QosConfig {@Bean("qosExchange")public Exchange qosExchange() {return ExchangeBuilder.directExchange(Constant.QOS_EXCHANGE_NAME).durable(true).build();}//2. 队列@Bean("qosQueue")public Queue qosQueue() {return QueueBuilder.durable(Constant.QOS_QUEUE).build();}//3. 队列和交换机绑定 Binding@Bean("qosBinding")public Binding qosBinding(@Qualifier("qosExchange") Exchange exchange, @Qualifier("qosQueue") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("qos").noargs();}
}

发送消息:一次发送 20 条消息。

@RequestMapping("/qos")
public String qos() {for (int i = 0; i < 20; i++) {rabbitTemplate.convertAndSend(Constant.QOS_EXCHANGE_NAME, "qos", "qos test..."+i);}return "发送成功!";
}

消费者监听

@Component
public class QosQueueListener {@RabbitListener(queues = Constant.QOS_QUEUE)public void ListenerQueue(Message message, Channel channel) throws Exception {long deliveryTag = message.getMessageProperties().getDeliveryTag();System.out.printf("接收到消息: %s, deliveryTag: %d%n", new String(message.getBody(),"UTF-8"), deliveryTag);}
}

测试:调用接口发送消息,控制台只打印 5 条消息,管理平台显示待发送 15 条,未确认 5 条。取消prefetch配置,消费者会一次性接收 20 条消息。

5.2.2 负载均衡

在有两个消费者的情况下,若一个消费者处理任务快,一个慢,会导致负载不均衡。通过设置prefetch = 1,可让 RabbitMQ 一次只给一个消费者一条消息,处理并确认前一条消息后再发送新消息,实现负载均衡。

配置 prefetch 参数和应答方式

listener:simple:acknowledge-mode: manualprefetch: 1

启动两个消费者:使用Thread.sleep(100)模拟消费慢。

@Component
public class QosQueueListener {@RabbitListener(queues = Constant.QOS_QUEUE)public void ListenerQosQueue(Message message, Channel channel) throws Exception {long deliveryTag = message.getMessageProperties().getDeliveryTag();System.out.printf("接收到消息: %s, deliveryTag: %d%n", new String(message.getBody(),"UTF-8"), deliveryTag);channel.basicAck(deliveryTag, true);}@RabbitListener(queues = Constant.QOS_QUEUE)public void ListenerQueue2(Message message, Channel channel) throws Exception {long deliveryTag = message.getMessageProperties().getDeliveryTag();System.out.printf("消费者2接收到消息: %s, deliveryTag: %d%n", new String(message.getBody(),"UTF-8"), deliveryTag);Thread.sleep(100);channel.basicAck(deliveryTag, true);}
}

测试:调用接口发送消息,通过日志观察两个消费者消费消息情况,可看到消息在两个消费者间均衡分配。

感谢阅览。

相关文章:

RabbitMQ 高级特性:从 TTL 到消息分发的全面解析 (下)

RabbitMQ高级特性 RabbitMQ 高级特性解析&#xff1a;RabbitMQ 消息可靠性保障 &#xff08;上&#xff09;-CSDN博客 RabbitMQ 高级特性&#xff1a;从 TTL 到消息分发的全面解析 &#xff08;下&#xff09;-CSDN博客 引言 RabbitMQ 作为一款强大的消息队列中间件&#xff…...

OpenManus-通过源码方式本地运行OpenManus,含踩坑及处理方案

前言&#xff1a;最近 Manus 火得一塌糊涂啊&#xff0c;OpenManus 也一夜之间爆火&#xff0c;那么作为程序员应该来尝尝鲜 1、前期准备 FastGithub&#xff1a;如果有科学上网且能正常访问 github 则不需要下载此软件&#xff0c;此软件是提供国内直接访问 githubGit&#…...

Ubuntu22.04修改root用户并安装cuda

由于本人工作原因&#xff0c;经常会遇到需要给ubuntu打显卡驱动的问题&#xff0c;虽然说不难吧&#xff0c;但是耐不住机器多&#xff0c;重复多次也就烦了&#xff0c;于是抽出了一点时间&#xff0c;并且在deepseek的帮助之下&#xff0c;写了一个自动安装驱动的脚本&#…...

Java LeetCode 热题 100 回顾38

干货分享&#xff0c;感谢您的阅读&#xff01;LeetCode 热题 100 回顾_力code热题100-CSDN博客 一、哈希部分 1.两数之和 &#xff08;简单&#xff09; 题目描述 给定一个整数数组 nums 和一个整数目标值 target&#xff0c;请你在该数组中找出 和为目标值 target 的那 两…...

MySQL复习笔记

文章目录 1.MySQL1.1什么是数据库1.2 数据库分类1.3 MySQL简介1.4连接数据库 2. 操作数据库2.1 操作数据库2.2 数据库的列类型2.3 数据库的字段属性&#xff08;重点&#xff09;2.4 创建数据库表&#xff08;重点&#xff09;2.5 数据表的类型2.6 修改数据表 3. MySQL 数据管理…...

解释 TypeScript 中的类型系统,如何定义和使用类型?

1. 类型系统的核心作用 TypeScript类型系统本质上是JavaScript的静态类型增强方案&#xff0c;提供三个核心价值&#xff1a; 开发阶段类型检查&#xff08;类似编译时eslint&#xff09;更清晰的API文档&#xff08;类型即文档&#xff09;更好的IDE自动补全支持 代码示例&…...

安裝do時出現log file support is not available

“log file support is not available (press RETURN)” 这个提示信息表明日志文件支持不可用&#xff0c;让你按回车键继续。出现这种情况可能是因为 Odoo 的日志相关配置存在问题或者一些必要的依赖没有正确安装配置。以下是一些可以尝试的解决办法&#xff1a; 1. 检查 Odo…...

[HTTP协议]应用层协议HTTP从入门到深刻理解并落地部署自己的云服务(1)知识基础

[HTTP协议]应用层协议HTTP从入门到深刻理解并落地部署自己的云服务(1)知识基础 水墨不写bug 文章目录 (一)概念梳理1.什么是协议?2.什么是应用层?3. 为什么要进行分层&#xff1f; &#xff08;二&#xff09;HTTP协议2.1 初识HTTP协议2.2HTTP协议的URL2.2.1域名2.2.2端口号2…...

机票改签请求

示例代码&#xff1a; tool def update_ticket_to_new_flight(ticket_no: str, new_flight_id: int) -> str:"""Update the users ticket to a new valid flight.Args:ticket_no (str): The ticket number to be updated.new_flight_id (int): The ID of th…...

linux下文件读写操作

Linux下&#xff0c;文件I/O是操作系统与文件系统之间进行数据传输的关键部分。文件I/O操作允许程序读取和写入文件&#xff0c;管理文件的打开、关闭、创建和删除等操作。 1. 文件描述符 在Linux中&#xff0c;每个打开的文件都由一个文件描述符来表示。文件描述符是一个非负…...

命名管道的创建和通信实现

目录 命名管道的创建 使用函数创建命名管道的通信 预备创建 makefile设计 server.hpp设计 clent.hpp设计 comm.hpp设计 server.cc设计 clent.cc设计 测试运行 今天我们来学习命名管道 由于匿名管道&#xff08;pipe()&#xff09;无法在两个毫不相干的进程之间进行通…...

C++和OpenGL实现3D游戏编程【连载24】——父物体和子物体之间的坐标转换

欢迎来到zhooyu的C++和OpenGL游戏专栏,专栏连载的所有精彩内容目录详见下边链接: 🔥C++和OpenGL实现3D游戏编程【总览】 父子物体的坐标转换 1、本节要实现的内容 前面章节我们了解了父物体与子物体的结构,它不仅能够表示物体之间的层次关系,更重要的一个作用就是展示物…...

21.HarmonyOS Next CustomSlider组件步长控制教程(三)

温馨提示&#xff1a;本篇博客的详细代码已发布到 git : https://gitcode.com/nutpi/HarmonyosNext 可以下载运行哦&#xff01; 文章目录 1. 步长控制概述2. 步长基本概念2.1 什么是步长&#xff1f;2.2 步长的作用 3. 设置步长3.1 基本参数3.2 代码示例 4. 步长与范围的关系4…...

小白学习:rag向量数据库

学习视频&#xff1a; https://www.bilibili.com/video/BV11zf6YyEnT/?spm_id_from333.337.search-card.all.click 例子&#xff1a; 用户提出问题 客服机器人基于rag回答用户问题 过程拆解&#xff1a; 客户问题 – 转化为向量表示 – 在向量数据库中进行相似性搜索 – 系…...

STM32 CAN模块原理与应用详解

目录 概述 一、CAN模块核心原理 1. CAN协议基础 2. STM32 CAN控制器结构 3. 波特率配置 二、CAN模块配置步骤&#xff08;基于HAL库&#xff09; 1. 初始化CAN外设 2. 配置过滤器 3. 启动CAN通信 三、数据收发实现 1. 发送数据帧 2. 接收数据帧&#xff08;中断方式…...

NO.29十六届蓝桥杯备战|string九道练习|reverse|翻转|回文(C++)

P5015 [NOIP 2018 普及组] 标题统计 - 洛谷 #include <bits/stdc.h> using namespace std;int main() {ios::sync_with_stdio(false);cin.tie(nullptr);string s;getline(cin, s);int sz s.size();int cnt 0;for (int i 0; i < sz; i){if (isspace(s[i]))continue…...

最新版本TOMCAT+IntelliJ IDEA+MAVEN项目创建(JAVAWEB)

前期所需&#xff1a; 1.apache-tomcat-10.1.18-windows-x64&#xff08;tomcat 10.1.8版本或者差不多新的版本都可以&#xff09; 2.IntelliJ idea 24年版本 或更高版本 3.已经配置好MAVEN了&#xff08;一定先配置MAVEN再搞TOMCAT会事半功倍很多&#xff09; 如果有没配置…...

MAC-禁止百度网盘自动升级更新

通过终端禁用更新服务(推荐)​ 此方法直接移除百度网盘的自动更新组件,无需修改系统文件。 ​步骤: ​1.关闭百度网盘后台进程 按下 Command + Space → 输入「活动监视器」→ 搜索 BaiduNetdisk 或 UpdateAgent → 结束相关进程。 ​2.删除自动更新配置文件 打开终端…...

Unity DOTS从入门到精通之EntityCommandBufferSystem

文章目录 前言安装 DOTS 包ECBECB可以执行的指令示例&#xff1a; 前言 DOTS&#xff08;面向数据的技术堆栈&#xff09;是一套由 Unity 提供支持的技术&#xff0c;用于提供高性能游戏开发解决方案&#xff0c;特别适合需要处理大量数据的游戏&#xff0c;例如大型开放世界游…...

【AIGC系列】6:HunyuanVideo视频生成模型部署和代码分析

AIGC系列博文&#xff1a; 【AIGC系列】1&#xff1a;自编码器&#xff08;AutoEncoder, AE&#xff09; 【AIGC系列】2&#xff1a;DALLE 2模型介绍&#xff08;内含扩散模型介绍&#xff09; 【AIGC系列】3&#xff1a;Stable Diffusion模型原理介绍 【AIGC系列】4&#xff1…...

【Linux】使用问题汇总

#1 ssh连接的时候报Key exchange failed 原因&#xff1a;服务端版本高&#xff0c;抛弃了一些不安全的交换密钥算法&#xff0c;且客户端版本比较旧&#xff0c;不支持安全性较高的密钥交换算法。 解决方案&#xff1a; 如果是内网应用&#xff0c;安全要求不这么高&#xf…...

nnUNet V2修改网络——全配置替换MultiResBlock模块

更换前,要用nnUNet V2跑通所用数据集,证明nnUNet V2、数据集、运行环境等没有问题 阅读nnU-Net V2 的 U-Net结构,初步了解要修改的网络,知己知彼,修改起来才能游刃有余。 MultiRes Block 是 MultiResUNet 中核心组件之一,旨在解决传统 U-Net 在处理多尺度医学图像时的局…...

Git合并工具在开发中的使用指南

在团队协作开发中&#xff0c;Git 是最常用的版本控制工具&#xff0c;而代码合并&#xff08;Merge&#xff09;是多人协作不可避免的环节。当多个开发者同时修改同一文件的相同区域时&#xff0c;Git 无法自动完成合并&#xff0c;此时需要借助合并工具&#xff08;Merge Too…...

AutoDL平台租借GPU,创建transformers环境,使用VSCode SSH登录

AutoDL平台租借GPU&#xff0c;创建transformers环境&#xff0c;使用VSCode SSH登录 一、AutoDl平台租用GPU 1.注册并登录AutoDl官网&#xff1a;https://www.autodl.com/home 2.选择算力市场&#xff0c;找到需要的GPU&#xff1a; 我这里选择3090显卡 3.这里我们就选择P…...

listen EACCES: permission denied 0.0.0.0:811

具体错误 npm run serve> bige-v0.0.0 serve > viteThe CJS build of Vites Node API is deprecated. See https://vitejs.dev/guide/troubleshooting.html#vite-cjs-node-api-deprecated for more details. error when starting dev server: Error: listen EACCES: per…...

OpenAI API模型ChatGPT各模型功能对比,o1、o1Pro、GPT-4o、GPT-4.5调用次数限制附ChatGPT订阅教程

本文包含OpenAI API模型对比页面以及ChatGPT各模型功能对比表 - 截至2025最新整理数据&#xff1a;包含模型分类及描述&#xff1b;调用次数限制&#xff1b; 包含模型的类型有&#xff1a; Chat 模型&#xff08;如 GPT-4o、GPT-4.5、GPT-4&#xff09;专注于对话&#xff0c…...

六十天前端强化训练之第十五天React组件基础案例:创建函数式组件展示用户信息(第15-21天:前端框架(React))

欢迎来到编程星辰海的博客讲解 我们已经学了14天了&#xff0c;再坚持坚持&#xff0c;马上我们就可以变得更优秀了&#xff0c;加油&#xff0c;我相信大家&#xff0c;接下来的几天&#xff0c;我会给大家更新前端框架&#xff08;React&#xff09;&#xff0c;看完可以给一…...

北大一二三四版全套DeepSeek教学资料

DeepSeek学习资料合集&#xff1a;https://pan.quark.cn/s/bb6ebf0e9b4d DeepSeek实操变现指南&#xff1a;https://pan.quark.cn/s/76328991eaa2 你是否渴望深入探索人工智能的前沿领域&#xff1f;是否在寻找一份能引领你从理论到实践&#xff0c;全面掌握AI核心技术的学习…...

计算机网络:计算机网络的组成和功能

计算机网络的组成&#xff1a; 计算机网络的工作方式&#xff1a; 计算机网络的逻辑功能; 总结&#xff1a; 计算机网络的功能&#xff1a; 1.数据通信 2.资源共享 3.分布式处理:计算机网络的分布式处理是指将计算任务分散到网络中的多个节点&#xff08;计算机或设备&…...

管理网络安全

防火墙在 Linux 系统安全中有哪些重要的作用&#xff1f; 防火墙作为网络安全的第一道防线&#xff0c;能够根据预设的规则&#xff0c;对进出系统的网络流量进行严格筛选。它可以阻止未经授权的外部访问&#xff0c;只允许符合规则的流量进入系统&#xff0c;从而保护系统免受…...

音频进阶学习十九——逆系统(简单进行回声消除)

文章目录 前言一、可逆系统1.定义2.解卷积3.逆系统恢复原始信号过程4.逆系统与原系统的零极点关系 二、使用逆系统去除回声获取原信号的频谱原系统和逆系统幅频响应和相频响应使用逆系统恢复原始信号整体代码如下 总结 前言 在上一篇音频进阶学习十八——幅频响应相同系统、全…...

Redis7系列:设置开机自启

前面的文章讲了Redis和Redis Stack的安装&#xff0c;随着服务器的重启&#xff0c;导致Redis 客户端无法连接。原来的是Redis没有配置开机自启。此文记录一下如何配置开机自启。 1、修改配置文件 前面的Redis和Redis Stack的安装的文章中已经讲了redis.config的配置&#xf…...

word甲烷一键下标

Sub 甲烷下标()甲烷下标 宏Selection.Find.ClearFormattingSelection.Find.Replacement.ClearFormattingWith Selection.Find.Text "CH4".Replacement.Text "CHguoshao4".Forward True.Wrap wdFindContinue.Format False.MatchCase False.MatchWhole…...

SSH 连接中主机密钥验证失败问题的解决方法

问题描述 在尝试通过 SSH 建立连接时&#xff0c;出现以下错误信息&#xff1a; WARNING: REMOTE HOST IDENTIFICATION HAS CHANGED! IT IS POSSIBLE THAT SOMEONE IS DOING SOMETHING NASTY! Someone could be eavesdropping on you right now (man-in-the-middle attack…...

网络安全工具nc(NetCat)

NetCat是一个非常简单的Unix工具&#xff0c;可以读、写TCP或UDP网络连接(network connection)。它被设计成一个可靠的后端(back-end)工具&#xff0c;能被其它的程序程序或脚本直接地或容易地驱动。同时&#xff0c;它又是一个功能丰富的 网络调试和开发工具&#xff0c;因为它…...

探索在生成扩散模型中基于RAG增强生成的实现与未来

概述 像 Stable Diffusion、Flux 这样的生成扩散模型&#xff0c;以及 Hunyuan 等视频模型&#xff0c;都依赖于在单一、资源密集型的训练过程中通过固定数据集获取的知识。任何在训练之后引入的概念——被称为 知识截止——除非通过 微调 或外部适应技术&#xff08;如 低秩适…...

【Linux】37.网络版本计算器

文章目录 1. Log.hpp-日志记录器2. Daemon.hpp-守护进程工具3. Protocol.hpp-通信协议解析器4. ServerCal.hpp-计算器服务处理器5. Socket.hpp-Socket通信封装类6. TcpServer.hpp-TCP服务器框架7. ClientCal.cc-计算器客户端8. ServerCal.cc-计算器服务器9. 代码时序1. 服务器启…...

3.6c语言

#define _CRT_SECURE_NO_WARNINGS #include <math.h> #include <stdio.h> int main() {int sum 0,i,j;for (j 1; j < 1000; j){sum 0;for (i 1; i < j; i){if (j % i 0){sum i;} }if (sum j){printf("%d是完数\n", j);}}return 0; }#de…...

【 IEEE出版 | 快速稳定EI检索 | 往届已EI检索】2025年储能及能源转换国际学术会议(ESEC 2025)

重要信息 主会官网&#xff1a;www.net-lc.net 【论文【】投稿】 会议时间&#xff1a;2025年5月9-11日 会议地点&#xff1a;中国-杭州 截稿时间&#xff1a;见官网 提交检索&#xff1a;IEEE Xplore, EI Compendex, Scopus 主会NET-LC 2025已进入IEEE 会议官方列表!&am…...

JVM常用概念之本地内存跟踪

问题 Java应用启动或者运行过程中报“内存不足&#xff01;”&#xff0c;我们该怎么办? 基础知识 对于一个在本地机器运行的JVM应用而言&#xff0c;需要足够的内存来存储机器代码、堆元数据、类元数据、内存分析等数据结构&#xff0c;来保证JVM应用的成功启动以及未来平…...

JVM 的主要组成部分及其作用?

创作内容丰富的干货文章很费心力&#xff0c;感谢点过此文章的读者&#xff0c;点一个关注鼓励一下作者&#xff0c;激励他分享更多的精彩好文&#xff0c;谢谢大家&#xff01; JVM包含两个子系统和两个组件&#xff0c;两个子系统为Class loader(类装载)、Execution engine(执…...

从能耗监测到碳资产管理:智慧校园能源管理平台的迭代升级与实践启示

一、核心价值提炼 随着我国能源结构转型的持续优化和“双碳”目标的明确&#xff0c;构建现代化能源体系已成为国家发展的重要战略。在这一背景下&#xff0c;校园作为能源消耗的重要领域&#xff0c;其能源管理的智能化、绿色化转型显得尤为重要。本文将深入探讨校园智慧能源…...

数据库核心-redo、undo

一、redo日志 InnoDB操作以页为单位操作数据。并且首先操作内存中缓冲池的数据&#xff0c;然后刷新到disk中&#xff0c;但如果事务提交后宕机、未能刷新到disk中&#xff0c;就会造成不一致情况。 重做日志&#xff1a; 系统重启时按照修改步骤重新更新数据页 redo日志占用…...

Ubuntu 下 nginx-1.24.0 源码分析 - ngx_core_module

定义在 src\core\nginx.c ngx_module_t ngx_core_module {NGX_MODULE_V1,&ngx_core_module_ctx, /* module context */ngx_core_commands, /* module directives */NGX_CORE_MODULE, /* module type */NULL…...

SQLAlchemy系列教程:如何执行原生SQL

Python中的数据库交互提供了高级API。但是&#xff0c;有时您可能需要执行原始SQL以提高效率或利用数据库特定的特性。本指南介绍在SQLAlchemy框架内执行原始SQL。 在SQLAlchemy中执行原生SQL SQLAlchemy虽然以其对象-关系映射&#xff08;ORM&#xff09;功能而闻名&#xff…...

怎么实现: 大语言模型微调案例

怎么实现: 大语言模型微调案例 目录 怎么实现: 大语言模型微调案例输入一个反常识的问题:首都在北京天安门之后对输出模型进行测试:首都在北京天安门微调代码:测试微调模型代码:微调输出模型结构输出模型参数大小对比Qwen 2.5_0.5:53MB输出模型:951MB 是一样的,没有进行…...

【Linux内核系列】:深入理解缓冲区

&#x1f525; 本文专栏&#xff1a;Linux &#x1f338;作者主页&#xff1a;努力努力再努力wz ★★★ 本文前置知识&#xff1a; 文件系统以及相关系统调用接口 输入以及输出重定向 那么在此前的学习中&#xff0c;我们了解了文件的概念以及相关的系统调用接口&#xff0c;并…...

【Qt】成员函数指针

一、成员函数指针的本质 与普通函数指针的区别&#xff1a; // 普通函数指针 void (*funcPtr)() &普通函数;// 成员函数指针 void (MyClass::*memberFuncPtr)() &MyClass::成员函数;• 绑定对象&#xff1a;成员函数指针必须与类的实例对象结合使用 • 隐含 this 指…...

关于 Can Utils 的详细介绍、使用方法及配置指南

Can Utils&#xff1a;开源CAN总线工具集合 一、Can Utils简介 Can Utils 是一组开源的CAN总线工具链&#xff0c;专为嵌入式开发者和网络诊断工程师设计&#xff0c;支持Linux、Windows和macOS系统。它包含一系列轻量级命令行工具&#xff08;如 cantoolz、candump、canbus …...

【Academy】OAuth 2.0 身份验证漏洞 ------ OAuth 2.0 authentication vulnerabilities

OAuth 2.0 身份验证漏洞 ------ OAuth 2.0 authentication vulnerabilities 1. 什么是 OAuth&#xff1f;2. OAuth 2.0 是如何工作的&#xff1f;3. OAuth 授权类型3.1 OAuth 范围3.2 授权代码授权类型3.3 隐式授权类型 4. OAuth 身份验证4.1 识别 OAuth 身份验证4.2 侦察OAuth…...