当前位置: 首页 > news >正文

06-RabbitMQ基础

目录

1.初识MQ

1.1.同步调用

1.2.异步调用

1.3.技术选型

2.RabbitMQ

2.1.安装

2.2.收发消息

2.2.1.交换机

2.2.2.队列

2.2.3.绑定关系

2.2.4.发送消息

2.3.数据隔离

2.3.1.用户管理

2.3.2.virtual host

3.SpringAMQP

3.1.导入Demo工程

3.2.快速入门

3.2.1.消息发送

3.2.2.消息接收

3.2.3.测试

3.3.WorkQueues模型

3.3.1.消息发送

3.3.2.消息接收

3.3.3.测试

3.3.4.能者多劳

3.3.5.总结

3.4.交换机类型

3.5.Fanout交换机

3.5.1.声明队列和交换机

3.5.2.消息发送

3.5.3.消息接收

3.6.Direct交换机

3.6.1.声明队列和交换机

3.6.2.消息接收

3.6.3.消息发送

3.6.4.总结

3.7.Topic交换机

3.7.1.说明

3.7.2.消息发送

3.7.3.消息接收

3.7.4.总结

3.8.Java代码声明交换机和队列

3.8.1.基本API

3.8.2.fanout示例

3.8.2.direct示例

3.8.4.基于注解声明

3.9.消息转换器

3.9.1.测试默认转换器

3.9.2.配置JSON转换器

3.9.3.消费者接收Object

4.业务改造

4.1.配置MQ

4.1.接收消息

4.2.发送消息


微服务一旦拆分,必然涉及到服务之间的相互调用,目前我们服务之间的调用采用的是基于OpenFeign的调用。这种调用中,调用者发起请求后需要等待服务提供者执行业务后返回结果,才能继续执行后面的业务。也就是说调用者在调用过程中处于阻塞状态,因此我们称这种调用方式为同步调用也可以叫同步通。但在很多场景下,我们可能需要采用异步通讯的方式,为什么呢?

我们先来看看什么是同步通讯和异步通讯。如图:

解读:

  • 同步通讯:就如同打视频电话,双方的交互都是实时的。因此同一时刻你只能跟一个人打视频电话。

  • 异步通讯:就如同发微信聊天,双方的交互不是实时的,你不需要立刻给对方回应。因此你可以多线操作,同时跟多人聊天。

两种方式各有优劣,打电话可以立即得到响应,但是你却不能跟多个人同时通话。发微信可以同时与多个人收发微信,但是往往响应会有延迟。

所以,如果我们的业务需要实时得到服务提供方的响应,则应该选择同步通讯(同步调用)。而如果我们追求更高的效率,并且不需要实时响应,则应该选择异步通讯(异步调用)。

同步调用的方式我们已经学过了,之前的OpenFeign调用就是。但是:

  • 异步调用又该如何实现?

  • 哪些业务适合用异步调用来实现呢?

1.初识MQ

1.1.同步调用

基于OpenFeign的调用都属于是同步调用,那么这种方式存在哪些问题呢?

举个例子,以昨天余额支付功能为例来分析,首先看下整个流程:

目前采用的是基于OpenFeign的同步调用,也就是业务的执行流程是这样的:

  • 支付服务需要先调用用户服务扣减余额

  • 然后支付服务自己要更新支付流水单的状态

  • 然后支付服务调用交易服务,更新业务订单状态为已支付

三个步骤依次执行。

这其中就存在3个问题:

第一拓展性差

我们目前的业务相对简单,但是随着业务规模扩大,产品的功能也在不断完善。

在大多数电商业务中,用户支付成功后都会以短信或者其它方式通知用户,告知支付成功。假如后期产品经理提出这样新的需求,你怎么办?是不是要在上述业务中再加入通知用户的业务?

某些电商项目中,还会有积分或金币的概念。假如产品经理提出需求,用户支付成功后,给用户以积分奖励或者返还金币,你怎么办?是不是要在上述业务中再加入积分业务、返还金币业务?

。。。

最终你的支付业务会越来越臃肿:

也就是说每次有新的需求,现有支付逻辑都要跟着变化,代码经常变动,不符合开闭原则,拓展性不好。

第二性能下降

由于我们采用了同步调用,调用者需要等待服务提供者执行完返回结果后,才能继续向下执行,也就是说每次远程调用,调用者都是阻塞等待状态。最终整个业务的响应时长就是每次远程调用的执行时长之和:

假如每个微服务的执行时长都是50ms,则最终整个业务的耗时可能高达300ms,性能太差了。

第三,级联失败

由于我们是基于OpenFeign调用交易服务、通知服务。当交易服务、通知服务出现故障时,整个事务都会回滚,交易失败。

这其实就是同步调用的级联失败问题。

但是大家思考一下,我们假设用户余额充足,扣款已经成功,此时我们应该确保支付流水单更新为已支付,确保交易成功。毕竟收到手里的钱没道理再退回去吧。

因此,这里不能因为短信通知、更新订单状态失败而回滚整个事务。

综上,同步调用的方式存在下列问题:

  • 拓展性差

  • 性能下降

  • 级联失败

而要解决这些问题,我们就必须用异步调用的方式来代替同步调用

1.2.异步调用

异步调用方式其实就是基于消息通知的方式,一般包含三个角色:

  • 消息发送者:投递消息的人,就是原来的调用方

  • 消息Broker:管理、暂存、转发消息,你可以把它理解成微信服务器

  • 消息接收者:接收和处理消息的人,就是原来的服务提供方

在异步调用中,发送者不再直接同步调用接收者的业务接口,而是发送一条消息到消息Broker,然后接收者根据自己的需求从消息Broker中订阅消息。每当发送方发送消息后,接受者都能获取消息并处理。

这样,发送消息的人和接收消息的人就完全解耦了。

还是以余额支付业务为例:

除了扣减余额、更新支付流水单状态以外,其它调用逻辑全部取消。而是改为发送一条消息到Broker。而相关的微服务都可以订阅消息通知,一旦消息到达Broker,则会分发给每一个订阅了的微服务,处理各自的业务。

假如产品经理提出了新的需求,比如要在支付成功后更新用户积分。支付代码完全不用变更,而仅仅是让积分服务也订阅消息即可:

不管后期增加了多少消息订阅者,作为支付服务来讲,执行完扣减余额、更新支付流水状态后,发送消息即可。业务耗时仅仅是这三部分业务耗时,仅仅100ms,大大提高了业务性能。

另外,不管是交易服务、通知服务,还是积分服务,他们的业务与支付关联度低。现在采用了异步调用,解除了耦合,他们即便执行过程中出现了故障,也不会影响到支付服务。

综上,异步调用的优势包括:

  • 耦合度更低

  • 性能更好

  • 业务拓展性强

  • 故障隔离,避免级联失败

当然,异步通信也并非完美无缺,它存在下列缺点:

  • 完全依赖于Broker的可靠性、安全性和性能

  • 架构复杂,后期维护和调试麻烦

1.3.技术选型

消息Broker,目前常见的实现方案就是消息队列(MessageQueue),简称为MQ.

目比较常见的MQ实现:

  • ActiveMQ

  • RabbitMQ

  • RocketMQ

  • Kafka

几种常见MQ的对比:

RabbitMQActiveMQRocketMQKafka
公司/社区RabbitApache阿里Apache
开发语言ErlangJavaJavaScala&Java
协议支持AMQP,XMPP,SMTP,STOMPOpenWire,STOMP,REST,XMPP,AMQP自定义协议自定义协议
可用性一般
单机吞吐量一般非常高
消息延迟微秒级毫秒级毫秒级毫秒以内
消息可靠性一般一般

追求可用性:Kafka、 RocketMQ 、RabbitMQ

追求可靠性:RabbitMQ、RocketMQ

追求吞吐能力:RocketMQ、Kafka

追求消息低延迟:RabbitMQ、Kafka

据统计,目前国内消息队列使用最多的还是RabbitMQ,再加上其各方面都比较均衡,稳定性也好,因此我们课堂上选择RabbitMQ来学习。

2.RabbitMQ

RabbitMQ是基于Erlang语言开发的开源消息通信中间件,官网地址:

https://www.rabbitmq.com/

接下来,我们就学习它的基本概念和基础用法。

2.1.安装

同样基于Docker来安装RabbitMQ,使用下面的命令即可:

