RabbitMQ复习笔记
文章目录
- `MQ` 概述
- 同步调用
- 拓展性差的问题
- 性能下降的问题
- 级联失败问题
- 异步调用
- 举例
- 技术选型
- `RabbitMQ`
- `RabbitMQ` 安装
- `RabbitMQ` 收发消息
- 交换机
- 队列
- 绑定关系
- 模拟发送消息
- `RabbitMQ` 数据隔离
- 用户管理
- `virtual host` 授权
- `SpringAMOP`
- `SpringAMOP` 快速入门
- 消息发送
- 消息接收
- `Work Queues` 模型
- `WorkQueues` 模型概述
- 消息发送
- 消息接收
- 实现能者多劳
- 总结
- `Exchange` 交换机概述
- `Fanout` 交换机
- `Fanout` 概述
- 声明队列和交换机
- 消息发送
- 消息接收
- 总结
- `Direct` 交换机
- `Direct` 概述
- 声明队列和交换机
- 消息接收
- 消息发送
- 总结
- `Topic` 交互机
- `Topic` 概述
- 声明队列和交换机
- 消息发送
- 消息接收
- 总结
- 代码声明队列和交换机
- 基本 `API` (基于 `Bean`)
- `fanout` 示例(基于 `Bean`)
- `direct` 示例(基于 `Bean`)
- 基于注解声明自动创建
- 消息转换器
- 消息转换器概述
- 测试默认转换器
- 配置 `JSON` 转换器
- 发送者的可靠性
- 消息丢失的可能性
- 生产者重连机制
- 生产者确认机制
- `MQ` 的可靠性
- `MQ` 可靠性概述
- 数据持久化
- 交换机持久化
- 队列持久化
- 消息持久化
- `LazyQueue`
- 控制台配置 `Lazy` 模式
- 代码配置 `Lazy` 模式
- 基于注解声明队列并设置为 `Lazy` 模式
- 更新已有队列为 `Lazy` 模式
- 消费者的可靠性
- 消费者的可靠性概述
- 消费者确认机制
- 失败重试机制
- 失败处理策略
- 业务幂等性
- 幂等性概述
- 唯一消息 `ID` 解决幂等性
- 兜底:延迟消息
- 延迟消息概述
- 死信交换机实现延迟消息
- `DelayExchange` 插件
- 安装插件
- 声明死信交换机
- 发送延迟消息
MQ
概述
同步调用
目前我们采用的是基于
OpenFeign
的同步调用,也就是说业务执行流程是这样的:
- 支付服务需要先调用用户服务完成余额扣减
- 然后支付服务自己要更新支付流水单的状态
- 然后支付服务调用交易服务,更新业务订单状态为已支付
三个步骤依次执行。
这其中就存在
3
个问题
拓展性差的问题
我们目前的业务相对简单,但是随着业务规模扩大,产品的功能也在不断完善。
在大多数电商业务中,用户支付成功后都会以短信或者其它方式通知用户,告知支付成功。假如后期产品经理提出这样新的需求,你怎么办?是不是要在上述业务中再加入通知用户的业务?
某些电商项目中,还会有积分或金币的概念。假如产品经理提出需求,用户支付成功后,给用户以积分奖励或者返还金币,你怎么办?是不是要在上述业务中再加入积分业务、返还金币业务?
…
最终你的支付业务会越来越臃肿:
也就是说每次有新的需求,现有支付逻辑都要跟着变化,代码经常变动,不符合开闭原则,拓展性不好。
性能下降的问题
由于我们采用了同步调用,调用者需要等待服务提供者执行完返回结果后,才能继续向下执行,也就是说每次远程调用,调用者都是阻塞等待状态。最终整个业务的响应时长就是每次远程调用的执行时长之和:假如每个微服务的执行时长都是50ms,则最终整个业务的耗时可能高达300ms,性能太差了。
级联失败问题
由于我们是基于
OpenFeign
调用交易服务、通知服务。当交易服务、通知服务出现故障时,整个事务都会回滚,交易失败。这其实就是同步调用的级联失败问题。
但是大家思考一下,我们假设用户余额充足,扣款已经成功,此时我们应该确保支付流水单更新为已支付,确保交易成功。毕竟收到手里的钱没道理再退回去吧。因此,这里不能因为短信通知、更新订单状态失败而回滚整个事务。
综上,同步调用的方式存在下列问题:
- 拓展性差
- 性能下降
- 级联失败
而要解决这些问题,我们就必须用异步调用的方式来代替同步调用。
异步调用
异步调用方式其实就是基于消息通知的方式,一般包含三个角色:
- 消息发送者:投递消息的人,就是原来的调用方
- 消息Broker:管理、暂存、转发消息,你可以把它理解成微信服务器
- 消息接收者:接收和处理消息的人,就是原来的服务提供方
在异步调用中,发送者不再直接同步调用接收者的业务接口,而是发送一条消息投递给消息
Broker
。然后接收者根据自己的需求从消息Broker
那里订阅消息。每当发送方发送消息后,接受者都能获取消息并处理。这样,发送消息的人和接收消息的人就完全解耦了。
举例
除了扣减余额、更新支付流水单状态以外,其它调用逻辑全部取消。而是改为发送一条消息到
Broker
。而相关的微服务都可以订阅消息通知,一旦消息到达Broker
,则会分发给每一个订阅了的微服务,处理各自的业务。假如产品经理提出了新的需求,比如要在支付成功后更新用户积分。支付代码完全不用变更,而仅仅是让积分服务也订阅消息即可:
不管后期增加了多少消息订阅者,作为支付服务来讲,执行问扣减余额、更新支付流水状态后,发送消息即可。业务耗时仅仅是这三部分业务耗时,仅仅100ms,大大提高了业务性能。
另外,不管是交易服务、通知服务,还是积分服务,他们的业务与支付关联度低。现在采用了异步调用,解除了耦合,他们即便执行过程中出现了故障,也不会影响到支付服务。
综上,异步调用的优势包括:
- 耦合度更低
- 性能更好
- 业务拓展性强
- 故障隔离,避免级联失败
当然,异步通信也并非完美无缺,它存在下列缺点:
- 完全依赖于
Broker
的可靠性、安全性和性能- 架构复杂,后期维护和调试麻烦
技术选型
消息Broker,目前常见的实现方案就是消息队列(
MessageQueue
),简称为MQ.目比较常见的MQ实现:
ActiveMQ
RabbitMQ
RocketMQ
Kafka
几种常见MQ
的对比:
追求可用性:Kafka、 RocketMQ 、RabbitMQ
追求可靠性:RabbitMQ、RocketMQ
追求吞吐能力:RocketMQ、Kafka
追求消息低延迟:RabbitMQ、Kafka
据统计,目前国内消息队列使用最多的还是RabbitMQ,再加上其各方面都比较均衡,稳定性也好,
RabbitMQ
RabbitMQ
安装
- 在
Docker
环境run
起来- 记得防火墙开放
15672
和5672
两个端口
- 记得防火墙开放
docker run \-e RABBITMQ_DEFAULT_USER=itheima \-e RABBITMQ_DEFAULT_PASS=123321 \-v mq-plugins:/plugins \--name mq \--hostname mq \-p 15672:15672 \-p 5672:5672 \--network heima\-d \rabbitmq:3.8-management
- 然后访问
http://192.168.88.130:15672
即可访问
概念图示
publisher
:生产者,也就是发送消息的一方consumer
:消费者,也就是消费消息的一方queue
:队列,存储消息。生产者投递的消息会暂存在消息队列中,等待消费者处理exchange
:交换机,负责消息路由。生产者发送的消息由交换机决定投递到哪个队列。virtual host
:虚拟主机,起到数据隔离的作用。每个虚拟主机相互独立,有各自的exchange、queue上述这些东西都可以在
RabbitMQ
的管理控制台来管理
RabbitMQ
收发消息
交换机
- 打开
Exchanges
选项卡,可以看到已经存在很多交换机
- 我们点击任意交换机,即可进入交换机详情页面。仍然会利用控制台中的
publish message
发送一条消息- 这里是由控制台模拟了生产者发送的消息。由于没有消费者存在,最终消息丢失了,这样说明交换机没有存储消息的能力,所以必须和队列绑定
队列
- 打开
Queues
选项卡,新建一个队列
- 命名为
hello.queue1
- 再以相同的方式,创建一个队列,命名为
hello.queue2
,最终队列列表如下
此时,我们再次向
amq.fanout
交换机发送一条消息。会发现消息依然没有到达队列,因为发送到交换机的消息,只会路由到与其绑定的队列,因此仅仅创建队列是不够的,我们还需要将其与交换机绑定。
绑定关系
- 点击
Exchanges
选项卡,点击amq.fanout
交换机,进入交换机详情页,然后点击Bindings
菜单,在表单中填写要绑定的队列名称
- 相同的方式,将
hello.queue2
也绑定到改交换机,最终,绑定结果如下
模拟发送消息
- 再次回到
exchange
页面,找到刚刚绑定的amq.fanout
,点击进入详情页,再次发送一条消息
- 回到
Queues
页面,可以发现hello.queue
中已经有一条消息了
- 点击队列名称,进入详情页,查看队列详情,这次我们点击
get message
- 可以看到消息到达队列了
这个时候如果有消费者监听了
MQ
的hello.queue1
或hello.queue2
队列,自然就能接收到消息了。
RabbitMQ
数据隔离
用户管理
- 点击
Admin
选项卡,首先会看到RabbitMQ
控制台的用户管理界面
这里的用户都是
RabbitMQ
的管理或运维人员。目前只有安装RabbitMQ
时添加的itheima
这个用户。仔细观察用户表格中的字段,如下:
Name
:itheima
,也就是用户名Tags
:administrator
,说明itheima
用户是超级管理员,拥有所有权限Can access virtual host
:/
,可以访问的virtual host
,这里的/
是默认的virtual host
对于小型企业而言,出于成本考虑,我们通常只会搭建一套
MQ
集群,公司内的多个不同项目同时使用。这个时候为了避免互相干扰, 我们会利用virtual host
的隔离特性,将不同项目隔离。一般会做两件事情:
- 给每个项目创建独立的运维账号,将管理权限分离。
- 给每个项目创建不同的
virtual host
,将每个项目的数据隔离。
- 比如,我们给黑马商城创建一个新的用户,命名为
hmall
- 你会发现此时
hmall
用户没有任何virtual host
的访问权限
virtual host
授权
- 先退出登录
- 切换到刚刚创建的
hmall
用户登录,然后点击Virtual Hosts
菜单,进入virtual host
管理页
- 可以看到目前只有一个默认的
virtual host
,名字为/
。我们可以给黑马商城项目创建一个单独的virtual host
,而不是使用默认的/
。
- 创建完成后如图
- 由于我们是登录
hmall
账户后创建的virtual host
,因此回到users
菜单,你会发现当前用户已经具备了对/hmall
这个virtual host
的访问权限了
- 此时,点击页面右上角的
virtual host
下拉菜单,切换virtual host
为/hmall
- 然后再次查看
queues
选项卡,会发现之前的队列已经看不到了- 这就是基于
virtual host
的隔离效果
- 这就是基于
SpringAMOP
SpringAMOP
快速入门
消息发送
- 导入依赖
<!--AMQP依赖,包含RabbitMQ--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
- 首先配置
MQ
地址,在publisher
服务的application.yml
中添加配置
spring:rabbitmq:host: 192.168.88.130 # 你的虚拟机IPport: 5672 # 端口virtual-host: /hmall # 虚拟主机username: hmall # 用户名password: 123 # 密码
- 然后然后 在
publisher
服务中编写测试类SpringAmqpTest
,并利用RabbitTemplate
实现消息发送
@SpringBootTest
public class SpringAmqpTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testSimpleQueue() {// 队列名称String queueName = "simple.queue";// 消息String message = "hello, spring amqp!";// 发送消息rabbitTemplate.convertAndSend(queueName, message);}
}
- 打开控制台,可以看到消息已经发送到队列中
消息接收
- 首先配置
MQ
地址,在consumer
服务的application.yml
中添加配置
spring:rabbitmq:host: 192.168.88.131 # 你的虚拟机IPport: 5672 # 端口virtual-host: /hmall # 虚拟主机username: hmall # 用户名password: 123 # 密码
- 然后在
consumer
服务的com.itheima.consumer.listener
包中新建一个类SpringRabbitListener
@Component
public class SpringRabbitListener {// 利用RabbitListener来声明要监听的队列信息// 将来一旦监听的队列中有了消息,就会推送给当前服务,调用当前方法,处理消息。// 可以看到方法体中接收的就是消息体的内容@RabbitListener(queues = "simple.queue")public void listenSimpleQueueMessage(String msg) throws InterruptedException {System.out.println("spring 消费者接收到消息:【" + msg + "】");}
}
- 测试:启动
consumer
服务,然后在publisher
服务中运行测试代码,发送MQ
消息。最终consumer
收到消息
Work Queues
模型
WorkQueues
模型概述
Work queues
,任务模型。简单来说就是让多个消费者绑定到一个队列,共同消费队列中的消息。**当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。
此时就可以使用work 模型,多个消费者共同处理消息处理,消息处理的速度就能大大提高了。
消息发送
publisher
发送者模拟大量消息堆积
/*** workQueue* 向队列中不停发送消息,模拟消息堆积。*/
@Test
public void testWorkQueue() throws InterruptedException {// 队列名称String queueName = "simple.queue";// 消息String message = "hello, message_";for (int i = 0; i < 50; i++) {// 发送消息,每20毫秒发送一次,相当于每秒发送50条消息rabbitTemplate.convertAndSend(queueName, message + i);Thread.sleep(20);}
}
消息接收
- 在
consumer
消费者中添加两个队列- 注意到这两消费者,都设置了
Thead.sleep
,模拟任务耗时:- 消费者1
sleep
了20
毫秒,相当于每秒钟处理50
个消息 - 消费者2
sleep
了200
毫秒,相当于每秒处理5
个消息
- 消费者1
- 注意到这两消费者,都设置了
@RabbitListener(queues = "work.queue")
public void listenWorkQueue1(String msg) throws InterruptedException {System.out.println("消费者1接收到消息:【" + msg + "】" + LocalTime.now());Thread.sleep(20);
}@RabbitListener(queues = "work.queue")
public void listenWorkQueue2(String msg) throws InterruptedException {System.err.println("消费者2........接收到消息:【" + msg + "】" + LocalTime.now());Thread.sleep(200);
}
- 启动测试程序
会发现虽然设置了
sleep
但是并没有实现能者多劳可以看到消费者1和消费者2竟然每人消费了25条消息:
- 消费者
1
很快完成了自己的25
条消息- 消费者
2
却在缓慢的处理自己的25
条消息。也就是说消息是平均分配给每个消费者,并没有考虑到消费者的处理能力。导致
1
个消费者空闲,另一个消费者忙的不可开交。没有充分利用每一个消费者的能力,最终消息处理的耗时远远超过了1
秒。这样显然是有问题的。
实现能者多劳
在spring中有一个简单的配置,可以解决这个问题。我们修改
consumer
服务的application.yml
文件,添加perfetch
配置可以发现,由于消费者
1
处理速度较快,所以处理了更多的消息;消费者2
处理速度较慢,只处理了6
条消息。而最终总的执行耗时也在1
秒左右,大大提升。正所谓能者多劳,这样充分利用了每一个消费者的处理能力,可以有效避免消息积压问题。
spring:rabbitmq:listener:simple:prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息
总结
Work Queues
模型的使用:
- 多个消费者绑定到一个队列,同一条消息只会被一个消费者处理
- 通过设置
prefetch
来控制消费者预取的消息数量
Exchange
交换机概述
在之前的两个测试案例中,都没有交换机,生产者直接发送消息到队列。而一旦引入交换机,消息发送的模式会有很大变化
可以看到,在订阅模型中,多了一个
exchange
角色,而且过程略有变化:
Publisher
:生产者,不再发送消息到队列中,而是发给交换机Exchange
:交换机,一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Queue
:消息队列也与以前一样,接收消息、缓存消息。不过队列一定要与交换机绑定。Consumer
:消费者,与以前一样,订阅队列,没有变化
Exchange
(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange
绑定,或者没有符合路由规则的队列,那么消息会丢失!交换机的类型有四种:
Fanout
:广播,将消息交给所有绑定到交换机的队列。我们最早在控制台使用的正是Fanout
交换机Direct
:订阅,基于RoutingKey
(路由key)发送给订阅了消息的队列Topic
:通配符订阅,与Direct
类似,只不过RoutingKey
可以使用通配符Headers
:头匹配,基于MQ
的消息头匹配,用的较少。
Fanout
交换机
Fanout
概述
Fanout
,英文翻译叫广播,会将接收到的消息传给所有绑定的Queue
在广播模式下
- 可以有多个队列
- 每个队列都要绑定到
Exchange
(交换机)- 生产者发送的消息,只能发送到交换机
- 交换机把消息发送给绑定过的所有队列
- 订阅队列的消费者都能拿到消息
声明队列和交换机
- 在控制台创建队列
fanout.queue1
和fanout.queue2
- 然后创建交换机
- 绑定两个队列到交换机
消息发送
- 通过
publish
生产者把消息发给交换机
@Test
public void testFanoutExchange() {// 交换机名称String exchangeName = "hmall.fanout";// 消息String message = "hello, everyone!";//发送消息 routing key 为 null ,因为用不到rabbitTemplate.convertAndSend(exchangeName, "null", message);
}
消息接收
- 通过消费者消费从交换机广播过来的消息
@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String msg) {System.out.println("消费者1接收到Fanout消息:【" + msg + "】");
}@RabbitListener(queues = "fanout.queue2")
public void listenFanoutQueue2(String msg) {System.out.println("消费者2接收到Fanout消息:【" + msg + "】");
}
总结
交换机的作用
- 接收
publisher
发送的消息- 将消息按照规则路由到与之绑定的队列
- 不能缓存消息,路由失败,消息丢失
FanoutExchange
的会将消息路由到每个绑定的队列
Direct
交换机
Direct
概述
在
Fanout
模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct
类型的Exchange
。在
Direct
模型下:
- 队列与交换机的绑定,不能是任意绑定了,而是要指定一个
RoutingKey
(路由key
)- 消息的发送方在 向
Exchange
发送消息时,也必须指定消息的RoutingKey
。Exchange
不再把消息交给每一个绑定的队列,而是根据消息的Routing Key
进行判断,只有队列的Routingkey
与消息的Routing key
完全一致,才会接收到消息
声明队列和交换机
- 声明两个队列
direct.queue1
和direct.queue2
- 声明一个
direct
类型的交换机,命名为hmall.direct
- **然后
red
和blue
作为key
绑定direct.queue1
到hmall.direct
**
- 同理,使用
red
和yellow
作为key
,绑定direct.queue2
到hmall.direct
消息接收
- 在
consumer
消费者中添加监听代码
@RabbitListener(queues = "direct.queue1")
public void listenDirectQueue1(String msg) {System.out.println("消费者1接收到direct.queue1的消息:【" + msg + "】");
}@RabbitListener(queues = "direct.queue2")
public void listenDirectQueue2(String msg) {System.out.println("消费者2接收到direct.queue2的消息:【" + msg + "】");
}
消息发送
- 在
publish
发送者中添加发送代码
@Test
public void testSendDirectExchange() {// 交换机名称String exchangeName = "hmall.direct";// 消息String message = "红色警报!日本乱排核废水,导致海洋生物变异,惊现哥斯拉!";// 发送消息rabbitTemplate.convertAndSend(exchangeName, "red", message);
}
- 由于使用的
red
这个key
,所以两个消费者都收到了消息
- 切换
blue
这个key
@Test
public void testSendDirectExchange() {// 交换机名称String exchangeName = "hmall.direct";// 消息String message = "最新报道,哥斯拉是居民自治巨型气球,虚惊一场!";// 发送消息rabbitTemplate.convertAndSend(exchangeName, "blue", message);
}
- 会发现,只有消费者
1
收到了消息,因为只有1
绑定了blue
总结
描述下Direct
交换机与Fanout
交换机的差异?
Fanout
交换机将消息路由给每一个与之绑定的队列Direct
交换机根据RoutingKey
判断路由给哪个队列- 如果多个队列具有相同的
RoutingKey
,则与Fanout
功能类似
Topic
交互机
Topic
概述
Topic
类型的Exchange
与Direct
相比,都是可以根据RoutingKey
把消息路由到不同的队列。只不过
Topic
类型Exchange
可以让队列在绑定BindingKey
的时候使用通配符!
BindingKey
一般都是有一个或多个单词组成,多个单词之间以.
分割,例如:item.insert
通配符规则:
#
:匹配一个或多个词*
:匹配不多不少恰好1个词举例:
item.#
:能够匹配item.spu.insert
或者item.spu
item.*
:只能匹配item.spu
声明队列和交换机
- 交换机
- 队列
消息发送
- 在
publisher
发送者中添加发送代码
/*** topicExchange*/
@Test
public void testSendTopicExchange() {// 交换机名称String exchangeName = "hmall.topic";// 消息String message = "喜报!孙悟空大战哥斯拉,胜!";// 发送消息rabbitTemplate.convertAndSend(exchangeName, "china.news", message);
}
消息接收
- 在
consumer
消费中添加监听代码
@RabbitListener(queues = "topic.queue1")
public void listenTopicQueue1(String msg){System.out.println("消费者1接收到topic.queue1的消息:【" + msg + "】");
}@RabbitListener(queues = "topic.queue2")
public void listenTopicQueue2(String msg){System.out.println("消费者2接收到topic.queue2的消息:【" + msg + "】");
}
总结
描述下 Direct
交换机与 Topic
交换机的差异?
Topic
交换机接收的消息RoutingKey
必须是多个单词,以.
分割Topic
交换机与队列绑定时的bindingKey
可以指定通配符#
:代表0
个或多个词*
:代表1
个词
代码声明队列和交换机
基本 API
(基于 Bean
)
在之前我们都是基于
RabbitMQ
控制台来创建队列、交换机。但是在实际开发时,队列和交换机是程序员定义的,将来项目上线,又要交给运维去创建。那么程序员就需要把程序中运行的所有队列和交换机都写下来,交给运维。在这个过程中是很容易出现错误的。因此推荐的做法是由程序启动时检查队列和交换机是否存在,如果不存在自动创建。
- 创建队列
SpringAMQP
提供了一个Queue
类,用来创建队列
- 创建交换机
SpringAMQP
还提供了一个Exchange
接口,来表示所有不同类型的交换机
- 创建绑定对象
而在绑定队列和交换机时,则需要使用
BindingBuilder
来创建Binding
对象
- 队列和交换机第二种创建方式
fanout
示例(基于 Bean
)
- 一般在消费者端构建
注意:是在配置类中
并且绑定关系是通过注入的方式绑定
@Configuration
public class FanoutConfiguration {@Beanpublic FanoutExchange fanoutExchange(){//第一种方式//return new FanoutExchange("hmall.fanout");//第二种方式return ExchangeBuilder.fanoutExchange("hmall.fanout").build();}@Beanpublic Queue fanoutQueue1(){//第一种创建队列的方式//QueueBuilder.durable("fanout.queue1").build();//第二种创建队列的方式return new Queue("fanout.queue1");}@Beanpublic Binding fanoutQueue1Binding(Queue fanoutQueue1, FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);}@Beanpublic Queue fanoutQueue2(){//第一种创建队列的方式//QueueBuilder.durable("fanout.queue1").build();//第二种创建队列的方式return new Queue("fanout.queue2");}@Beanpublic Binding fanoutQueue2Binding(Queue fanoutQueue2, FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);}
}
direct
示例(基于 Bean
)
direct
模式由于要绑定多个KEY
,会非常麻烦,每一个Key
都要编写一个binding
@Configuration
public class DirectConfig {/*** 声明交换机* @return Direct类型交换机*/@Beanpublic DirectExchange directExchange(){return ExchangeBuilder.directExchange("hmall.direct").build();}/*** 第1个队列*/@Beanpublic Queue directQueue1(){return new Queue("direct.queue1");}/*** 绑定队列和交换机*/@Beanpublic Binding bindingQueue1WithRed(Queue directQueue1, DirectExchange directExchange){return BindingBuilder.bind(directQueue1).to(directExchange).with("red");}/*** 绑定队列和交换机*/@Beanpublic Binding bindingQueue1WithBlue(Queue directQueue1, DirectExchange directExchange){return BindingBuilder.bind(directQueue1).to(directExchange).with("blue");}/*** 第2个队列*/@Beanpublic Queue directQueue2(){return new Queue("direct.queue2");}/*** 绑定队列和交换机*/@Beanpublic Binding bindingQueue2WithRed(Queue directQueue2, DirectExchange directExchange){return BindingBuilder.bind(directQueue2).to(directExchange).with("red");}/*** 绑定队列和交换机*/@Beanpublic Binding bindingQueue2WithYellow(Queue directQueue2, DirectExchange directExchange){return BindingBuilder.bind(directQueue2).to(directExchange).with("yellow");}
}
基于注解声明自动创建
基于
@Bean
的方式声明队列和交换机比较麻烦,Spring
还提供了基于注解方式来声明。
Direct
方式1
会自动创建队列和交换机还有绑定关系
@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue1"),exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),key = {"red", "blue"}
))
public void listenDirectQueue1(String msg){System.out.println("消费者1接收到direct.queue1的消息:【" + msg + "】");
}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue2"),exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),key = {"red", "yellow"}
))
public void listenDirectQueue2(String msg){System.out.println("消费者2接收到direct.queue2的消息:【" + msg + "】");
}
消息转换器
消息转换器概述
Spring
的消息发送代码接收的消息体是一个Object
而在数据传输时,它会把你发送的消息序列化为字节发送给MQ,接收消息的时候,还会把字节反序列化为
Java
对象。只不过,默认情况下
Spring
采用的序列化方式是JDK序列化。众所周知,JDK序列化存在下列问题:
- 数据体积过大
- 有安全漏洞
- 可读性差
测试默认转换器
consumer
服务中声明新配置类,利用@Bean
方式创建队列
@Configuration
public class MessageConfig {@Beanpublic Queue objectQueue() {return new Queue("object.queue");}
}
- 在
publisher
模块的SpringAmqpTest
中新增一个消息发送的代码,发送一个Map
对象
@Test
public void testSendMap() throws InterruptedException {// 准备消息Map<String,Object> msg = new HashMap<>();msg.put("name", "柳岩");msg.put("age", 21);// 发送消息rabbitTemplate.convertAndSend("object.queue", msg);
}
- 检查结果,看到格式很差
配置 JSON
转换器
JDK
序列化方式并不合适。我们希望消息体的体积更小、可读性更高,因此可以使用JSON
方式来做序列化和反序列化。
publisher
和consumer
两个服务中都引入依赖- 注意,如果项目中引入了
spring-boot-starter-web
依赖,则无需再次引入Jackson
依赖。
- 注意,如果项目中引入了
<dependency><groupId>com.fasterxml.jackson.dataformat</groupId><artifactId>jackson-dataformat-xml</artifactId><version>2.9.10</version>
</dependency>
- 在
publisher
和consumer
两个服务的配置类中添加消息转换器Bean
@Bean
public MessageConverter messageConverter(){// 1.定义消息转换器Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();// 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息jackson2JsonMessageConverter.setCreateMessageIds(true);return jackson2JsonMessageConverter;
}
- 检查结果
发送者的可靠性
消息丢失的可能性
消息从生产者到消费者的每一步都可能导致消息丢失:
发送消息时丢失:
- 生产者发送消息时连接
MQ
失败- 生产者发送消息到达
MQ
后未找到Exchange
- 生产者发送消息到达
MQ
的Exchange
后,未找到合适的Queue
- 消息到达
MQ
后,处理消息的进程发生异常
MQ
导致消息丢失:
- 消息到达
MQ
,保存到队列后,尚未消费就突然宕机消费者处理消息时:
消息接收后尚未处理突然宕机
消息接收后处理过程中抛出异常
综上,我们要解决消息丢失问题,保证
MQ
的可靠性,就必须从3个方面入手:
- 确保生产者一定把消息发送到
MQ
- 确保
MQ
不会将消息弄丢- 确保消费者一定要处理消息
生产者重连机制
首先第一种情况,就是生产者发送消息时,出现了网络故障,导致与MQ的连接中断。
为了解决这个问题,
SpringAMQP
提供的消息发送时的重试机制。即:当RabbitTemplate
与MQ
连接超时后,多次重试。修改
publisher
模块的application.yaml
文件,添加下面的内容注意:当网络不稳定的时候,利用重试机制可以有效提高消息发送的成功率。不过
SpringAMQP
提供的重试机制是阻塞式的重试,也就是说多次重试等待的过程中,当前线程是被阻塞的。如果对于业务性能有要求,建议禁用重试机制。如果一定要使用,请合理配置等待时长和重试次数,当然也可以考虑使用异步线程来执行发送消息的代码。
spring:rabbitmq:connection-timeout: 1s # 设置MQ的连接超时时间template:retry:enabled: true # 开启超时重试机制initial-interval: 1000ms # 失败后的初始等待时间multiplier: 1 # 失败后下次的等待时长倍数,下次等待时长 = initial-interval * multipliermax-attempts: 3 # 最大重试次数
生产者确认机制
一般情况下,只要生产者与
MQ
之间的网路连接顺畅,基本不会出现发送消息丢失的情况,因此大多数情况下我们无需考虑这种问题。不过,在少数情况下,也会出现消息发送到
MQ
之后丢失的现象,比如:
MQ
内部处理消息的进程发生了异常- 生产者发送消息到达
MQ
后未找到Exchange
- 生产者发送消息到达
MQ
的Exchange
后,未找到合适的Queue
,因此无法路由针对上述情况,
RabbitMQ
提供了生产者消息确认机制,包括Publisher Confirm
和Publisher Return
两种。在开启确认机制的情况下,当生产者发送消息给MQ
后,MQ
会根据消息处理的情况返回不同的回执。总结如下:
- 当消息投递到
MQ
,但是路由失败时,通过Publisher Return
返回异常信息,同时返回ack
的确认信息,代表投递成功- 临时消息投递到了
MQ
,并且入队成功,返回ACK
,告知投递成功- 持久消息投递到了
MQ
,并且入队完成持久化,返回ACK
,告知投递成功- 其它情况都会返回
NACK
,告知投递失败其中
ack
和nack
属于Publisher Confirm
机制,ack
是投递成功;nack
是投递失败。而return
则属于Publisher Return
机制。默认两种机制都是关闭状态,需要通过配置文件来开启。
- 第一步开启生产者确认
在
publisher
模块的application.yaml
中添加配置这里
publisher-confirm-type
有三种模式可选:
none
:关闭confirm
机制simple
:同步阻塞等待MQ的回执correlated
:MQ异步回调返回回执一般我们推荐使用
correlated
,回调机制。
spring:rabbitmq:publisher-confirm-type: correlated # 开启publisher confirm机制,并设置confirm类型publisher-returns: true # 开启publisher return机制
- 定义
ReturnCallback
每个
RabbitTemplate
只能配置一个ReturnCallback
,因此我们可以在配置类中统一设置。我们在publisher
模块定义一个配置类
@Slf4j
@AllArgsConstructor
@Configuration
public class MqConfig {private final RabbitTemplate rabbitTemplate;@PostConstructpublic void init(){rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {@Overridepublic void returnedMessage(ReturnedMessage returned) {log.error("触发return callback,");log.debug("exchange: {}", returned.getExchange());log.debug("routingKey: {}", returned.getRoutingKey());log.debug("message: {}", returned.getMessage());log.debug("replyCode: {}", returned.getReplyCode());log.debug("replyText: {}", returned.getReplyText());}});}
}
- 定义
ConfirmCallback
由于每个消息发送时的处理逻辑不一定相同,因此
ConfirmCallback
需要在每次发消息时定义。具体来说,是在调用RabbitTemplate
中的convertAndSend
方法时,多传递一个参数:
这里的
CorrelationData
中包含两个核心的东西:
id
:消息的唯一标示,MQ
对不同的消息的回执以此做判断,避免混淆SettableListenableFuture
:回执结果的Future
对象将来
MQ
的回执就会通过这个Future
来返回,我们可以提前给CorrelationData
中的Future
添加回调函数来处理消息回执:
@Test
void testPublisherConfirm() {// 1.创建CorrelationDataCorrelationData cd = new CorrelationData(UUID.randomUUID().toString());// 2.给Future添加ConfirmCallbackcd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {@Overridepublic void onFailure(Throwable ex) {// 2.1.Future发生异常时的处理逻辑,基本不会触发log.error("send message fail", ex);}@Overridepublic void onSuccess(CorrelationData.Confirm result) {// 2.2.Future接收到回执的处理逻辑,参数中的result就是回执内容if(result.isAck()){ // result.isAck(),boolean类型,true代表ack回执,false 代表 nack回执log.debug("发送消息成功,收到 ack!");}else{ // result.getReason(),String类型,返回nack时的异常描述log.error("发送消息失败,收到 nack, reason : {}", result.getReason());}}});// 3.发送消息rabbitTemplate.convertAndSend("hmall.direct", "q", "hello", cd);
}
注意事项
开启生产者确认比较消耗
MQ
性能,一般不建议开启。而且大家思考一下触发确认的几种情况:
- 路由失败:一般是因为
RoutingKey
错误导致,往往是编程导致- 交换机名称错误:同样是编程错误导致
MQ
内部故障:这种需要处理,但概率往往较低。因此只有对消息可靠性要求非常高的业务才需要开启,而且仅仅需要开启ConfirmCallback
处理nack
就可以了。
MQ
的可靠性
MQ
可靠性概述
保证可靠性必须从数据存储的方式上入手
在默认情况下,
RabbitMQ
会将接收到的信息保存在内存中以降低消息收发的延迟。但在某些特殊情况下,这会导致消息积压,比如:
- 消费者宕机或出现网络故障
- 消息发送量激增,超过了消费者处理速度
- 消费者处理业务发生阻塞
一旦出现消息堆积问题,
RabbitMQ
的内存占用就会越来越高,直到触发内存预警上限。此时RabbitMQ
会将内存消息刷到磁盘上,这个行为成为PageOut
.PageOut
会耗费一段时间,并且会阻塞队列进程。因此在这个过程中RabbitMQ
不会再处理新的消息,生产者的所有请求都会被阻塞。
数据持久化
默认情况下 SpringAMOP
交换机,队列,消息默认持久化
消息到达
MQ
以后,如果MQ
不能及时保存,也会导致消息丢失,所以MQ
的可靠性也非常重要。为了提升性能,默认情况下
MQ
的数据都是在内存存储的临时数据,重启后就会消失。为了保证数据的可靠性,必须配置数据持久化,包括:
- 交换机持久化
- 队列持久化
- 消息持久化
注意:持久化默认是在内存里面再根据消息要不要持久化放进硬盘
交换机持久化
在控制台的
Exchanges
页面,添加交换机时可以配置交换机的Durability
参数设置为
Durable
就是持久化模式,Transient
就是临时模式。
队列持久化
在控制台的
Queues
页面,添加队列时,同样可以配置队列的Durability
参数
消息持久化
在控制台发送消息的时候,可以添加很多参数,而消息的持久化是要配置一个
properties
@Testvoid testSendMessage() {// 1.自定义构建消息Message message = MessageBuilder.withBody("Hello, Spring AMQP!".getBytes(StandardCharsets.UTF_8))// 设置消息属性为非持久化.setDeliveryMode(MessageDeliveryMode.PERSISTENT).build();for (int i = 0; i < 1000000; i++) {rabbitTemplate.convertAndSend("simple.queues", message);}}
注意
说明:在开启持久化机制以后,如果同时还开启了生产者确认,那么
MQ
会在消息持久化以后才发送ACK
回执,进一步确保消息的可靠性。不过出于性能考虑,为了减少
IO
次数,发送到MQ
的消息并不是逐条持久化到数据库的,而是每隔一段时间批量持久化。一般间隔在100
毫秒左右,这就会导致ACK
有一定的延迟,因此建议生产者确认全部采用异步方式。
LazyQueue
在默认情况下,
RabbitMQ
会将接收到的信息保存在内存中以降低消息收发的延迟。但在某些特殊情况下,这会导致消息积压,比如:
- 消费者宕机或出现网络故障
- 消息发送量激增,超过了消费者处理速度
- 消费者处理业务发生阻塞
一旦出现消息堆积问题,
RabbitMQ
的内存占用就会越来越高,直到触发内存预警上限。此时RabbitMQ
会将内存消息刷到磁盘上,这个行为成为PageOut
.PageOut
会耗费一段时间,并且会阻塞队列进程。因此在这个过程中RabbitMQ
不会再处理新的消息,生产者的所有请求都会被阻塞。为了解决这个问题,从
RabbitMQ
的3.6.0
版本开始,就增加了Lazy Queues
的模式,也就是惰性队列。惰性队列的特征如下:
- 接收到消息后直接存入磁盘而非内存
- 消费者要消费消息时才会从磁盘中读取并加载到内存(也就是懒加载)
- 支持数百万条的消息存储
而在
3.12
版本之后,LazyQueue
已经成为所有队列的默认格式。因此官方推荐升级MQ
为3.12
版本或者所有队列都设置为LazyQueue
模式。
控制台配置 Lazy
模式
在添加队列的时候,添加
x-queue-mod=lazy
参数即可设置队列为Lazy
模式
代码配置 Lazy
模式
在利用
SpringAMQP
声明队列的时候,添加x-queue-mod=lazy
参数也可设置队列为Lazy
模式
@Bean
public Queue lazyQueue(){return QueueBuilder.durable("lazy.queue").lazy() // 开启Lazy模式.build();
}
基于注解声明队列并设置为 Lazy
模式
@RabbitListener(queuesToDeclare = @Queue(name = "lazy.queue",durable = "true",arguments = @Argument(name = "x-queue-mode", value = "lazy")
))
public void listenLazyQueue(String msg){log.info("接收到 lazy.queue的消息:{}", msg);
}
更新已有队列为 Lazy
模式
对于已经存在的队列,也可以配置为
lazy
模式,但是要通过设置policy
实现。可以基于命令行设置
policy
rabbitmqctl set_policy Lazy "^lazy-queue$" '{"queue-mode":"lazy"}' --apply-to queues
命令解读:
rabbitmqctl
:RabbitMQ的命令行工具set_policy
:添加一个策略Lazy
:策略名称,可以自定义"^lazy-queue$"
:用正则表达式匹配队列的名字'{"queue-mode":"lazy"}'
:设置队列模式为lazy模式--apply-to queues
:策略的作用对象,是所有的队列
消费者的可靠性
消费者的可靠性概述
当
RabbitMQ
向消费者投递消息以后,需要知道消费者的处理状态如何。因为消息投递给消费者并不代表就一定被正确消费了,可能出现的故障有很多,比如:
- 消息投递的过程中出现了网络故障
- 消费者接收到消息后突然宕机
- 消费者接收到消息后,因处理不当导致异常
- …
一旦发生上述情况,消息也会丢失。因此,
RabbitMQ
必须知道消费者的处理状态,一旦消息处理失败才能重新投递消息。
消费者确认机制
- 概念
- 配置消费者确认机制
spring:rabbitmq:listener:simple:acknowledge-mode: none # 不做处理
失败重试机制
当消费者出现异常后,消息会不断
requeue
(重入队)到队列,再重新发送给消费者。如果消费者再次执行依然出错,消息会再次requeue
到队列,再次投递,直到消息处理成功为止。极端情况就是消费者一直无法执行成功,那么消息
requeue
就会无限循环,导致mq
的消息处理飙升,带来不必要的压力当然,上述极端情况发生的概率还是非常低的,不过不怕一万就怕万一。为了应对上述情况
Spring
又提供了消费者失败重试机制:在消费者出现异常时利用本地重试,而不是无限制的requeue
到mq
队列。
- 修改
Consumer
服务的application.yaml
重启
consumer
服务,重复之前的测试。可以发现:
- 消费者在失败后消息没有重新回到
MQ
无限重新投递,而是在本地重试了3次- 本地重试
3
次以后,抛出了AmqpRejectAndDontRequeueException
异常。查看RabbitMQ
控制台,发现消息被删除了,说明最后SpringAMQP
返回的是reject
结论:
- 开启本地重试时,消息处理过程中抛出异常,不会
requeue
到队列,而是在消费者本地重试- 重试达到最大次数后,
Spring
会返回reject
,消息会被丢弃
spring:rabbitmq:listener:simple:retry:enabled: true # 开启消费者失败重试initial-interval: 1000ms # 初识的失败等待时长为1秒multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-intervalmax-attempts: 3 # 最大重试次数stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false
失败处理策略
在之前的测试中,本地测试达到最大重试次数后,消息会被丢弃。这在某些对于消息可靠性要求较高的业务场景下,显然不太合适了。因此
Spring
允许我们自定义重试次数耗尽后的消息处理策略,这个策略是由MessageRecovery
接口来定义的,它有3个不同实现:
RejectAndDontRequeueRecoverer
:重试耗尽后,直接reject
,丢弃消息。默认就是这种方式ImmediateRequeueMessageRecoverer
:重试耗尽后,返回nack
,消息重新入队RepublishMessageRecoverer
:重试耗尽后,将失败消息投递到指定的交换机比较优雅的一种处理方案是
RepublishMessageRecoverer
,失败后将消息投递到一个指定的,专门存放异常消息的队列,后续由人工集中处理。
RepublishMessageRecoverer
- 在
consumer
服务中定义处理失败消息的交换机和队列
@Bean
public DirectExchange errorMessageExchange(){return new DirectExchange("error.direct");
}
@Bean
public Queue errorQueue(){return new Queue("error.queue", true);
}
@Bean
public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
}
- 定义
RepublishMessageRecoverer
,关联交换机和队列
@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}
- 完整代码
@Configuration
@ConditionalOnProperty(name = "spring.rabbitmq.listener.simple.retry.enabled", havingValue = "true")
public class ErrorMessageConfig {@Beanpublic DirectExchange errorMessageExchange(){return new DirectExchange("error.direct");}@Beanpublic Queue errorQueue(){return new Queue("error.queue", true);}@Beanpublic Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");}//这里形参会自动注入。就不用手动注入 rabbitTemplate了@Beanpublic MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");}
}
业务幂等性
幂等性概述
幂等是一个数学概念,用函数表达式来描述是这样的:
f(x) = f(f(x))
,例如求绝对值函数。在程序开发中,则是指同一个业务,执行一次或多次对业务状态的影响是一致的。例如:
- 根据
id
删除数据- 查询数据
- 新增数据
但数据的更新往往不是幂等的,如果重复执行可能造成不一样的后果。比如:
- 取消订单,恢复库存的业务。如果多次恢复就会出现库存重复增加的情况
- 退款业务。重复退款对商家而言会有经济损失。
所以,我们要尽可能避免业务被重复执行。
然而在实际业务场景中,由于意外经常会出现业务被重复执行的情况,例如:
- 页面卡顿时频繁刷新导致表单重复提交
- 服务间调用的重试
MQ
消息的重复投递我们在用户支付成功后会发送
MQ
消息到交易服务,修改订单状态为已支付,就可能出现消息重复投递的情况。如果消费者不做判断,很有可能导致消息被消费多次,出现业务故障。举例:
- 假如用户刚刚支付完成,并且投递消息到交易服务,交易服务更改订单为已支付状态。
- 由于某种原因,例如网络故障导致生产者没有得到确认,隔了一段时间后重新投递给交易服务。
- 但是,在新投递的消息被消费之前,用户选择了退款,将订单状态改为了已退款状态。
- 退款完成后,新投递的消息才被消费,那么订单状态会被再次改为已支付。业务异常。
因此,我们必须想办法保证消息处理的幂等性。这里给出两种方案:
- 唯一消息
ID
- 业务状态判断
唯一消息 ID
解决幂等性
这个思路非常简单:
- 每一条消息都生成一个唯一的
id
,与消息一起投递给消费者。- 消费者接收到消息后处理自己的业务,业务处理成功后将消息
ID
保存到数据库- 如果下次又收到相同消息,去数据库查询判断是否存在,存在则为重复消息放弃处理。
- 给消息开启唯一
ID
SpringAMQP
的MessageConverter
自带了MessageID
的功能,我们只要开启这个功能即可。
//转换器原理就是转成 Message 类型对象
@Bean
public MessageConverter messageConverter(){// 1.定义消息转换器Jackson2JsonMessageConverter jjmc = new Jackson2JsonMessageConverter();// 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息jjmc.setCreateMessageIds(true);return jjmc;
}
//用 Message 可以收到 id@RabbitListener(queues = "simple.queues")public void listensSimpleQueue(Message message) {log.info("消息者 ID {}", message.getMessageProperties().getMessageId());log.info("消费者接收到消息:{}", new String(message.getBody()));}
兜底:延迟消息
延迟消息概述
在电商的支付业务中,对于一些库存有限的商品,为了更好的用户体验,通常都会在用户下单时立刻扣减商品库存。例如电影院购票、高铁购票,下单后就会锁定座位资源,其他人无法重复购买。
但是这样就存在一个问题,假如用户下单后一直不付款,就会一直占有库存资源,导致其他客户无法正常交易,最终导致商户利益受损!
因此,电商中通常的做法就是:对于超过一定时间未支付的订单,应该立刻取消订单并释放占用的库存。
例如,订单支付超时时间为
30
分钟,则我们应该在用户下单后的第30
分钟检查订单支付状态,如果发现未支付,应该立刻取消订单,释放库存。但问题来了:如何才能准确的实现在下单后第
30
分钟去检查支付状态呢?像这种在一段时间以后才执行的任务,我们称之为延迟任务,而要实现延迟任务,最简单的方案就是利用
MQ
的延迟消息了。在
RabbitMQ
中实现延迟消息也有两种方案:
- 死信交换机+TTL
- 延迟消息插件
死信交换机实现延迟消息
当一个队列中的消息满足下列情况之一时,可以成为死信(dead letter):
- 消费者使用
basic.reject
或basic.nack
声明消费失败,并且消息的requeue
参数设置为false
- 消息是一个过期消息,超时无人消费
- 要投递的队列消息满了,无法投递
如果一个队列中的消息已经成为死信,并且这个队列通过
dead-letter-exchange
属性指定了一个交换机,那么队列中的死信就会投递到这个交换机中,而这个交换机就称为死信交换机(Dead Letter Exchange
)。而此时加入有队列与死信交换机绑定,则最终死信就会被投递到这个队列中。死信交换机有什么作用呢?
- 收集那些因处理失败而被拒绝的消息
- 收集那些因队列满了而被拒绝的消息
- 收集因TTL(有效期)到期的消息
- 创建死信交换机
- 普通的死信的的
routingkey
要保持一致
- 普通的死信的的
// exchange 后面 delayed = true 就是声明为死信交换机
@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "dlx.queue", durable = "true"),exchange = @Exchange(name = "dlx.direct", delayed = "true"),key = {"hi"}))public void listensDlxQueue(String message) {log.info("监听到 dlx.queue 死信交换机的的消息: 【{}】", message);}
- 创建普通队列和交换机
@Configuration
public class NormalConfiguration {@Beanpublic DirectExchange normalExchange() {return new DirectExchange("normal.direct");}@Beanpublic Queue normalQueue() {//.deadLetterExchange 指定死信交换机return QueueBuilder.durable("normal.queue").deadLetterExchange("dlx.direct").build();}@Beanpublic Binding normalExchangeBinding(Queue normalQueue, DirectExchange normalExchange) {return BindingBuilder.bind(normalQueue).to(normalExchange).with("hi");}
}
- 实现消息延时发送
- 因为没有消费者所以会到死信交换机
- 注意这里是
setExpiration
@Testvoid testSendDelayMessage() {//MessagePostProcessor 消息转换器设置过期时间rabbitTemplate.convertAndSend("normal.direct", "hi", "hello,", new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setExpiration("10000");return message;}});}
- 监听死信交换机的消息
@Component
class DeadLetterQueueConsumer {@RabbitListener(queues = "dlx.queue")public void handleDeadLetterMessage(String message) {System.out.println("Received dead letter message: " + message);// 这里可以添加具体的业务逻辑来处理死信消息}
}
DelayExchange
插件
直接把普通交换机声明为死信交换机
安装插件
插件下载地址
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange
注意和 MQ
版本要对上
- 基于
Docker
安装,所以需要先查看RabbitMQ
的插件目录对应的数据卷
docker volume inspect mq-plugins
//插件目录被挂载到了/var/lib/docker/volumes/mq-plugins/_data这个目录,我们上传插件到该目录下。
[{"CreatedAt": "2024-06-19T09:22:59+08:00","Driver": "local","Labels": null,"Mountpoint": "/var/lib/docker/volumes/mq-plugins/_data","Name": "mq-plugins","Options": null,"Scope": "local"}
]
- 切换到该挂载目录,把插件拖入文件夹
- 执行命令,安装插件
docker exec -it mq rabbitmq-plugins enable rabbitmq_delayed_message_exchange
声明死信交换机
- 基于注解方式
//delayed = true 声明私信交换机
@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "delay.queue", durable = "true"),exchange = @Exchange(name = "delay.direct", delayed = "true"),key = "delay"
))
public void listenDelayMessage(String msg){log.info("接收到delay.queue的延迟消息:{}", msg);
}
- 基于
Bean
方式
@Slf4j
@Configuration
public class DelayExchangeConfig {@Beanpublic DirectExchange delayExchange(){return ExchangeBuilder.directExchange("delay.direct") // 指定交换机类型和名称.delayed() // 设置delay的属性为true.durable(true) // 持久化.build();}@Beanpublic Queue delayedQueue(){return new Queue("delay.queue");}@Beanpublic Binding delayQueueBinding(){return BindingBuilder.bind(delayedQueue()).to(delayExchange()).with("delay");}
}
发送延迟消息
- 发送消息时,必须通过
x-delay
属性设定延迟时间- 然后我们正常监听死信交换机接收消息就行
- 这里和上面不一样这里是
setDelay
上面是setExpiration
@Test
void testPublisherDelayMessage() {// 1.创建消息String message = "hello, delayed message";// 2.发送消息,利用消息后置处理器添加消息头rabbitTemplate.convertAndSend("delay.direct", "delay", message, new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {// 添加延迟消息属性message.getMessageProperties().setDelay(5000);return mes
相关文章:
RabbitMQ复习笔记
文章目录 MQ 概述同步调用拓展性差的问题性能下降的问题级联失败问题 异步调用举例 技术选型 RabbitMQRabbitMQ 安装RabbitMQ 收发消息交换机队列绑定关系模拟发送消息 RabbitMQ 数据隔离用户管理virtual host 授权 SpringAMOPSpringAMOP 快速入门消息发送消息接收 Work Queues…...
游戏开发核心技术解析——从引擎架构到攻防体系的完整技能树
游戏开发必备的7大技术体系,涵盖从Unity/Unreal引擎应用、C/C#编程范式到图形渲染管线构建等核心技术,特别剖析MMO游戏开发中的网络安全架构设计要点。通过2023年某头部游戏公司DDoS攻击事件,揭示实时防御策略与合规审计的关键作用。一、游戏…...
Execl 最佳字体和大小推荐[特殊字符]
文章目录 ✅ **通用推荐字体与字号**🔤 **字体说明**📊 场景推荐📁 办公文档(如财务报表、周报等)📈 数据可视表格📋 打印友好 🌐 多语言场景(中英文混排) ✅…...
JavaScript学习教程,从入门到精通,Ajax与Node.js Web服务器开发全面指南(24)
Ajax与Node.js Web服务器开发全面指南 一、初识Ajax 1.1 Ajax基本概念 语法知识点: Ajax (Asynchronous JavaScript and XML) 是一种无需重新加载整个网页的情况下,能够更新部分网页的技术核心对象:XMLHttpRequest工作原理: 创…...
VR 全景看车的独特优势
全方位沉浸式体验 VR 全景看车最显著的优势,就是为用户带来了全方位的沉浸式体验。通过 VR 技术,用户仿佛置身于真实的汽车展厅或试驾场景之中,能够 360 度无死角地观察车辆的外观、内饰、细节等各个方面 。无论是车辆的整体造型࿰…...
Kotlin高阶函数 vs Lambda表达式:关键区别与协作关系
先说结论: ✅ 高阶函数既可以用 Lambda 表达式,也可以用函数引用! 在 Kotlin 中,高阶函数(Higher-Order Function)和 Lambda 表达式密切相关,但它们是两个不同的概念: ✅ 简单理解…...
SQL技术终极指南:从内核原理到超大规模应用
一、DDL核心应用场景与最佳实践 1.1 表结构设计场景矩阵 业务场景核心语法要素典型实现案例电商用户画像JSON字段虚拟列索引CREATE TABLE users (id INT, profile JSON, AS (profile->>$.age) VIRTUAL, INDEX idx_age((profile->>$.age)))物联网时序数据分区表压…...
Qt实现语言切换的完整方案
在Qt中实现语言动态切换需要以下几个关键步骤,我将提供一个完整的实现方案: 一、准备工作 在代码中使用tr()标记所有需要翻译的字符串 cpp button->setText(tr("Submit")); 创建翻译文件 在.pro文件中添加: qmake TRANSLATION…...
消息中间件RabbitMQ02:账号的注册、点对点推送信息
一、默认用户登录和账号注册 1.登录 安装好了RMQ之后,我们可以访问如下地址: RabbitMQ Management 输入默认的管理员密码,4.1.0的管理员账号和密码是: guest guest 2.添加账号 consumer consumer 添加成功后: 角色…...
php 支付宝官方 Alipay Easy SDK
使用 Alipay Easy SDK。 打造最好用的支付宝开放平台服务端SDK,Alipay Easy SDK让您享受极简编程体验,快速访问支付宝开放平台开放的各项核心能力。 要求: PHP版本 > 7.0安装PHP cURL扩展安装PHP OpenSSL扩展安装PHP fileinfo扩展 使用…...
深入理解 java synchronized 关键字
🧑 博主简介:CSDN博客专家,历代文学网(PC端可以访问:https://literature.sinhy.com/#/literature?__c1000,移动端可微信小程序搜索“历代文学”)总架构师,15年工作经验,…...
即时角色:使用可扩展的扩散变换器框架个性化任何角色
Paper Title: InstantCharacter: Personalize Any Characters with a Scalable Diffusion Transformer Framework 论文发布于2025年4月16日 Abstract部分 U-Net架构的局限性:传统的基于U-Net架构的定制方法存在一些问题,如泛化能力不足和生成图像质量的损失。 U-Net模型需要…...
开源作业调度框架Quartz框架详细使用说明
Quartz框架详细使用说明 Quartz 是一个功能强大的开源作业调度框架,广泛用于在Java应用程序中执行定时任务。以下是Quartz框架的详细使用说明、完整代码示例、同类框架对比以及总结表格。 1. Quartz框架概述 特点: 灵活的调度:支持多种调度方…...
配置Spark历史服务器,轻松查看任务记录
在大数据处理中,Spark是一个强大的分布式计算框架。但当Spark服务重启后,之前的运行记录就会消失,给我们排查问题和分析任务执行情况带来不便。这时,配置Spark历史服务器就显得尤为重要,它能帮助我们保存和查看历史任务…...
身份证实名认证:通往数字安全与便捷生活的钥匙
在数字化日益深入我们生活的今天,信息安全和隐私保护成为了每个人关心的焦点。而身份证实名认证作为保障个人信息安全的重要环节,正扮演着越来越关键的角色。它不仅是连接现实世界与数字世界的桥梁,更是确保个人在线活动安全、可靠的基础。 什…...
0基础可以考MySQL OCP么?备考时间需要多久?
最近被问爆的 “0 基础能不能考 MySQL OCP”“备考要多久” 终于来答疑啦!作为过来人,负责任地说:0 基础完全能冲! 0 基础真的能考 MySQL OCP? 很多姐妹担心自己是数据库小白,连 SQL 都没摸过,…...
node.js 实战——(概念以及Buffer 知识点学习)
概念 node.js是一个开源的、跨平台的javascript运行环境;它可以开发服务器应用,可以开发工具类应用(webpack、vite、Babel),也可以开发桌面端应用(vscode、Figma、Postman) #mermaid-svg-0TkAt8LEFhyrVrsw {font-fami…...
论文阅读 | 大模型工具调用控制的策略优化
文章目录 I. 背景II. 方法细节2.1 问题定义2.2 工具集成RL2.3 PPO2.4 GRPO2.5 OTC-PO2.5.1 OTC-PPO2.5.2 OTC-GRPO2.5.3 工具集成奖励设计 III. 实验 题目: OTC: Optimal Tool Calls via Reinforcement Learning 论文地址: OTC: Optimal Tool Calls via…...
【论文阅读】Dual-branch Cross-Patch Attention Learning for Group Affect Recognition
【论文阅读】Dual-branch Cross-Patch Attention Learning for Group Affect Recognition 摘要1.介绍2.相关工作3.双分支交叉Patch注意力Transformer4.实验5.局限性 摘要 本篇博客参考IEEE 2022年收录的论文Dual-branch Cross-Patch Attention Learning for Group Affect Reco…...
centos stream 10 修改 metric
1. 查看当前网络连接 nmcli connection show 2. 查看当前网络连接的metric ip route show 3. 修改指定连接的 metric sudo nmcli connection modify "Wired connection 1" ipv4.route-metric 100 ipv6.route-metric 100 值越大,优先级越低ÿ…...
Java从入门到“放弃”(精通)之旅——String类⑩
Java从入门到“放弃”(精通)之旅🚀——String类⑩ 前言 在Java编程中,String类是最常用也是最重要的类之一。无论是日常开发还是面试,对String类的深入理解都是必不可少的。 1. String类的重要性 在C语言中…...
MVCWebAPI使用FromBody接受对象的方法
近期在做软件升级操作的时候突然想着需要的参数比较多,如果需要参数的话参数比较多,所有想着使用frombody来集合数据统一操作做了个样张希望对您有帮助 代码如下: /// <summary>/// 入口当前文件接口下的操作数据/// </summary>/…...
知识储备-DC综合相关
DC flow相关 要点描述命令区别DC基础版,使用wireloadmodelcompile_ultraDCT 含DC所有非冲突feature(如wlm),按照DC-prefloorplan-DCT流程获取布局信息更精确评估时序收敛 dc_shell -topoDCG含DCT所有feature,多了layer和congestion信息等 dc_…...
力扣-第645题《错误的集合》
一 . 问题描述 集合 s 包含从 1 到 n 的整数。不幸的是,因为数据错误,导致集合里面某一个数字复制了成了集合里面的另外一个数字的值,导致集合 丢失了一个数字 并且 有一个数字重复 。 给定一个数组 nums 代表了集合 S 发生错误后的结果。 …...
gem5教程第六章 为ARM扩展gem5 这也是改进gem5的一个基础
本章假设您已经使用gem5构建了一个基本的x86系统,并创建了一个简单的配置脚本。 下载ARM二进制文件 让我们从下载一些ARM基准测试二进制文件开始。从gem5文件夹的根目录开始: mkdir -p cpu_tests/benchmarks/bin/arm cd cpu_tests/benchmarks/bin/arm wget dist.gem5.org/…...
事关数据安全,ARM被爆不可修复漏洞
日前,ARM架构再次被爆出重大安全漏洞,影响波及ARMv8.3架构及以前的所有CPU。该漏洞为硬件级且无法完全修复,如被利用可能造成严重数据泄露风险。 ARM硬件级安全漏洞 来自ARM开发者官网的一项安全更新证实,存在一种名为PACMAN的新型…...
Unity中使用Cinemachine插件创建自由视角相机(freelookCamera)来实现第三人称漫游
1.安装下载Cinemachine插件 2.创建自由相机freelookCamera Follow:为我们的人物 LookAt:相机始终看向的地方,可以新建空物体,放在人物头上, invert:是反向,就是时鼠标移动方向与相机旋转方向一致 在组件最下面的…...
Python爬虫(2)Python爬虫入门:从HTTP协议解析到豆瓣电影数据抓取实战
目录 一、背景与需求二、 Web基础与HTTP协议核心解析2.1 HTTP协议:数据交互的基石2.2 为何爬虫需理解HTTP协议? 三、 Python爬虫实战:Requests库核心用法3.1 安装与环境配置3.2 案例1:GET请求抓取豆瓣电影Top2503.3…...
php基础
文章目录 基本语法基本数据类型:运算符?? 空合并 定义变量字符串操作内置变量\$_SESSION:会话信息\$_GET:获取URL参数 内置函数功能工具类的,utils网络通信类的会话管理类的 基本语法 每一个statement后面以;结尾,与C/C和Java一样注释用//,…...
蓝桥杯17. 机器人塔
机器人塔 原题目链接 题目描述 X 星球的机器人拉拉队有两种服装:A 和 B。 这次他们表演的是“搭机器人塔”,类似下图: AB BA B AA A B BB B B A BA B A B B A组塔规则: A 只能站在 AA 或 BB 的肩上;B 只能站在 AB…...
rpm包管理
1.介绍 rpm用于互联网下包的打包及安装工具,包含在某些Linux发布版中,它生成具有.RPM扩展名的文件.RPM是RedHat Package Manage (RedHat软件包管理公具)的编写 类似windows的setup.exe,这一文件格式名称虽然打上RedHat的标志,但理念是通用的. Linux的…...
es 自动补全
安装拼音分词器 选择es版本对应的pinyin分词器版本 下载后解压,放到es的插件目录下 重启es 自定义分词器 拼音分词器——可选配置 1. 首字母处理配置 keep_first_letter (默认: true) 解释:是否提取每个汉字的首字母组合,用于支持首字母…...
NLTK 文本分类与情感分析
在自然语言处理(NLP)的领域中,文本分类和情感分析是两个非常重要且具有广泛应用的任务。文本分类旨在将文本数据分配到预定义的类别中,而情感分析则专注于确定文本所表达的情感倾向,如积极、消极或中性。Python 的 Nat…...
Android开发常用外部组件及使用指南(上)
文章目录 一、前言二、外部组件的引入方式1. Gradle依赖管理1.1 项目级build.gradle1.2 模块级build.gradle 2. 本地库引入3. 模块化引入 三、网络请求组件1. Retrofit1.1 引入依赖1.2 基本使用1.3 高级特性 2. OkHttp2.1 基本使用2.2 拦截器 四、图片加载组件1. Glide1.1 基本…...
系统架构师2025年论文《系统架构风格》
论企软件架构风格 摘要: 我所在单位是某市主要医院,2017 年 1 月医院决定开发全新一代某市医院预约挂号系统,我担任本次系统的架构师,主要负责整个系统的架构设计工作。该系统旨在优化医院挂号流程,提高患者就医体验,是医院应对医疗信息化变革和提升服务质量的重要举措…...
Linux部署Redis主从
Linux部署Redis主从 1.下载2.安装2.1编译 & 安装 3.修改配置文件4.启动 1.下载 在Redis版本库:https://download.redis.io/releases/ 可根据自己的需求选择下载对应的版本,然后直接下载 2.安装 通过Xftp工具进行上传,选择指定的应用拖…...
【Python 学习笔记】 pip指令使用
系列文章目录 pip指令使用 文章目录 系列文章目录前言安装配置使用pip 管理Python包修改pip下载源 前言 提示:这里可以添加本文要记录的大概内容: 当前文章记录的是我在学习过程的一些笔记和思考,可能存在有误解的地方,仅供大家…...
Django DRF实现用户数据权限控制
在 Django DRF 中使用 ModelViewSet 时,若需实现用户仅能查看和操作自己的数据详情,同时允许所有认证用户访问列表,需结合权限类和动态权限分配。以下是具体步骤: 1. 自定义对象权限类 创建一个 IsOwner 权限类,检查…...
eplan许可证与防火墙安全软件冲突
在使用EPLAN电气设计软件时,有时会遇到许可证与防火墙或安全软件之间的冲突。这种冲突可能导致许可证无法激活或软件无法正常运行,给用户带来诸多不便。本文将为您解析EPLAN许可证与防火墙/安全软件冲突的原因,并提供解决方案,帮助…...
《多Agent架构VS千万字长文本VS深度推理引擎——拆解Coze、通义、Kimi的AI终局博弈密码》
Coze、通义和Kimi终局竞争深度分析 技术路线分野:多Agent协同 vs. 超长文本 vs. 结构化提示 架构差异:三者在技术路线上的侧重点各异,塑造了不同的市场边界。Coze(“扣子”)采用多Agent协同架构,强调插件工…...
《浔川代码编辑器v2.1.0预告》
《浔川代码编辑器v2.1.0预告》 尊敬的浔川代码编辑器用户: 我们很高兴向大家预告即将到来的v2.1.0版本更新计划。以下是各版本的发布时间安排: 版本发布计划 1. **v2.1.0公测版** - 发布时间:待v2.0测试版结束后两周 - 特点:包…...
Python jsonpath库终极指南:json数据挖掘的精准导航仪
Python jsonpath库终极指南:json数据挖掘的精准导航仪 对话实录 小白:(抓狂)这个 JSON 数据有好多层嵌套,怎么快速拿到最里面的值? 专家:(递上探测器)用jsonpath库&…...
QT软件安装(12)
文章目录 一、本章说明二、QT软件包资源三、软件安装教程 一、本章说明 注:本节为【基于STM的环境监测系统(节点云服务器存储QT界面设计)】项目第12篇文章,前面文章已经实现了气体传感数据的采集,并通过4G模块上传至云…...
【人工智能】DeepSeek 的开源生态:释放 AI 潜能的社区协同与技术突破
《Python OpenCV从菜鸟到高手》带你进入图像处理与计算机视觉的大门! 解锁Python编程的无限可能:《奇妙的Python》带你漫游代码世界 DeepSeek 作为中国 AI 领域的先锋,以其高效的混合专家模型(MoE)和彻底的开源策略,在全球 AI 社区掀起波澜。本文深入剖析 DeepSeek 的开…...
如何选择 Flask 和 Spring Boot
目录 一、选择 Flask 和 Spring Boot 的关键因素如何评价系统的性能1.RPSRPS 的重要性RPS 的评估标准RPS 的计算方法RPS 与并发用户数的关系性能测试中的RPS 2.TPSTPS 的定义TPS 的重要性TPS 与 RPS 的区别TPS 的常见范围计算 TPS 的公式如何提高 TPS 二、后期扩展优化方案Flas…...
在KEIL里C51和MDK兼容以及添加ARM compiler5 version编译器
前言 我们想在一个keil里面可以打开32和51的文件,这样就不需要两个keil了 还有就是现在的keil,比如我用的是5.41的,就没有5版本的处理器,所以要安装 本篇文章我们来详细讲解如何实现上面说的两个内容 准备的东西 1.ARM5编译器 …...
【源码分析】Linux内核ov13850.c
这里写自定义目录标题 1、入口函数:__init sensor_mod_init2、probe函数:ov13850_probe2.1、初始化前的一些准备2.2、设备初始化流程 源码如下 了解运行流程 1、入口函数:__init sensor_mod_init 驱动由 __init 开始 __exit 结束,…...
单片机与FPGA的核心差异、优缺点、编程差异、典型应用场景、选型等对比分析
1. 基本概念差异 单片机(MCU): 基于冯诺依曼/哈佛架构的微控制器,集成CPU、内存、外设接口(如ADC、UART、PWM等),通过软件指令顺序执行任务。 FPGA: 由可编程逻辑单元(…...
PCB规则
PCB封装 原理图绘制完成需要检查 DRC 菜单栏——>设计——>检查 DRC 底部侧边栏——>DRC——>检查 DRC 常见问题: 1)某个导线/网络标签是一个单网络 网络标签名称不一样 网络标签只有一个 引脚没有使用,但是放置了导线 2…...
静态存储区(Static Storage Area)的总结
普通的全局变量未初始化,编译阶段放在com段,链接完后放在bss段 在32位系统中,内核空间为1GB(地址范围:0xC0000000-0xFFFFFFFF),用户空间为3GB 高端内存(HIGHMEM)是32位…...