docker run \-e RABBITMQ_DEFAULT_USER=itheima \-e RABBITMQ_DEFAULT_PASS=123321 \-v mq-plugins:/plugins \--name mq \--hostname mq \-p 15672:15672 \-p 5672:5672 \--network hm-net\-d \rabbitmq:3.8-management

可以看到在安装命令中有两个映射的端口:

  • 15672:RabbitMQ中管理控制台的端口

  • 5672:RabbitMQ中消息发送处理的端口

安装完成后,我们访问 http://192.168.150.101:15672即可看到管理控制台。首次访问需要登录,默认的用户名和密码在配置文件中已经指定了。

登录后即可看到管理控制台总览页面:

RabbitMQ对应的架构如图:

其中包含几个概念:

  • publisher:生产者,发送消息的一方

  • consumer:消费者,消费消息的一方

  • queue:队列,存储消息。生产者发送的消息会暂存在消息队列中,等待消费者处理

  • exchange:交换机,负责消息的路由。生产者发送的消息由交换机决定投递到哪个队列。

  • virtual host:虚拟主机,起到数据隔离的作用。每个虚拟主机相互独立,有各自的exchange、queue

上述这些东西都可以在RabbitMQ的管理控制台来管理,下一节我们就一起来学习控制台的使用。

2.2.收发消息

2.2.1.交换机

注意:

  • 交换机只能路由消息,而无法存储消息
  • 队列必须与交换机绑定,因为交换机只会路由消息给与其绑定的队列

我们打开Exchanges选项卡,可以看到已经存在很多交换机:

我们点击任意交换机,即可进入交换机详情页面。仍然会利用控制台中的publish message 发送一条消息:

这里通过控制台模拟生产者发送消息。由于没有消费者存在,最终消息丢失了,这样说明交换机没有存储消息的能力。

2.2.2.队列

我们打开Queues选项卡,新建一个队列:

命名为hello.queue1

再以相同的方式,创建一个队列,密码为hello.queue2,最终队列列表如下:

此时,我们再次向amq.fanout交换机发送一条消息。会发现消息依然没有到达队列!!

怎么回事呢?

发送到交换机的消息,只会路由到与其绑定的队列,因此仅仅创建队列是不够的,我们还需要将其与交换机绑定。

2.2.3.绑定关系

点击Exchanges选项卡,点击amq.fanout交换机,进入交换机详情页,然后点击Bindings菜单,在表单中填写要绑定的队列名称:

相同的方式,将hello.queue2也绑定到改交换机。

最终,绑定结果如下:

2.2.4.发送消息

再次回到exchange页面,找到刚刚绑定的amq.fanout,点击进入详情页,再次发送一条消息:

回到Queues页面,可以发现hello.queue中已经有一条消息了:

点击队列名称,进入详情页,查看队列详情,这次我们点击get message:

可以看到消息到达队列了:

这个时候如果有消费者监听了MQ的hello.queue1hello.queue2队列,自然就能接收到消息了。

2.3.数据隔离

不同项目/业务创建不同用户,并分配不同的虚拟主机;各个虚拟主机之间是数据隔离的

2.3.1.用户管理

点击Admin选项卡,首先会看到RabbitMQ控制台的用户管理界面:

这里的用户都是RabbitMQ的管理或运维人员。目前只有安装RabbitMQ时添加的itheima这个用户。仔细观察用户表格中的字段,如下:

  • Nameitheima,也就是用户名

  • Tagsadministrator,说明itheima用户是超级管理员,拥有所有权限

  • Can access virtual host/,可以访问的virtual host,这里的/是默认的virtual host

对于小型企业而言,出于成本考虑,我们通常只会搭建一套MQ集群,公司内的多个不同项目同时使用。这个时候为了避免互相干扰, 我们会利用virtual host的隔离特性,将不同项目隔离。一般会做两件事情:

  • 给每个项目创建独立的运维账号,将管理权限分离。

  • 给每个项目创建不同的virtual host,将每个项目的数据隔离。

比如,我们给黑马商城创建一个新的用户,命名为hmall

你会发现此时hmall用户没有任何virtual host的访问权限:

2.3.2.virtual host

我们先退出登录:

切换到刚刚创建的hmall用户登录,然后点击Virtual Hosts菜单,进入virtual host管理页:

可以看到目前只有一个默认的virtual host,名字为 /

我们可以给黑马商城项目创建一个单独的virtual host,而不是使用默认的/

创建完成后如图:

由于我们是登录hmall账户后创建的virtual host,因此回到users菜单,你会发现当前用户已经具备了对/hmall这个virtual host的访问权限了(用哪个用户创建的虚拟主机,哪个用户就对那个虚拟主机有访问权限):

此时,点击页面右上角的virtual host下拉菜单,切换virtual host/hmall

然后再次查看queues选项卡,会发现之前的队列已经看不到了:

这就是基于virtual host 的隔离效果。

3.SpringAMQP

将来我们开发业务功能的时候,肯定不会在控制台收发消息,而是应该基于编程的方式。由于RabbitMQ采用了AMQP协议,因此它具备跨语言的特性。任何语言只要遵循AMQP协议收发消息,都可以与RabbitMQ交互。并且RabbitMQ官方也提供了各种不同语言的客户端。

但是,RabbitMQ官方提供的Java客户端编码相对复杂,一般生产环境下我们更多会结合Spring来使用,Spring的官方基于RabbitMQ提供了一套消息收发的模板工具:SpringAMQP。并且还基于SpringBoot对其实现了自动装配,使用起来非常方便。

SpringAmqp的官方地址:

Docsicon-default.png?t=O83Ahttps://b11et3un53m.feishu.cn/wiki/OQH4weMbcimUSLkIzD6cCpN0nvc#share-JoqFdiNIbolTRVxduGBcCWC7nuf

SpringAMQP提供了三个功能:

  • 自动声明队列、交换机及其绑定关系

  • 基于注解的监听器模式,异步接收消息

  • 封装了RabbitTemplate工具,用于发送消息

这一章我们就一起学习一下,如何利用SpringAMQP实现对RabbitMQ的消息收发。

3.1.导入Demo工程

在课前资料给大家提供了一个Demo工程,方便我们学习SpringAMQP的使用:

将其复制到你的工作空间,然后用Idea打开,项目结构如图:

包括三部分:

  • mq-demo:父工程,管理项目依赖

  • publisher:消息的发送者

  • consumer:消息的消费者

  • 其中,publisher和consumer都是微服务,publisher用于发送消息,consumer用于接收消息,从而演示跨微服务的消息收发

在mq-demo这个父工程中,已经配置好了SpringAMQP相关的依赖,因此,子工程中可以直接使用SpringAMQP:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>cn.itcast.demo</groupId><artifactId>mq-demo</artifactId><version>1.0-SNAPSHOT</version><modules><module>publisher</module><module>consumer</module></modules><packaging>pom</packaging><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.7.12</version><relativePath/></parent><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target></properties><dependencies><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><!--AMQP依赖,包含RabbitMQ--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><!--单元测试--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency></dependencies>
</project>

3.2.快速入门

SpringAMQP收发消息的流程总结:

1、引入spring-boot-starter-amqp依赖

2、配置rabbitmq服务端信息

3、利用RabbitTemplate发送消息

4、利用@RabbitListener注解声明要监听的队列,监听消息

在之前的案例中,都是经过交换机发送消息到队列,不过为了测试方便,我们可以直接向队列发送消息,跳过交换机。

在入门案例中,我们就演示这样的简单模型,如图:

也就是:

  • publisher直接发送消息到队列

  • 消费者监听并处理队列中的消息

注意:这种模式一般测试使用,很少在生产中使用。

为了方便测试,我们现在控制台新建一个队列:simple.queue

添加成功:

接下来,我们就可以利用Java代码收发消息了。

3.2.1.消息发送

首先配置MQ地址,在publisher服务的application.yml中添加配置:

spring:rabbitmq:host: 192.168.150.101 # 你的虚拟机IPport: 5672 # 端口virtual-host: /hmall # 虚拟主机username: hmall # 用户名password: 123 # 密码

然后在publisher服务中编写测试类SpringAmqpTest,并利用RabbitTemplate实现消息发送:

package com.itheima.publisher.amqp;import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;@SpringBootTest
public class SpringAmqpTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testSimpleQueue() {// 队列名称String queueName = "simple.queue";// 消息String message = "hello, spring amqp!";// 发送消息:转换消息并将消息发送到队列//queueName是队列名,要发送到哪个队列//message是消息,要发送的消息rabbitTemplate.convertAndSend(queueName, message);  }
}

打开控制台,可以看到消息已经发送到队列中:

接下来,我们再来实现消息接收。

3.2.2.消息接收

先在consumer服务的application.yml中配置MQ的地址:

spring:rabbitmq:host: 192.168.150.101 # 你的虚拟机IPport: 5672 # 端口virtual-host: /hmall # 虚拟主机username: hmall # 用户名password: 123 # 密码

然后在consumer服务的com.itheima.consumer.listener包中新建一个类SpringRabbitListener,代码如下:

package com.itheima.consumer.listener;import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class SpringRabbitListener {// 利用RabbitListener注解标注在方法上,声明要监听的队列(表示消费者)// 将来一旦监听的此队列中有消息了,MQ就会推送给当前服务,然后调用这个方法,处理消息。// 此方法的形参就是用于接收消息的,消息发的什么类型,这里就用什么类型接收// 可以看到方法体中接收的就是消息体的内容@RabbitListener(queues = "simple.queue")public void listenSimpleQueueMessage(String msg) throws InterruptedException {System.out.println("spring 消费者接收到消息:【" + msg + "】");}
}

3.2.3.测试

启动consumer服务,然后在publisher服务中运行测试代码,发送MQ消息。最终consumer收到消息:

3.3.WorkQueues模型

Work queues,任务模型。简单来说就是多个消费者绑定到一个队列,共同消费队列中的消息

当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。

此时可以使用Work queues模型,多个消费者共同处理消息处理,消息处理的速度就能大大提高

注意:虽然多个消费者绑定到一个队列,但同一条消息只会被一个消费者处理

接下来,我们就来模拟这样的场景。

首先,我们在控制台创建一个新的队列,命名为work.queue

3.3.1.消息发送

这次我们循环发送,模拟大量消息堆积现象。

在publisher服务中的SpringAmqpTest类中添加一个测试方法:

/*** workQueue* 向队列中不停发送消息,模拟消息堆积。*/
@Test
public void testWorkQueue() throws InterruptedException {// 队列名称String queueName = "work.queue";// 消息String message = "hello, message_";for (int i = 0; i < 50; i++) {// 发送消息,每20毫秒发送一次,相当于每秒发送50条消息rabbitTemplate.convertAndSend(queueName, message + i);Thread.sleep(20);}
}

3.3.2.消息接收

要模拟多个消费者绑定同一个队列,我们在consumer服务的SpringRabbitListener中添加2个新的方法:

package com.itheima.consumer.listener;import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class SpringRabbitListener {// 利用RabbitListener来声明要监听的队列信息// 将来一旦监听的队列中有了消息,就会推送给当前服务,调用当前方法,处理消息。// 可以看到方法体中接收的就是消息体的内容@RabbitListener(queues = "simple.queue")public void listenSimpleQueueMessage(String msg) throws InterruptedException {System.out.println("spring 消费者接收到消息:【" + msg + "】");}@RabbitListener(queues = "work.queue")public void listenWorkQueue1(String msg) throws InterruptedException {System.out.println("消费者1接收到消息:【" + msg + "】" + LocalTime.now());Thread.sleep(20);}@RabbitListener(queues = "work.queue")public void listenWorkQueue2(String msg) throws InterruptedException {System.err.println("消费者2........接收到消息:【" + msg + "】" + LocalTime.now());Thread.sleep(200);}
}

注意到这两消费者,都设置了Thead.sleep,模拟任务耗时:

  • 消费者1 sleep了20毫秒,相当于每秒钟处理50个消息

  • 消费者2 sleep了200毫秒,相当于每秒处理5个消息

3.3.3.测试

启动ConsumerApplication后,执行publisher服务中编写的发送测试方法testWorkQueue。

最终结果如下:

消费者1接收到消息:【hello, message_0】21:06:00.869555300
消费者2........接收到消息:【hello, message_1】21:06:00.884518
消费者1接收到消息:【hello, message_2】21:06:00.907454400
消费者1接收到消息:【hello, message_4】21:06:00.953332100
消费者1接收到消息:【hello, message_6】21:06:00.997867300
消费者1接收到消息:【hello, message_8】21:06:01.042178700
消费者2........接收到消息:【hello, message_3】21:06:01.086478800
消费者1接收到消息:【hello, message_10】21:06:01.087476600
消费者1接收到消息:【hello, message_12】21:06:01.132578300
消费者1接收到消息:【hello, message_14】21:06:01.175851200
消费者1接收到消息:【hello, message_16】21:06:01.218533400
消费者1接收到消息:【hello, message_18】21:06:01.261322900
消费者2........接收到消息:【hello, message_5】21:06:01.287003700
消费者1接收到消息:【hello, message_20】21:06:01.304412400
消费者1接收到消息:【hello, message_22】21:06:01.349950100
消费者1接收到消息:【hello, message_24】21:06:01.394533900
消费者1接收到消息:【hello, message_26】21:06:01.439876500
消费者1接收到消息:【hello, message_28】21:06:01.482937800
消费者2........接收到消息:【hello, message_7】21:06:01.488977100
消费者1接收到消息:【hello, message_30】21:06:01.526409300
消费者1接收到消息:【hello, message_32】21:06:01.572148
消费者1接收到消息:【hello, message_34】21:06:01.618264800
消费者1接收到消息:【hello, message_36】21:06:01.660780600
消费者2........接收到消息:【hello, message_9】21:06:01.689189300
消费者1接收到消息:【hello, message_38】21:06:01.705261
消费者1接收到消息:【hello, message_40】21:06:01.746927300
消费者1接收到消息:【hello, message_42】21:06:01.789835
消费者1接收到消息:【hello, message_44】21:06:01.834393100
消费者1接收到消息:【hello, message_46】21:06:01.875312100
消费者2........接收到消息:【hello, message_11】21:06:01.889969500
消费者1接收到消息:【hello, message_48】21:06:01.920702500
消费者2........接收到消息:【hello, message_13】21:06:02.090725900
消费者2........接收到消息:【hello, message_15】21:06:02.293060600
消费者2........接收到消息:【hello, message_17】21:06:02.493748
消费者2........接收到消息:【hello, message_19】21:06:02.696635100
消费者2........接收到消息:【hello, message_21】21:06:02.896809700
消费者2........接收到消息:【hello, message_23】21:06:03.099533400
消费者2........接收到消息:【hello, message_25】21:06:03.301446400
消费者2........接收到消息:【hello, message_27】21:06:03.504999100
消费者2........接收到消息:【hello, message_29】21:06:03.705702500
消费者2........接收到消息:【hello, message_31】21:06:03.906601200
消费者2........接收到消息:【hello, message_33】21:06:04.108118500
消费者2........接收到消息:【hello, message_35】21:06:04.308945400
消费者2........接收到消息:【hello, message_37】21:06:04.511547700
消费者2........接收到消息:【hello, message_39】21:06:04.714038400
消费者2........接收到消息:【hello, message_41】21:06:04.916192700
消费者2........接收到消息:【hello, message_43】21:06:05.116286400
消费者2........接收到消息:【hello, message_45】21:06:05.318055100
消费者2........接收到消息:【hello, message_47】21:06:05.520656400
消费者2........接收到消息:【hello, message_49】21:06:05.723106700

可以看到消费者1和消费者2每人都消费了25条消息:

  • 消费者1很快完成了自己的25条消息

  • 消费者2却在缓慢的处理自己的25条消息。

也就是说消息是平均分配给每个消费者,并没有考虑到消费者的处理能力。导致1个消费者空闲,另一个消费者忙的不可开交。没有充分利用每一个消费者的能力,最终消息处理的耗时远远超过了1秒。这样显然是有问题的。

默认情况下,RabbitMQ采用轮询(你处理一条我处理一条换着来)的方式将消息分配给绑定在队列上的每个消费者

3.3.4.能者多劳

在spring中有一个简单的配置,可以解决这个问题。我们修改consumer服务的application.yml文件,添加配置:

  • 通过设置prefetch来控制消费者预取的消息数量,如果不设置就不管消费者有没有处理完消息都一直给它分配
spring:rabbitmq:listener:simple:prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息

再次测试,发现结果如下:

消费者1接收到消息:【hello, message_0】21:12:51.659664200
消费者2........接收到消息:【hello, message_1】21:12:51.680610
消费者1接收到消息:【hello, message_2】21:12:51.703625
消费者1接收到消息:【hello, message_3】21:12:51.724330100
消费者1接收到消息:【hello, message_4】21:12:51.746651100
消费者1接收到消息:【hello, message_5】21:12:51.768401400
消费者1接收到消息:【hello, message_6】21:12:51.790511400
消费者1接收到消息:【hello, message_7】21:12:51.812559800
消费者1接收到消息:【hello, message_8】21:12:51.834500600
消费者1接收到消息:【hello, message_9】21:12:51.857438800
消费者1接收到消息:【hello, message_10】21:12:51.880379600
消费者2........接收到消息:【hello, message_11】21:12:51.899327100
消费者1接收到消息:【hello, message_12】21:12:51.922828400
消费者1接收到消息:【hello, message_13】21:12:51.945617400
消费者1接收到消息:【hello, message_14】21:12:51.968942500
消费者1接收到消息:【hello, message_15】21:12:51.992215400
消费者1接收到消息:【hello, message_16】21:12:52.013325600
消费者1接收到消息:【hello, message_17】21:12:52.035687100
消费者1接收到消息:【hello, message_18】21:12:52.058188
消费者1接收到消息:【hello, message_19】21:12:52.081208400
消费者2........接收到消息:【hello, message_20】21:12:52.103406200
消费者1接收到消息:【hello, message_21】21:12:52.123827300
消费者1接收到消息:【hello, message_22】21:12:52.146165100
消费者1接收到消息:【hello, message_23】21:12:52.168828300
消费者1接收到消息:【hello, message_24】21:12:52.191769500
消费者1接收到消息:【hello, message_25】21:12:52.214839100
消费者1接收到消息:【hello, message_26】21:12:52.238998700
消费者1接收到消息:【hello, message_27】21:12:52.259772600
消费者1接收到消息:【hello, message_28】21:12:52.284131800
消费者2........接收到消息:【hello, message_29】21:12:52.306190600
消费者1接收到消息:【hello, message_30】21:12:52.325315800
消费者1接收到消息:【hello, message_31】21:12:52.347012500
消费者1接收到消息:【hello, message_32】21:12:52.368508600
消费者1接收到消息:【hello, message_33】21:12:52.391785100
消费者1接收到消息:【hello, message_34】21:12:52.416383800
消费者1接收到消息:【hello, message_35】21:12:52.439019
消费者1接收到消息:【hello, message_36】21:12:52.461733900
消费者1接收到消息:【hello, message_37】21:12:52.485990
消费者1接收到消息:【hello, message_38】21:12:52.509219900
消费者2........接收到消息:【hello, message_39】21:12:52.523683400
消费者1接收到消息:【hello, message_40】21:12:52.547412100
消费者1接收到消息:【hello, message_41】21:12:52.571191800
消费者1接收到消息:【hello, message_42】21:12:52.593024600
消费者1接收到消息:【hello, message_43】21:12:52.616731800
消费者1接收到消息:【hello, message_44】21:12:52.640317
消费者1接收到消息:【hello, message_45】21:12:52.663111100
消费者1接收到消息:【hello, message_46】21:12:52.686727
消费者1接收到消息:【hello, message_47】21:12:52.709266500
消费者2........接收到消息:【hello, message_48】21:12:52.725884900
消费者1接收到消息:【hello, message_49】21:12:52.746299900

可以发现,由于消费者1处理速度较快,所以处理了更多的消息;消费者2处理速度较慢,只处理了6条消息。而最终总的执行耗时也在1秒左右,大大提升。

正所谓能者多劳,这样充分利用了每一个消费者的处理能力,可以有效避免消息积压问题。

3.3.5.总结

Work模型的使用:

  • 多个消费者绑定到一个队列,同一条消息只会被一个消费者处理

  • 通过设置prefetch来控制消费者预取的消息数量

3.4.交换机类型

在之前的测试案例中,都没有交换机,生产者是直接发送消息到队列。引入交换机后,消息发送的模式会有很大变化:

可以看到,在订阅模型中,多了一个exchange角色,而且过程略有变化:

  • Publisher:生产者,不再直接发送消息到队列中,而是发给交换机

  • Exchange:交换机,一方面接收生产者发送的消息;另一方面是处理消息,例如将传递给某个特别队列、传递给所有队列、或将消息丢弃。如何操作,取决于Exchange的类型。

  • Queue:消息队列也与以前一样,接收消息、缓存消息。不过队列一定要与交换机绑定。

  • Consumer:消费者,与以前一样,订阅队列,没有变化

Exchange(交换机)只负责转发消息,不能存储消息,因此如果没有队列与Exchange绑定,或者没有符合路由规则的队列,消息将会丢失!

交换机的类型有四种:

  • Fanout交换机:广播,将收到的消息 传给 所有与该交换机绑定的队列。我们最早在控制台使用的正是Fanout交换机

  • Direct交换机:订阅,会将收到的消息 根据RoutingKey 路由 到具有同样RoutingKey的Queue中

    • 每一个Queue都设置了一个BindingKey(RoutingKey)

    • 发布者发送消息时,需要指定消息的RoutingKey

    • Exchange会将消息路由到BindingKey与RoutingKey一致的队列中

  • Topic交换机:通配符订阅,与Direct一样,只不过RoutingKey可以使用通配符

  • Headers交换机:头匹配,基于MQ的消息头匹配,用的较少。

我们讲解前面的三种交换机模式。

3.5.Fanout交换机

Fanout,英文翻译是扇出,我觉得在MQ中叫广播更合适。

在广播模式下,消息发送流程是这样的:

  • 1) 可以有多个队列

  • 2) 每个队列都要绑定到Exchange(交换机)

  • 3) 生产者发送的消息,只能发送到交换机

  • 4) 交换机把消息发送给绑定过的所有队列

  • 5) 订阅队列的消费者都能拿到消息

我们的计划是这样的:

  • 创建一个名为 hmall.fanoutFanout交换机

  • 创建两个队列fanout.queue1fanout.queue2,并绑定到hmall.fanout交换机

3.5.1.声明队列和交换机

在控制台创建队列fanout.queue1:

再创建一个队列fanout.queue2

然后再创建一个交换机:

然后绑定两个队列到交换机:

3.5.2.消息发送

在publisher服务的SpringAmqpTest类中添加测试方法:

  • 第二个参数是:路由键,对于 Fanout 交换机,路由键通常为空字符串,因为 Fanout 交换机会将消息广播到所有与其绑定的队列,而忽略路由键。
package com.itheima.publisher.amqp;import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;@SpringBootTest
public class SpringAmqpTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testFanoutExchange() {// 交换机名称String exchangeName = "hmall.fanout";// 消息String message = "hello, everyone!";rabbitTemplate.convertAndSend(exchangeName, "", message);}
}

3.5.3.消息接收

在consumer服务的SpringRabbitListener中添加两个方法,作为消费者:

package com.itheima.consumer.listener;import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class SpringRabbitListener {@RabbitListener(queues = "fanout.queue1")public void listenFanoutQueue1(String msg) {System.out.println("消费者1接收到Fanout消息:【" + msg + "】");}@RabbitListener(queues = "fanout.queue2")public void listenFanoutQueue2(String msg) {System.out.println("消费者2接收到Fanout消息:【" + msg + "】");}
}

3.6.Direct交换机

在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。

在Direct模型下:

  • 队列与交换机的绑定,不是任意绑定了,而要给队列指定一个RoutingKey(路由key)

  • 消息的发送方在 向 Exchange发送消息时,也必须指定消息的 RoutingKey

  • Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息

案例需求如图

  1. 声明一个名为hmall.direct的交换机

  2. 声明队列direct.queue1,绑定hmall.directbindingKeybludred

  3. 声明队列direct.queue2,绑定hmall.directbindingKeyyellowred

  4. consumer服务中,编写两个消费者方法,分别监听direct.queue1和direct.queue2

  5. 在publisher中编写测试方法,向hmall.direct发送消息

3.6.1.声明队列和交换机

首先在控制台声明两个队列direct.queue1direct.queue2,这里不再展示过程:

然后声明一个direct类型的交换机,命名为hmall.direct:

然后使用redblue作为RoutingKey,绑定direct.queue1hmall.direct

同理,使用redyellow作为key,绑定direct.queue2hmall.direct,步骤略,最终结果:

3.6.2.消息接收

在consumer服务的SpringRabbitListener中添加方法:

package com.itheima.consumer.listener;import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class SpringRabbitListener {@RabbitListener(queues = "direct.queue1")public void listenDirectQueue1(String msg) {System.out.println("消费者1接收到direct.queue1的消息:【" + msg + "】");}@RabbitListener(queues = "direct.queue2")public void listenDirectQueue2(String msg) {System.out.println("消费者2接收到direct.queue2的消息:【" + msg + "】");}
}

3.6.3.消息发送

在publisher服务的SpringAmqpTest类中添加测试方法:

package com.itheima.publisher.amqp;import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;@SpringBootTest
public class SpringAmqpTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testSendDirectExchange() {// 交换机名称String exchangeName = "hmall.direct";// 消息String message = "红色警报!日本乱排核废水,导致海洋生物变异,惊现哥斯拉!";// 发送消息rabbitTemplate.convertAndSend(exchangeName, "red", message);}
}

由于使用的red这个key,所以两个消费者都收到了消息:

我们再切换为blue这个key:

@Test
public void testSendDirectExchange() {// 交换机名称String exchangeName = "hmall.direct";// 消息String message = "最新报道,哥斯拉是居民自治巨型气球,虚惊一场!";// 发送消息rabbitTemplate.convertAndSend(exchangeName, "blue", message);
}

你会发现,只有消费者1收到了消息:

3.6.4.总结

描述下Direct交换机与Fanout交换机的差异?

  • Fanout交换机将消息路由给每一个与之绑定的队列

  • Direct交换机根据RoutingKey判断路由给哪个队列

  • 如果多个队列具有相同的RoutingKey,则与Fanout功能类似

3.7.Topic交换机

3.7.1.说明

Topic交换机Direct交换机相比,都是根据RoutingKey把消息路由到不同的队列。

只不过Topic交换机可以让队列在绑定BindingKey 的时候使用通配符!

BindingKey的组成:BindingKey 一般都是有一个或多个单词组成,多个单词之间以.分割,例如: item.insert

通配符规则:

  • #:匹配0个或多个词

  • *:匹配不多不少恰好1个词

举例:

  • item.#:能够匹配item.spu.insert 或者 item.spu

  • item.*:只能匹配item.spu

图示:

假如此时publisher发送的消息使用的RoutingKey共有四种:

  • china.news 代表有中国的新闻消息;

  • china.weather 代表中国的天气消息;

  • japan.news 则代表日本新闻

  • japan.weather 代表日本的天气消息;

解释:

  • topic.queue1:绑定的是china.# ,凡是以 china.开头的routing key 都会被匹配到,包括:

    • china.news

    • china.weather

  • topic.queue2:绑定的是#.news ,凡是以 .news结尾的 routing key 都会被匹配。包括:

    • china.news

    • japan.news

接下来,我们就按照上图所示,来演示一下Topic交换机的用法。

首先,在控制台按照图示例子创建队列、交换机,并利用通配符绑定队列和交换机。此处步骤略。最终结果如下:

3.7.2.消息发送

在publisher服务的SpringAmqpTest类中添加测试方法:

package com.itheima.publisher.amqp;import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;@SpringBootTest
public class SpringAmqpTest {@Autowiredprivate RabbitTemplate rabbitTemplate;/*** topicExchange*/@Testpublic void testSendTopicExchange() {// 交换机名称String exchangeName = "hmall.topic";// 消息String message = "喜报!孙悟空大战哥斯拉,胜!";// 发送消息rabbitTemplate.convertAndSend(exchangeName, "china.news", message);}
}

3.7.3.消息接收

在consumer服务的SpringRabbitListener中添加方法:

package com.itheima.consumer.listener;import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class SpringRabbitListener {@RabbitListener(queues = "topic.queue1")public void listenTopicQueue1(String msg){System.out.println("消费者1接收到topic.queue1的消息:【" + msg + "】");}@RabbitListener(queues = "topic.queue2")public void listenTopicQueue2(String msg){System.out.println("消费者2接收到topic.queue2的消息:【" + msg + "】");}
}

3.7.4.总结

描述下Direct交换机与Topic交换机的差异?

  • Topic交换机接收的消息RoutingKey必须是多个单词,以 . 分割

  • Topic交换机与队列绑定时的bindingKey可以指定通配符

  • #:代表0个或多个词

  • *:代表1个词

3.8.Java代码声明交换机和队列

3.8.1.基本API

SpringAMQP提供了一个Queue类,用来创建队列:

SpringAMQP还提供了一个Exchange接口,来表示所有不同类型的交换机:

我们可以自己创建队列和交换机,SpringAMQP还提供了ExchangeBuilder来简化这个过程:

而在绑定队列和交换机时,则需要使用BindingBuilder来创建Binding对象:

3.8.2.fanout示例

在consumer中创建一个配置类,声明队列和交换机:

package com.itheima.consumer.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class FanoutConfig {/*** 声明交换机* @return Fanout类型交换机*/@Beanpublic FanoutExchange fanoutExchange(){return new FanoutExchange("hmall.fanout");}/*** 第1个队列*/@Beanpublic Queue fanoutQueue1(){return new Queue("fanout.queue1");}/*** 绑定队列和交换机*/@Beanpublic Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);}/*** 第2个队列*/@Beanpublic Queue fanoutQueue2(){return new Queue("fanout.queue2");}/*** 绑定队列和交换机*/@Beanpublic Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);}
}

3.8.2.direct示例

direct模式由于要绑定多个KEY,会非常麻烦,每一个Key都要编写一个binding:

package com.itheima.consumer.config;import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class DirectConfig {/*** 声明交换机* @return Direct类型交换机*/@Beanpublic DirectExchange directExchange(){return ExchangeBuilder.directExchange("hmall.direct").build();}/*** 第1个队列*/@Beanpublic Queue directQueue1(){return new Queue("direct.queue1");}/*** 绑定队列和交换机*/@Beanpublic Binding bindingQueue1WithRed(Queue directQueue1, DirectExchange directExchange){return BindingBuilder.bind(directQueue1).to(directExchange).with("red");}/*** 绑定队列和交换机*/@Beanpublic Binding bindingQueue1WithBlue(Queue directQueue1, DirectExchange directExchange){return BindingBuilder.bind(directQueue1).to(directExchange).with("blue");}/*** 第2个队列*/@Beanpublic Queue directQueue2(){return new Queue("direct.queue2");}/*** 绑定队列和交换机*/@Beanpublic Binding bindingQueue2WithRed(Queue directQueue2, DirectExchange directExchange){return BindingBuilder.bind(directQueue2).to(directExchange).with("red");}/*** 绑定队列和交换机*/@Beanpublic Binding bindingQueue2WithYellow(Queue directQueue2, DirectExchange directExchange){return BindingBuilder.bind(directQueue2).to(directExchange).with("yellow");}
}

3.8.4.基于注解声明

基于@Bean方式的声明队列和交换机比较麻烦,Spring还提供了基于注解方式来声明。

例如,我们同样声明Direct模式的交换机和队列:

  • @RabbitListener注解的bindings属性:指定 哪个队列 绑定到 哪个交换机上,同时指定队列的BindingKey
    • 指定@RabbitListener注解的bindings属性的值为@QueueBinding()注解
      • 在@QueueBinding()注解中又通过其value属性、exchange属性、key属性分别指定队列、交换机、队列的BindingKey
  • value = @Queue(name = "direct.queue1"):队列名

  • exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT):交换机名和交换机的类型

  • key = {"red", "blue"}:队列的BindingKey

@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue1"),exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),key = {"red", "blue"}
))
public void listenDirectQueue1(String msg){System.out.println("消费者1接收到direct.queue1的消息:【" + msg + "】");
}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue2"),exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),key = {"red", "yellow"}
))
public void listenDirectQueue2(String msg){System.out.println("消费者2接收到direct.queue2的消息:【" + msg + "】");
}

是不是简单多了。

再试试Topic模式:

@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue1"),exchange = @Exchange(name = "hmall.topic", type = ExchangeTypes.TOPIC),key = "china.#"
))
public void listenTopicQueue1(String msg){System.out.println("消费者1接收到topic.queue1的消息:【" + msg + "】");
}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue2"),exchange = @Exchange(name = "hmall.topic", type = ExchangeTypes.TOPIC),key = "#.news"
))
public void listenTopicQueue2(String msg){System.out.println("消费者2接收到topic.queue2的消息:【" + msg + "】");
}

3.9.消息转换器

SpringAMQP中,发送消息的方法中接收消息的参数是Object类型

而在数据传输时,它会把你发送的消息序列化为字节再发送给MQ,接收消息的时候,会把字节反序列化为Java对象。

只不过,默认情况下SpringAMQP采用的序列化方式是JDK序列化。

JDK序列化存在下列问题:在做消息转换时,就是采用jdk自带的序列化方式

  • 数据体积过大

  • 有安全漏洞

  • 可读性差

3.9.1.测试默认转换器

1)创建测试队列

首先,我们在consumer服务中声明一个新的配置类:

利用@Bean的方式创建一个队列,

具体代码:

package com.itheima.consumer.config;import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class MessageConfig {@Beanpublic Queue objectQueue() {return new Queue("object.queue");}
}

注意,这里我们先不要给这个队列添加消费者,我们要查看消息体的格式。

重启consumer服务以后,该队列就会被自动创建出来了:

2)发送消息

我们在publisher模块的SpringAmqpTest中新增一个消息发送的代码,发送一个Map对象:

@Test
public void testSendMap() throws InterruptedException {// 准备消息Map<String,Object> msg = new HashMap<>();msg.put("name", "柳岩");msg.put("age", 21);// 发送消息rabbitTemplate.convertAndSend("object.queue", msg);
}

发送消息后查看控制台:

3.9.2.配置JSON转换器

显然,JDK序列化方式并不合适。我们希望消息体的体积更小、可读性更高,因此可以使用JSON方式来做序列化和反序列化

①在publisherconsumer两个服务中都引入依赖:

注意,如果项目中引入了spring-boot-starter-web依赖,则无需再次引入Jackson依赖。

②配置消息转换器,在publisherconsumer两个服务中都添加这个Bean即可:

@Bean
public MessageConverter messageConverter(){// 1.定义消息转换器Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();// 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息jackson2JsonMessageConverter.setCreateMessageIds(true);return jackson2JsonMessageConverter;
}

消息转换器中添加的messageId可以便于我们将来做幂等性判断。

此时,我们到MQ控制台删除object.queue中的旧的消息。然后再次执行刚才的消息发送的代码,到MQ的控制台查看消息结构:

3.9.3.消费者接收Object

我们在consumer服务中定义一个新的消费者,publisher服务是用Map集合发送的消息,所以消费者也一定要用Map接收,格式如下:

@RabbitListener(queues = "object.queue")
public void listenSimpleQueueMessage(Map<String, Object> msg) throws InterruptedException {System.out.println("消费者接收到object.queue消息:【" + msg + "】");
}

4.业务改造

案例需求:改造余额支付功能,将支付成功后使用OpenFeign调用交易服务的更新订单状态接口,改为基于RabbitMQ的异步通知。

如图:

步骤如下:

  • 定义direct类型交换机,命名为pay.direct

  • 定义消息队列,命名为trade.pay.success.queue

  • trade.pay.success.queuepay.direct绑定,BindingKeypay.success

  • 支付成功时不再调用交易服务更新订单状态的接口,而是发送一条消息到pay.direct,发送消息的RoutingKeypay.success,消息内容是订单id

  • 交易服务监听trade.pay.success.queue队列,接收到消息后更新订单状态为已支付

4.1.配置MQ

不管是生产者还是消费者,都需要配置MQ的基本信息。分为两步:

1)添加依赖:

  <!--消息发送--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>

2)配置MQ地址:

spring:rabbitmq:host: 192.168.150.101 # 你的虚拟机IPport: 5672 # 端口virtual-host: /hmall # 虚拟主机username: hmall # 用户名password: 123 # 密码

4.1.接收消息

在trade-service服务中定义一个消息监听的类:

其代码如下:

4.2.发送消息

修改pay-service服务下的com.hmall.pay.service.impl.PayOrderServiceImpl类中的tryPayOrderByBalance方法:

private final RabbitTemplate rabbitTemplate;@Override
@Transactional
public void tryPayOrderByBalance(PayOrderDTO payOrderDTO) {// 1.查询支付单PayOrder po = getById(payOrderDTO.getId());// 2.判断状态if(!PayStatus.WAIT_BUYER_PAY.equalsValue(po.getStatus())){// 订单不是未支付,状态异常throw new BizIllegalException("交易已支付或关闭!");}// 3.尝试扣减余额userClient.deductMoney(payOrderDTO.getPw(), po.getAmount());// 4.修改支付单状态boolean success = markPayOrderSuccess(payOrderDTO.getId(), LocalDateTime.now());if (!success) {throw new BizIllegalException("交易已支付或关闭!");}// 5.修改订单状态// tradeClient.markOrderPaySuccess(po.getBizOrderNo());try {rabbitTemplate.convertAndSend("pay.direct", "pay.success", po.getBizOrderNo());} catch (Exception e) {log.error("支付成功的消息发送失败,支付单id:{}, 交易单id:{}", po.getId(), po.getBizOrderNo(), e);}
}

相关文章:

06-RabbitMQ基础

目录 1.初识MQ 1.1.同步调用 1.2.异步调用 1.3.技术选型 2.RabbitMQ 2.1.安装 2.2.收发消息 2.2.1.交换机 2.2.2.队列 2.2.3.绑定关系 2.2.4.发送消息 2.3.数据隔离 2.3.1.用户管理 2.3.2.virtual host 3.SpringAMQP 3.1.导入Demo工程 3.2.快速入门 3.2.1.消…...

Spring Boot 的自动配置,以rabbitmq为例,请详细说明

Spring Boot 的自动配置特性能够大大简化集成外部服务和组件的配置过程。以 RabbitMQ 为例&#xff0c;Spring Boot 通过 spring-boot-starter-amqp 提供了自动配置支持&#xff0c;开发者只需在应用中添加相关依赖并配置必要的属性&#xff0c;Spring Boot 会自动配置所需的连…...

ros2-4.1 服务通信介绍

服务是ROS图中节点之间的另一种通信方法。服务分为客户端和服务端&#xff0c;客户端发送请求给服务端&#xff0c;服务端可以根据客户端的请求做一些处理&#xff0c;然后返回结果给客户端。也称为为请求-响应模型。 服务和话题的不同之处&#xff0c;话题是没有返回的&#…...

如何 cURL Elasticsearch:进入 Shell

作者&#xff1a;来自 Elastic Philipp Krenn Kibana 的控制台是开始使用 Elasticsearch 的 REST API 的最简单方法 - 语法突出显示、自动完成、格式化、导出 cURL、JavaScript 或 Python。而且你不必担心正确的端点、身份验证等。但是有时&#xff0c;如果 Kibana 不可用、你…...

【信息系统项目管理师】高分论文:论信息系统项目的风险管理(人民医院的信息系统)

更多内容请见: 备考信息系统项目管理师-专栏介绍和目录 文章目录 论文1、规划风险管理2、项目风险识别3、风险定性分析4、风险定量分析5、制定风险应对6、实施风险应对计划7、监督风险论文 2022年6月,我作为项目经理承担了XX县人民医院的信息系统建设,该项目总投资300万,其…...

安装和配置 Apache 及 PHP

安装和配置 Apache 及 PHP # 1. 停止当前 Apache 服务 sudo apachectl stop# 2. 清除现有的 Apache 配置和文件 sudo rm -rf /etc/apache2 sudo rm -rf /usr/sbin/httpd sudo rm -rf /Library/WebServer# 3. 使用 Homebrew 安装 Apache brew install httpd# 4. 启动 Apache su…...

jenkins入门12-- 权限管理

Jenkins的权限管理 由于jenkins默认的权限管理体系不支持用户组或角色的配置&#xff0c;因此需要安装第三发插件来支持角色的配置&#xff0c;我们使用Role-based Authorization Strategy 插件 只有项目读权限 只有某个项目执行权限...

虚功、达朗贝尔原理和拉格朗日方程

本文先引入虚位移,从虚功和虚功原理出发,介绍达朗贝尔原理(d’Alembert’s principle) 和 拉格朗日方程(Lagrange’s equations)。 1. 虚功 力学系统的虚位移(virtual displacement)或称无限小位移(infinitesimal displacement)是指力学系统的位形(configuration …...

面向对象分析和设计OOA/D,UML,GRASP

目录 什么是分析和设计&#xff1f; 什么是面向对象的分析和设计&#xff1f; 迭代开发 UML 用例图 交互图 基于职责驱动设计 GRASP 常见设计原则 什么是分析和设计&#xff1f; 分析&#xff0c;强调是对问题和需求的调查研究&#xff0c;不是解决方案。例如&#x…...

【Linux】记录一下考RHCE的学习过程(七)

年底了&#xff0c;公司接的北京地铁轨道交通的项目做不完了&#xff0c;一百多列地铁的设备都得调&#xff0c;派我出差了几周&#xff0c;这几天才回来&#xff0c;出差累死了实在是没办法更新。&#xff08;YOASOBI的二开票还没抢到ToT&#xff0c;哭死&#xff0c;看看回滚…...

【深度学习】深度(Deep Learning)学习基础

深度学习&#xff08;Deep Learning&#xff09; 深度学习是一种基于人工神经网络的机器学习方法&#xff0c;通过多个层次&#xff08;深度&#xff09;的神经网络从数据中自动学习特征和模式。它是人工智能的一个核心领域&#xff0c;尤其在处理复杂数据&#xff08;如图像、…...

121 买入股票的最佳时机

思路1&#xff1a; 买的那天一定是卖的那天之前的最小值。 每到一天&#xff0c;维护那天之前的最小值即可。 假设第一天是最小值&#xff0c;最大值初始化为0&#xff0c;当以后某天的价格小于最小值时&#xff0c;将最小值更新 当天价格大于最小值&#xff0c;说明有利可图…...

JVM之Java内存模型

Java内存模型&#xff08;Java Memory Model&#xff0c;简称JMM&#xff09;是Java虚拟机&#xff08;JVM&#xff09;规范中定义的一套规则&#xff0c;用于描述多线程环境下变量如何被访问和同步。在多线程编程中&#xff0c;内存模型的重要性不言而喻&#xff0c;它直接关系…...

matlab系列专栏-快捷键速查手册

目录 1在命令窗口(Command Window)中 2. 在编辑器(Editor)&#xff08;m文件&#xff09;中 1在命令窗口(Command Window)中 1)【↑、↓】——切换到之前、之后运行过的命令&#xff0c;可以重复按多次来达到想要的命令。 2)【Tab】——自动补全。在Command窗口&#xff0c…...

快手一面-面经

1. RPC和Http的区别&#xff1f; RPC&#xff08;Remote Procedure Call&#xff0c;远程过程调用&#xff09;和 HTTP&#xff08;HyperText Transfer Protocol&#xff0c;超文本传输协议&#xff09;是两种不同的通信机制&#xff0c;它们有不同的用途、工作原理和应用场景…...

<style lang=“scss“ scoped>: 这是更常见的写法,也是官方文档中推荐的写法

这两种写法在大多数情况下是没有区别的&#xff0c;它们都是 Vue.js 单文件组件 (.vue 文件) 中用来定义组件私有样式的方式。 两种写法&#xff1a; <style lang"scss" scoped>: 这是更常见的写法&#xff0c;也是官方文档中推荐的写法。<style scoped l…...

cerebro关闭ssl

cerebro连接es报错 io.netty.handler.codec.DecoderException: javax.net.ssl.SSLHandshakeException: (certificate_unknown) Received fatal alert: certificate_unknown 在cerebro的application.conf配置文件中添加 play.ws.ssl.loose.acceptAnyCertificate true Disab…...

网络安全常见的问题

1. 什么是 DDoS 攻击&#xff1f;如何防范&#xff1f; 答&#xff1a;DDoS 攻击是指利用大量的计算机或者其他网络设备&#xff0c;同时向目标网络或者服务器 发送 大量的数据流量&#xff0c;以致其无法正常工作&#xff0c;从而导致网络瘫痪或者服务器宕机的攻击行 为。 …...

Eclipse配置Tomcat服务器(最全图文详解)

前言&#xff1a; 本章使用图文讲解如何在Eclipse开发工具中配置Tomcat服务器、如何创建和启动JavaWeb工程&#xff0c;欢迎童鞋们互相交流。觉得不错可以三连订阅喔。 目标&#xff1a; 一、配置Tomcat服务器 1. 切换Eclipse视图 2. 打开菜单 3. 找到服务选项 4. 选择…...

mv指令详解

&#x1f3dd;️专栏&#xff1a;https://blog.csdn.net/2301_81831423/category_12872319.html &#x1f305;主页&#xff1a;猫咪-9527-CSDN博客 “欲穷千里目&#xff0c;更上一层楼。会当凌绝顶&#xff0c;一览众山小。” 目录 基本语法 主要功能 常用选项详解 1. …...

SQL从入门到实战

学前须知 sqlzoo数据介绍 world nobel covid ge game、goal、eteam teacher、dept movie、casting、actor 基础语句 select&from SELECT from WORLD Tutorial - SQLZoo 基础查询select单列&多列&所有列&别名应用 例题一 SELECT name, continent, population …...

回归中医传统 重铸中医之魂 — 薛应中 —

最近做了一个20次课的讲义纲要,每节课都是中医理念下某一类疾病的认知与诊疗,或是一个重大健康观念的辨析,准备陆续和各届人士一起探讨。 下面就算是一个序言,主要是做一个自我介绍,将自己的一点心得,结合我的行医经历,以及学习中医的治学之道等,做一个开场白。 (一)中医的有…...

什么是面向对象?

面向对象编程&#xff08;Object-Oriented Programming&#xff0c;简称OOP&#xff09;是一种流行的编程方法&#xff0c;它以对象和类为基础构建软件。该编程范式围绕“对象”这一基本概念展开&#xff0c;其中对象被视为包含数据和行为的软件构件。以下是对面向对象编程的深…...

HDFS读写流程

因为namenode维护管理了文件系统的元数据信息&#xff0c;这就造成了不管是读还是写数据都是基于NameNode开始的&#xff0c;也就是说NameNode成为了HDFS访问的唯一入口。入口地址是&#xff1a;http://nn_host:8020。 一、写数据流程 1.1 Pipeline管道、ACK应答响应 Pipeline…...

HDFS Federation联邦机制

一、当前HDFS体系架构 1.1 简介 当前的HDFS架构有两个主要的层&#xff1a; 命名空间&#xff08;namespace&#xff09; HDFS体系结构中的命名空间层由文件&#xff0c;块和目录组成。该层支持与名称空间相关的文件系统操作&#xff0c;例如创建&#xff0c;删除&#xff0…...

机器学习周报-ModernTCN文献阅读

文章目录 摘要Abstract 0 提升有效感受野&#xff08;ERF&#xff09;1 相关知识1.1 标准卷积1.2 深度分离卷积&#xff08;Depthwise Convolution&#xff0c;DWConv&#xff09;1.3 逐点卷积&#xff08;Pointwise Convolution&#xff0c;PWConv&#xff09;1.4 组卷积(Grou…...

QT RC_FILE 应用程序图标设置

1.先做一个app.ico 文件&#xff0c;并将文件放入资源文件夹中 2.打开QT项目的.pro文件在最下面增加 RC_FILE $$PWD/res/app.rc 3.在资源文件夹中创建一个app.rc文件。在QT开发工具中编辑并输入下在内容 IDI_ICON1 ICON "app.ico" 4.测试效果...

5G学习笔记之SNPN系列之网络选择

目录 0. NPN系列 1. 概述 2. 自动网络选择 3. 手动网络选择 0. NPN系列 1. NPN概述 2. NPN R18 3. 【SNPN系列】SNPN ID和广播消息 4. 【SNPN系列】UE入网和远程配置 5. 【SNPN系列】SNPN选择 6. PNI-NPN 1. 概述 对于某个特定的UE&#xff0c;可以仅支持SNPN接入模式&#x…...

k8s helm部署kafka集群(KRaft模式)——筑梦之路

添加helm仓库 helm repo add bitnami "https://helm-charts.itboon.top/bitnami" --force-update helm repo add grafana "https://helm-charts.itboon.top/grafana" --force-update helm repo add prometheus-community "https://helm-charts.itboo…...

Redis学习笔记

目录 Nosql概述 为什么用Nosql 什么是Nosql Nosql四大分类 Redis入门 概述 Windows安装 Linux安装 测试性能 基础知识 五大数据类型 Redis-Key String(字符串) List&#xff08;列表&#xff09; Set(集合) Hash&#xff08;哈希&#xff09; Zset&#xff08;有…...

mysql递归查询语法WITH RECURSIVE

WITH RECURSIVE 是 SQL 中用于执行递归查询的语法&#xff0c;特别适合于处理层级结构或递归数据&#xff08;如树形结构、图结构&#xff09;。递归查询可以反复引用自己来查询多层次的数据&#xff0c;而无需写多个嵌套查询。 基本语法结构&#xff1a; WITH RECURSIVE CTE…...

Go语言之十条命令(The Ten Commands of Go Language)

Go语言之十条命令 Go语言简介 Go语言&#xff08;又称Golang&#xff09;‌是由Google开发的一种开源编程语言&#xff0c;首次公开发布于2009年。Go语言旨在提供简洁、高效、可靠的软件开发解决方案&#xff0c;特别强调并发编程和系统编程‌。 Go语言的基本特征 ‌静态强类…...

Visual Studio 2022 C++ gRPC 环境搭建

文章目录 1、gRPC 安装2、创建项目2.1、创建 “空的解决方案”2.2、新建 gRPCServer 和 gRPCClient 项目2.3、创建 proto 文件 2、为 gRPC 服务端和客服端项目配置 protobuf 编译2.1、protobuf 配置2.2、gRPCServer 项目配置2.3、gRPCClient 项目配置 3、测试3.1、启动服务端程…...

2024AAAI SCTNet论文阅读笔记

文章目录 SCTNet: Single-Branch CNN with Transformer Semantic Information for Real-Time Segmentation摘要背景创新点方法Conv-Former Block卷积注意力机制前馈网络FFN 语义信息对齐模块主干特征对齐共享解码头对齐 总体架构backbone解码器头 对齐损失 实验SOTA效果对比Cit…...

【Java从入门到放弃 之 final 关键字】

final 关键字 final 关键字final 字段final 函数列表中的参数final 方法final 类 final 关键字 Java中里面有final这个关键字&#xff0c;这个关键字总体上是用来表达” 不能被改变“ 这个意思的。我们使用这个关键字表达不能被改变&#xff0c;有两种使用场景&#xff0c;有三…...

【U8+】用友U8软件中,出入库流水输出excel的时候提示报表输出引擎错误。

【问题现象】 通过天联高级版客户端登录拥有U8后&#xff0c; 将出入库流水输出excel的时候&#xff0c;提示报表输出引擎错误。 进行报表输出时出现错误&#xff0c;错误信息&#xff1a;找不到“fd6eea8b-fb40-4ce4-8ab4-cddbd9462981.htm”。 如果您正试图从最近使用的文件列…...

文本区域提取和分析——Python版本

目录 1. 图像预处理 2. 文本区域提取 3. 文本行分割 4. 文本区域分析 5. 应用举例 总结 文本区域提取和分析是计算机视觉中的重要任务&#xff0c;尤其在光学字符识别&#xff08;OCR&#xff09;系统、文档分析、自动化数据录入等应用中有广泛的应用。其目标是从图像中提…...

数据库介绍(不同数据库比较)

文章目录 **一、关系型数据库&#xff08;RDBMS&#xff09;****1. MySQL****优点**&#xff1a;**缺点**&#xff1a;**适用场景**&#xff1a; **2. PostgreSQL****优点**&#xff1a;**缺点**&#xff1a;**适用场景**&#xff1a; **3. Oracle Database****优点**&#xff…...

注意力的简单理解,有哪些注意力(Attention)

注意力(Attention) 目录 注意力(Attention)掩码注意力机制自注意力、交叉注意力、掩码注意力的不同点适应场景及举例多头注意分层注意力(Hierarchical Attention)协同注意力(Co - Attention)自注意力(Self - Attention) 简单理解:自注意力就像是一个句子(或序列)内…...

基于Python的投资组合收益率与波动率的数据分析

基于Python的投资组合收益率与波动率的数据分析 摘要&#xff1a;該文通过研究马科维茨的投资组合模型&#xff0c;并将投资组合模型应用到包含6只金融股票的金融行业基金中。首先通过开源的财经接口Tushare获取股票原始数据&#xff0c;接着利用数据分析的黄金组合库&#xf…...

《Opencv》图像的旋转

一、使用numpy库实现 np.rot90(img,-1) 后面的参数为-1时事顺时针旋转&#xff0c;为1时是逆时针旋转。 import cv2 import numpy as np img cv2.imread(./images/kele.png) """方法一""" # 顺时针90度 rot_1 np.rot90(img,-1) # 逆时针90度…...

Python 22:注释

1. 定义&#xff1a; 用熟悉的语言对代码进行解释说明。注释不会被执行。 2. 注释分类 单行注释&#xff1a;只能对一行代码进行注释。放在要注释的代码后面&#xff0c;用#进行分隔&#xff0c;中间至少空2个空格&#xff0c;保证代码规范。 print("hello world10"…...

python:利用神经网络技术确定大量离散点中纵坐标可信度的最高集中区间

当我们有许多离散点并想要确定纵坐标在某个区间内的可信度时&#xff0c;我们可以使用神经网络模型来解决这个问题。下面是一个使用Python编写的示例代码&#xff0c;展示了如何使用神经网络来确定大量离散点中纵坐标可信度的最高集中区间。 import numpy as np from sklearn.…...

计算机软件保护条例

(2001年12月20日中华人民共和国国务院令第339号公布 根据2011年1月8日《国务院关于废止和修改部分行政法规的决定》第一次修订 根据2013年1月30日《国务院关于修改〈计算机软件保护条例〉的决定》第二次修订) 第一章 总则 第一条 为了保护计算机软件著作权人的权益&#…...

CM3/4启动流程

CM3/4启动流程 1. 启动模式2. 启动流程 1. 启动模式 复位方式有三种&#xff1a;上电复位&#xff0c;硬件复位和软件复位。 当产生复位&#xff0c;并且离开复位状态后&#xff0c;CM3/4 内核做的第一件事就是读取下列两个 32 位整数的值&#xff1a; 从地址 0x0000 0000 处取…...

gaussdb中怎么查询一个表有多少GB

在 GaussDB 中&#xff0c;你可以通过多种方法查询一个表的大小&#xff0c;包括使用系统视图和内置函数。以下是几种常见的方法&#xff1a; 1. 使用 pg_total_relation_size 函数 pg_total_relation_size 函数返回一个表及其所有索引和 TOAST 数据的总大小。 示例查询 SE…...

2025-01-06 Unity 使用 Tip2 —— Windows、Android、WebGL 打包记录

文章目录 1 Windows2 Android2.1 横版 / 竖版游戏2.2 API 最低版本2.3 目标帧率2.3.1 targetFrameRate2.3.2 vSyncCount2.3.3 Unity 默认设置以及推荐设置2.3.4 Unity 帧率托管 3 WebGL3.1 平台限制3.2 打包报错记录 13.3 打包报错记录 2 ​ 最近尝试将写的小游戏打包&#xff…...

OP-TEE环境飞腾密码引擎编程指南

【写在前面】 飞腾开发者平台是基于飞腾自身强大的技术基础和开放能力&#xff0c;聚合行业内优秀资源而打造的。该平台覆盖了操作系统、算法、数据库、安全、平台工具、虚拟化、存储、网络、固件等多个前沿技术领域&#xff0c;包含了应用使能套件、软件仓库、软件支持、软件适…...

解密Navicat密码(Java)

最近从Navicat换到了DBeaver&#xff0c;导出配置文件发现配置文件里的密码都是加密的&#xff0c;看网上的都是给的PHP代码&#xff0c;因为环境问题&#xff0c;就算是在线上运行的PHP代码也会报错&#xff0c;所以就把这段代码改成Java了。 package com.unicdata.system.con…...

apex安装

安装过程复杂曲折&#xff0c;网上说的很多办法&#xff0c;貌似成功了&#xff0c;实际还是没起作用。 先说成功过程&#xff0c;执行下面命令&#xff0c;安装成功&#xff08;当然&#xff0c;前提是你要先配置好编译环境&#xff09;&#xff1a; &#xff08;我的环境&a…...