RabbitMQ的其中工作模式介绍以及Java的实现
文章目录
- 前文
- 一、模式介绍
- 1. 简单模式
- 2. 工作队列模式
- 3. 广播模式
- 4. 路由模式
- 5. 通配符模式
- 6. RPC模式
- 7. 发布确认模式
- 二、代码实现
- 1、简单模式
- 2、工作队列模式
- 生产者
- 消费者
- 消费者 1
- 消费者 2
- 3、广播模式 (Fanout Mode)
- 生产者
- 消费者
- 4、路由模式 (Direct Mode)
- 生产者
- 消费者
- 5、通配符模式
- 6、RPC模式 (Remote Procedure Call Mode)
- 服务器 (Server)
- 客户端 (Client)
- 7、发布确认模式 (Publisher Confirms)
- 1. 单独确认 (Publishing Messages Individually)
- 2. 批量确认 (Publishing Messages in Batches)
- 3. 异步确认
- 对比总结
前文
为了更好的理解RabbitMQ中的工作模式,最好先了解RabbitMQ的几种常见交换机的类型
-
Fanout(扇出交换机)
它会忽视路由键,把消息发送给所有绑定了该交换机的所有队列 -
Direct(直接交换机)
根据生产者发送消息时设置的routingKey和交换机与不同队列绑定的bindingKey进行匹配,如果匹配把消息发送给对应的队列 -
Topic(通配符交换机)
可以认为是Direct的升级版。Direct中bindingKey必须是一个常量字符串,在Topic中bindingKey可以是一个通配符,类似于正则表达式。只要routingKey符合bindingKey的字符串模式,那么就可以把消息发送给指定队列
RabbitMQ中用
.
来分割每一个单词。*
表示匹配一个任意单词,可以是单个字母。#
表示可以匹配0个或者多个单词,比*宽松。
例如,# 可以匹配 a、a.b、a.b.c 等,而 . 只能匹配正好两个单词的路由键(如 a.b)
- Header
这种交换机不依赖于routingKey和bindingKey。它会根据消息中的headers属性进行匹配。但是由于其性能低下,因此很少用。
此外,代码实现部分博客使用的RabbitMQ自带的依赖包。Spring也支持RabbitMQ。
两者在RabbitMQ官网都有说明。
一、模式介绍
1. 简单模式
七个模式中最简单的模式,特点是一个生产者p、一个消费者c,消息只能被消费一次。适用于消息只能被单个消费者消费的场景。
2. 工作队列模式
概述: ⼀个⽣产者P,多个消费者C1,C2. 在多个消息的情况下, Work Queue 会将消息分派给不同的消费者, 每个消费者都会接收到不同的消息
特点: 消息不会重复, 分配给不同的消费者
适⽤场景: 集群环境中做异步处理。例如12306候补成功的短信服务,其中每个短信服务功能是一样的,消息给到那个消费者都可以,类似于集群:
3. 广播模式
概述: 图中x是exchange,exchange会根据消息中的routingkey与Q1、Q2绑定的bindkey进行匹配,如果匹配成功,把消息转发给指定的队列。
特点: 一个生产者发送给exchange的消息,会被exchange复制多分,分别发送给绑定了这个exchange的queue。每个消费者获得的消息都是一样的。
应用场景: 比如1001就老喜欢这种东西了,想给自己的客户推销广告,用广播模式,就可以把消息发送给所有的用户。
4. 路由模式
概述: 这个模式相当于是广播模式的一个约束,它会根据消息中的routingKey和与其他队列绑定的bindingKey进行匹配,如果匹配才会把消息发送给指定队列。
💡routingKey 和bindgKey必须完全一直才能匹配成功
5. 通配符模式
概述: 相当于路由模式的升级版,只要消息中的routingKey与指定队列的通配符匹配进行发送消息。
根据上图示例:
- ff.a.j与*.a.*匹配,该消息就会发送到Q1
- 消息:c.jojo.hyy 与c.#匹配,该消息就会发送到Q2
6. RPC模式
概述: RCP模式下 没有Producter和Consumer的概念,取而代之的是Client和Server的结构。Client发送消息给Server并且希望Server能发送一个期望的响应给Client,可以使用RPC模式.
特点: Client发送消息会设定两个字段relyTo、correlationId。replyTo用于指定Server使用哪一个回调队列(图中使用的Reply)发送响应给到Client。Client会等待回调队列发送reply给到自己,根据correlationId确保是Cilen需要的响应。
7. 发布确认模式
概述: 发布确认机制是RabbitMQ用于保证消息可靠性的其中一个方式。
- producter把对应的channel设置成confirm模式(通过channel.confirmSelect方法实现),并且设定一个消息唯一ID,把消息与唯一ID关联起来
- exchange接收到消息后会发送一个ACK响应给到producter(响应中含有唯一ID),表明消息已经送达。
这种方法可以尽可能的避免在消息发送过程中的丢失问题。
二、代码实现
代码实现,主要有两种,一个是RabbitMQ官方提供的依赖包,另一个是Spring官方AMQP对RabbitMQ的封装实现,两者都会演示
RabbitMQ中央仓库
找到合适的版本导入即可,本博客使用的5.20.0版本。
1、简单模式
生产者:
public class Producer {private static Connection createConnection() throws IOException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();factory.setHost(Constants.HOST);//RabbitMQ的IP地址factory.setPort(Constants.PORT);//RabbitMQ的服务端口号,默认是5672factory.setVirtualHost(Constants.VIRTUAL_HOST);//虚拟主机,可以在RabbitMQ终端创建一个factory.setUsername(Constants.USER_NAME);//RabbitMQ登录账号factory.setPassword(Constants.PASSWORD);//RabbitMQ登录密码Connection connection = factory.newConnection();return connection;}public static void main(String[] args) throws IOException, TimeoutException {//1、建立连接Connection connection = createConnection();//2、开启信道Channel channel = connection.createChannel();//3、声明队列channel.queueDeclare(Constants.SIMPLE_QUEUE, true, false, false, null);//4、发布消息channel.basicPublish("", Constants.SIMPLE_QUEUE, null, "呵呵".getBytes());System.out.println("执行了发布");//5、关闭连接channel.close();connection.close();}
}
💡
- Channel、Connection的相关包都来自于com.rabbitmq.client不要导错了
- 步骤四中参数 “” 的意思是使用默认交换机(Direct类型),bindingKey就是已经绑定的队列名字。
消费者:
public class Consumer {private static Connection createConnection() throws IOException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();factory.setHost(Constants.HOST);//RabbitMQ的IP地址factory.setPort(Constants.PORT);//RabbitMQ的服务端口号,模式是5672factory.setVirtualHost(Constants.VIRTUAL_HOST);//虚拟主机,可以在RabbitMQ终端创建一个factory.setUsername(Constants.USER_NAME);//RabbitMQ登录账号factory.setPassword(Constants.PASSWORD);//RabbitMQ登录密码Connection connection = factory.newConnection();return connection;}public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {//1、建立连接Connection connection = createConnection();//2、开启信道Channel channel = connection.createChannel();//3、声明队列channel.queueDeclare(Constants.SIMPLE_QUEUE, true, false, false, null);//5、定义consumer逻辑DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//根据实际场景消费消息System.out.println("消息已经被消费,获取的消息内容:" + new String(body));}};//6、消费内容channel.basicConsume(Constants.SIMPLE_QUEUE, true, consumer);//7、关闭连接channel.close();connection.close();}
}
💡
consumerTag: 标识不同消费者的唯一标签
envelope: 描述了消息传递的细节,如该消息是由那个交换机发送的,消息指定的routingKey是什么,消息的唯一标识deliveryTag。
properties: 用于设定RabbitMQ的高级属性
body: 消息的本体,以二进制方式存储
2、工作队列模式
工作队列模式(Work Queue Mode)是一种任务分发的模式,允许多个消费者从同一个队列中获取消息并处理,从而实现任务的负载均衡。消息会被轮询(Round-Robin)分发到不同的消费者,适合处理耗时任务的场景。
生产者
import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Product {private static Connection createConnection() throws IOException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();factory.setHost(Constants.HOST); // RabbitMQ的IP地址factory.setPort(Constants.PORT); // RabbitMQ的服务端口号,默认是5672factory.setVirtualHost(Constants.VIRTUAL_HOST); // 虚拟主机factory.setUsername(Constants.USER_NAME); // RabbitMQ登录账号factory.setPassword(Constants.PASSWORD); // RabbitMQ登录密码return factory.newConnection();}public static void main(String[] args) throws IOException, TimeoutException {// 1、建立连接Connection connection = createConnection();// 2、开启信道Channel channel = connection.createChannel();// 3、声明队列channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null);// 4、发送20条消息for (int i = 0; i < 20; i++) {channel.basicPublish("", Constants.WORK_QUEUE, null, ("工作队列的消息" + i).getBytes());}// 5、关闭连接channel.close();connection.close();}
}
说明:
- 队列声明:使用
channel.queueDeclare
声明一个持久化的队列(durable=true
),确保队列在 RabbitMQ 重启后依然存在。 - 消息发送:通过
basicPublish
方法向默认交换机(""
)发送消息,路由键为队列名称(Constants.WORK_QUEUE
)。 - 消息内容:循环发送 20 条消息,每条消息为
"工作队列的消息" + i
。 - 连接关闭:发送完成后关闭信道和连接。
💡 注意:
- 参数
""
表示使用默认交换机(Direct 类型),路由键直接绑定到队列名称。
消费者
消费者从工作队列中获取消息并处理。以下是两个消费者的实现,分别命名为 Consumer1
和 Consumer2
,它们共享同一队列的消息,每个消费者拿到不同的消息。
消费者 1
public class Consumer1 {private static Connection createConnection() throws IOException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();factory.setHost(Constants.HOST); // RabbitMQ的IP地址factory.setPort(Constants.PORT); // RabbitMQ的服务端口号,默认是5672factory.setVirtualHost(Constants.VIRTUAL_HOST); // 虚拟主机factory.setUsername(Constants.USER_NAME); // RabbitMQ登录账号factory.setPassword(Constants.PASSWORD); // RabbitMQ登录密码return factory.newConnection();}public static void main(String[] args) throws IOException, TimeoutException {// 1、建立连接Connection connection = createConnection();// 2、开启信道Channel channel = connection.createChannel();// 3、声明队列channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null);// 4、定义消费逻辑DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("第一个消费者消费消息:" + new String(body));}};// 5、消费channel.basicConsume(Constants.WORK_QUEUE, true, consumer);// 6、保持连接(注释掉关闭连接的代码)// channel.close();// connection.close();}
}
消费者 2
public class Consumer2 {private static Connection createConnection() throws IOException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();factory.setHost(Constants.HOST); // RabbitMQ的IP地址factory.setPort(Constants.PORT); // RabbitMQ的服务端口号,默认是5672factory.setVirtualHost(Constants.VIRTUAL_HOST); // 虚拟主机factory.setUsername(Constants.USER_NAME); // RabbitMQ登录账号factory.setPassword(Constants.PASSWORD); // RabbitMQ登录密码return factory.newConnection();}public static void main(String[] args) throws IOException, TimeoutException {// 1、建立连接Connection connection = createConnection();// 2、开启信道Channel channel = connection.createChannel();// 3、声明队列channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null);// 4、定义消费逻辑DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("第二个消费者消费消息:" + new String(body));}};// 5、消费channel.basicConsume(Constants.WORK_QUEUE, true, consumer);// 6、保持连接(注释掉关闭连接的代码)// channel.close();// connection.close();}
}
说明:
- 队列声明:与生产者一致,消费者也声明相同的队列,确保队列存在。
- 消费逻辑:通过继承
DefaultConsumer
并重写handleDelivery
方法,定义消息处理逻辑。Consumer1
和Consumer2
分别打印接收到的消息,标识为“第一个消费者”和“第二个消费者”。 - 消息消费:使用
channel.basicConsume
订阅队列,autoAck=true
表示自动确认消息(消费者接收消息后自动通知 RabbitMQ,把消息从队列中删除)。
💡 注意:
- consumerTag:标识消费者的唯一标签,用于区分不同的消费者。
- envelope:包含消息的元数据,如路由键、交换机和
deliveryTag
(消息的唯一标识)。- properties:消息的附加属性,可用于高级配置。
- body:消息的实际内容,以字节数组形式存储。
3、广播模式 (Fanout Mode)
广播模式通过 Fanout
交换机将消息分发到所有绑定的队列,忽略路由键,适合发布/订阅场景。以下基于提供的代码续写。
生产者
关键点:
- 声明
Fanout
交换机 (exchangeDeclare
)。 - 声明并绑定多个队列到交换机 (
queueDeclare
,queueBind
)。 - 发布消息到交换机,路由键为空 (
basicPublish
)。
消费者
public class Consumer1 {public static void main(String[] args) throws IOException, TimeoutException {//1. 建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT); //需要提前开放端口号connectionFactory.setUsername(Constants.USER_NAME);//账号connectionFactory.setPassword(Constants.PASSWORD); //密码connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机Connection connection = connectionFactory.newConnection();//2. 开启信道Channel channel = connection.createChannel();//3. 声明队列channel.queueDeclare(Constants.FANOUT_QUEUE1,true,false,false,null);//4. 消费消息DefaultConsumer consumer = new DefaultConsumer(channel){//从队列中收到消息, 就会执行的方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到消息:"+ new String(body));}};channel.basicConsume(Constants.FANOUT_QUEUE1, true, consumer);}
}
public class Consumer2 {public static void main(String[] args) throws IOException, TimeoutException {//1. 建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT); //需要提前开放端口号connectionFactory.setUsername(Constants.USER_NAME);//账号connectionFactory.setPassword(Constants.PASSWORD); //密码connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机Connection connection = connectionFactory.newConnection();//2. 开启信道Channel channel = connection.createChannel();//3. 声明队列channel.queueDeclare(Constants.FANOUT_QUEUE2,true,false,false,null);//4. 消费消息DefaultConsumer consumer = new DefaultConsumer(channel){//从队列中收到消息, 就会执行的方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到消息:"+ new String(body));}};channel.basicConsume(Constants.FANOUT_QUEUE2, true, consumer);}
}
关键点:
- 声明队列并监听消息 (
queueDeclare
,basicConsume
)。 - 每个消费者独立消费绑定队列的消息。
- 自动确认消息 (
autoAck=true
)。
💡 注意:
Fanout
模式下,路由键被忽略,消息广播到所有绑定队列。- 确保交换机和队列正确绑定,避免消息丢失。
4、路由模式 (Direct Mode)
路由模式通过 Direct
交换机根据路由键精确分发消息到匹配的队列,适合需要条件路由的场景。
生产者
public class Producer {private static Connection createConnection() throws IOException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();factory.setHost(Constants.HOST);//RabbitMQ的IP地址factory.setPort(Constants.PORT);//RabbitMQ的服务端口号,模式是5672factory.setVirtualHost(Constants.VIRTUAL_HOST);//虚拟主机,可以在RabbitMQ终端创建一个factory.setUsername(Constants.USER_NAME);//RabbitMQ登录账号factory.setPassword(Constants.PASSWORD);//RabbitMQ登录密码Connection connection = factory.newConnection();return connection;}public static void main(String[] args) throws IOException, TimeoutException {//1、建立连接Connection connection = createConnection();//2、开启信道Channel channel=connection.createChannel();//3、声明交换机channel.exchangeDeclare(Constants.DIRECT_EXCHANGE,BuiltinExchangeType.DIRECT);//4、声明队列channel.queueDeclare(Constants.DIRECT_QUEUE1,true,false,false,null);channel.queueDeclare(Constants.DIRECT_QUEUE2,true,false,false,null);//5、交换机绑定队列q1 q2channel.queueBind(Constants.DIRECT_QUEUE1,Constants.DIRECT_EXCHANGE,"q1");channel.queueBind(Constants.DIRECT_QUEUE2,Constants.DIRECT_EXCHANGE,"q2");//6、生产者发送消息channel.basicPublish(Constants.DIRECT_EXCHANGE,"q1",null,("q1需要接收到这个消息").getBytes());channel.basicPublish(Constants.DIRECT_EXCHANGE,"q2",null,("q2需要接收到这个消息").getBytes());//7、关闭资源channel.close();connection.close();}
}
关键点:
- 声明
Direct
交换机 (exchangeDeclare
)。 - 声明队列并绑定到交换机,指定路由键 (
queueBind
)。 - 发布消息时指定路由键 (
basicPublish
)。
消费者
public class Consumer1 {private static Connection createConnection() throws IOException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();factory.setHost(Constants.HOST);//RabbitMQ的IP地址factory.setPort(Constants.PORT);//RabbitMQ的服务端口号,模式是5672factory.setVirtualHost(Constants.VIRTUAL_HOST);//虚拟主机,可以在RabbitMQ终端创建一个factory.setUsername(Constants.USER_NAME);//RabbitMQ登录账号factory.setPassword(Constants.PASSWORD);//RabbitMQ登录密码Connection connection = factory.newConnection();return connection;}public static void main(String[] args) throws IOException, TimeoutException {//1、建立连接Connection connection = createConnection();//2、开启信道Channel channel=connection.createChannel();//3、声明交换机channel.exchangeDeclare(Constants.DIRECT_EXCHANGE, BuiltinExchangeType.DIRECT);//4、声明队列channel.queueDeclare(Constants.DIRECT_QUEUE1,true,false,false,null);//5、定义消费逻辑DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到消息:"+new String(body));}};//5、消费channel.basicConsume(Constants.DIRECT_QUEUE1,true,consumer);//6、关闭连接channel.close();connection.close();}
}
public class Consumer2 {private static Connection createConnection() throws IOException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();factory.setHost(Constants.HOST);//RabbitMQ的IP地址factory.setPort(Constants.PORT);//RabbitMQ的服务端口号,模式是5672factory.setVirtualHost(Constants.VIRTUAL_HOST);//虚拟主机,可以在RabbitMQ终端创建一个factory.setUsername(Constants.USER_NAME);//RabbitMQ登录账号factory.setPassword(Constants.PASSWORD);//RabbitMQ登录密码Connection connection = factory.newConnection();return connection;}public static void main(String[] args) throws IOException, TimeoutException {//1、建立连接Connection connection = createConnection();//2、开启信道Channel channel=connection.createChannel();//3、声明交换机channel.exchangeDeclare(Constants.DIRECT_EXCHANGE, BuiltinExchangeType.DIRECT);//4、声明队列channel.queueDeclare(Constants.DIRECT_QUEUE2,true,false,false,null);//5、定义消费逻辑DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到消息:"+new String(body));}};//5、消费channel.basicConsume(Constants.DIRECT_QUEUE2,true,consumer);//6、关闭连接channel.close();connection.close();}
}
关键点:
- 声明队列并监听消息 (
queueDeclare
,basicConsume
)。 - 根据队列绑定的路由键接收对应消息。
- 自动确认消息 (
autoAck=true
)。
💡 注意:
- 路由键必须精确匹配,消息才会分发到对应队列。
- 队列可以绑定多个路由键,增加灵活性。
- 未绑定路由键的队列不会收到消息。
5、通配符模式
通配符模式和路由模式实现的不同点就是交换机使用TOPIC类型,交换机和队列绑定使用通配符,其他代码几乎一致,这里就不演示了。
6、RPC模式 (Remote Procedure Call Mode)
RPC模式通过RabbitMQ实现客户端与服务器的双向通信,客户端发送请求到服务器并等待响应,适合需要同步响应的场景。
服务器 (Server)
public class Server {private static Connection createConnection() throws IOException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();factory.setHost(Constants.HOST);//RabbitMQ的IP地址factory.setPort(Constants.PORT);//RabbitMQ的服务端口号,模式是5672factory.setVirtualHost(Constants.VIRTUAL_HOST);//虚拟主机,可以在RabbitMQ终端创建一个factory.setUsername(Constants.USER_NAME);//RabbitMQ登录账号factory.setPassword(Constants.PASSWORD);//RabbitMQ登录密码Connection connection = factory.newConnection();return connection;}public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {//1、建立连接Connection connection = createConnection();//2、开启信道Channel channel = connection.createChannel();//3、声明队列并且设定对多处理消息数channel.queueDeclare(Constants.RPC_RESPONSE_QUEUE, true, false, false, null);channel.queueDeclare(Constants.RPC_REQUEST_QUEUE, true, false, false, null);channel.basicQos(1);//5、定义consumer逻辑DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//根据实际场景消费消息System.out.println("客服端发送了这个消息:" + new String(body));//获取消息中的correID将其发送会客户端AMQP.BasicProperties proper = new AMQP.BasicProperties().builder().correlationId(properties.getCorrelationId()).build();//给客户端发送响应,指定使用replayTochannel.basicPublish("", properties.getReplyTo(), proper, ("收到来自客户端的请求").getBytes());channel.basicAck(envelope.getDeliveryTag(), false);}};//6、消费内容channel.basicConsume(Constants.RPC_REQUEST_QUEUE, true, consumer);}
}
关键点:
- 声明请求和响应队列 (
queueDeclare
)。 - 设置消息处理限制 (
basicQos
),确保按序处理。 - 消费请求队列消息,发送响应到客户端指定的
replyTo
队列。 - 使用
correlationId
关联请求和响应。
客户端 (Client)
public class Client {private static Connection createConnection() throws IOException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();factory.setHost(Constants.HOST);//RabbitMQ的IP地址factory.setPort(Constants.PORT);//RabbitMQ的服务端口号,模式是5672factory.setVirtualHost(Constants.VIRTUAL_HOST);//虚拟主机,可以在RabbitMQ终端创建一个factory.setUsername(Constants.USER_NAME);//RabbitMQ登录账号factory.setPassword(Constants.PASSWORD);//RabbitMQ登录密码Connection connection = factory.newConnection();return connection;}public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {//1、建立连接Connection connection = createConnection();//2、开启信道Channel channel = connection.createChannel();//3、声明队列channel.queueDeclare(Constants.RPC_REQUEST_QUEUE, true, false, false, null);channel.queueDeclare(Constants.RPC_RESPONSE_QUEUE, true, false, false, null);//4、生成唯一ID用于区分当前消息String corrID= UUID.randomUUID().toString();//5、配置请求相关属性AMQP.BasicProperties props = new AMQP.BasicProperties().builder().correlationId(corrID).replyTo(Constants.RPC_RESPONSE_QUEUE).build();//4、发布消息channel.basicPublish("", Constants.RPC_REQUEST_QUEUE, props, "呵呵".getBytes());System.out.println("执行了发布");//5、等待响应final BlockingQueue<String> bq= new LinkedBlockingQueue<>(1);DefaultConsumer consumer=new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String response=new String(body);if(properties.getCorrelationId().equals(corrID)){System.out.println("接收到回调消息:"+response);bq.offer(response);}}};channel.basicConsume(Constants.RPC_RESPONSE_QUEUE, true, consumer);bq.take();}
}
关键点:
- 声明请求和响应队列 (
queueDeclare
)。 - 生成唯一
correlationId
标识请求。 - 发送请求到
RPC_REQUEST_QUEUE
,指定replyTo
为响应队列。 - 监听
RPC_RESPONSE_QUEUE
,验证correlationId
匹配后处理响应。
7、发布确认模式 (Publisher Confirms)
发布确认模式确保生产者发送的消息被RabbitMQ正确接收,提供可靠性保证。确认方式有以下三种:
1. 单独确认 (Publishing Messages Individually)
private static void individually() throws Exception {try(Connection connection = createConnection()) {//1. 开启信道Channel channel = connection.createChannel();//2. 设置信道为confirm模式channel.confirmSelect();//3. 声明队列channel.queueDeclare(Constants.PUBLISHER_CONFIRMS_QUEUE1, true, false, false, null);//4. 发送消息, 并等待确认long start = System.currentTimeMillis();for (int i = 0; i < MESSAGE_COUNT; i++) {String msg = "hello publisher confirms"+i;channel.basicPublish("",Constants.PUBLISHER_CONFIRMS_QUEUE1, null, msg.getBytes());//等待确认channel.waitForConfirmsOrDie(5000);}long end = System.currentTimeMillis();System.out.printf("单独确认策略, 消息条数: %d, 耗时: %d ms \n",MESSAGE_COUNT, end-start);}}
关键点:
- 启用确认模式 (
confirmSelect
)。 - 每发送一条消息,同步等待确认 (
waitForConfirmsOrDie
)。 - 适合小规模消息发送,因为性能较低。
2. 批量确认 (Publishing Messages in Batches)
private static void publishingMessagesInBatches() throws Exception{try(Connection connection = createConnection()) {//1. 开启信道Channel channel = connection.createChannel();//2. 设置信道为confirm模式channel.confirmSelect();//3. 声明队列channel.queueDeclare(Constants.PUBLISHER_CONFIRMS_QUEUE2, true, false, false, null);//4. 发送消息, 并进行确认long start = System.currentTimeMillis();int batchSize = 100;int outstandingMessageCount = 0;for (int i = 0; i < MESSAGE_COUNT; i++) {String msg = "hello publisher confirms"+i;channel.basicPublish("",Constants.PUBLISHER_CONFIRMS_QUEUE2, null, msg.getBytes());outstandingMessageCount++;if (outstandingMessageCount==batchSize){channel.waitForConfirmsOrDie(5000);outstandingMessageCount = 0;}}if (outstandingMessageCount>0){channel.waitForConfirmsOrDie(5000);}long end = System.currentTimeMillis();System.out.printf("批量确认策略, 消息条数: %d, 耗时: %d ms \n",MESSAGE_COUNT, end-start);}}
关键点:
- 启用确认模式 (
confirmSelect
)。 - 每发送一批消息 (如100条),同步等待确认 (
waitForConfirmsOrDie
)。 - 平衡了性能与可靠性。
- 但在一些消息容易遗失的场景,我们不清楚具体是那个消息出现问题,需要批量重发消息,性能可能不增返降。
3. 异步确认
private static void asynchronously() throws Exception{try (Connection connection = createConnection()){//1. 开启信道Channel channel = connection.createChannel();//2. 设置信道为confirm模式channel.confirmSelect();//3. 声明队列channel.queueDeclare(Constants.PUBLISHER_CONFIRMS_QUEUE3, true, false, false, null);//4. 监听confirm//集合中存储的是未确认的消息IDlong start = System.currentTimeMillis();SortedSet<Long> confirmSeqNo = Collections.synchronizedSortedSet(new TreeSet<>());channel.addConfirmListener(new ConfirmListener() {@Overridepublic void handleAck(long deliveryTag, boolean multiple) throws IOException {if (multiple){confirmSeqNo.headSet(deliveryTag+1).clear();}else {confirmSeqNo.remove(deliveryTag);}}@Overridepublic void handleNack(long deliveryTag, boolean multiple) throws IOException {if (multiple){confirmSeqNo.headSet(deliveryTag+1).clear();}else {confirmSeqNo.remove(deliveryTag);}//业务需要根据实际场景进行处理, 比如重发, 此处代码省略}});//5. 发送消息for (int i = 0; i < MESSAGE_COUNT; i++) {String msg = "hello publisher confirms"+i;long seqNo = channel.getNextPublishSeqNo();channel.basicPublish("",Constants.PUBLISHER_CONFIRMS_QUEUE3, null, msg.getBytes());confirmSeqNo.add(seqNo);}while (!confirmSeqNo.isEmpty()){Thread.sleep(10);}long end = System.currentTimeMillis();System.out.printf("异步确认策略, 消息条数: %d, 耗时: %d ms \n",MESSAGE_COUNT, end-start);}}
关键点:
- 启用确认模式 (
confirmSelect
)。 - 使用
channel.ConfirmListener()
开启监听,异步处理确认 (handleAck
,handleNack
)。 - 通过
SortedSet
跟踪未确认消息。 - 最高吞吐量,适合大规模消息发送。
对比总结
策略 | 优点 | 缺点 | 适用场景 |
---|---|---|---|
单独确认 | 简单,高可靠性 | 延迟高,吞吐量低 | 小规模、可靠性优先 |
批量确认 | 平衡性能与可靠性 | 仍需同步等待,部分延迟 | 中等规模、可靠性与性能兼顾 |
异步确认 | 高吞吐量,低延迟 | 实现复杂,需处理失败重发 | 大规模、高性能需求 |
相关文章:
RabbitMQ的其中工作模式介绍以及Java的实现
文章目录 前文一、模式介绍1. 简单模式2. 工作队列模式3. 广播模式4. 路由模式5. 通配符模式6. RPC模式7. 发布确认模式 二、代码实现1、简单模式2、工作队列模式生产者消费者消费者 1消费者 2 3、广播模式 (Fanout Mode)生产者消费者 4、路由模式 (Direct Mode)生产者消费者 5…...
Docker 镜像打包到本地
保存镜像 使用 docker save 命令将镜像保存为一个 tar 文件。命令格式如下: docker save [options] IMAGE [IMAGE...]示例:docker save -o centos.tar centos:latest--output 或 -o:将输出保存到指定的文件中。 加载镜像 如果需要在其他机器…...
5分钟搭建智能看板:衡石科技自助式BI工具使用教程
在数据驱动的时代,业务人员需要快速将数据转化为洞察,而非依赖IT团队排队开发报表。衡石科技HENGSHI SENSE的自助式BI工具,通过零代码配置、模板化设计、智能分析三大核心能力,让任何人都能在5分钟内搭建专业级数据看板。本文将手…...
安卓开发用到的设计模式(1)创建型模式
安卓开发用到的设计模式(1)创建型模式 文章目录 安卓开发用到的设计模式(1)创建型模式1. 单例模式(Singleton Pattern)2. 工厂模式(Factory Pattern)3. 抽象工厂模式(Abs…...
Unity3D序列化机制详解
前言 Unity3D的序列化机制是其编辑器与运行时数据管理的核心,理解其工作原理对高效开发至关重要。以下是关键点总结: 对惹,这里有一个游戏开发交流小组,希望大家可以点击进来一起交流一下开发经验呀! 1. 序列化的作…...
[Harmony]自定义导航栏
1.方案一 CustomNavigationBar import { router } from kit.ArkUI; import { DevicesUtil } from ../utils/DevicesUtil; import { common } from kit.AbilityKit;Component export struct CustomNavigationBar {State private navHeight: number 44State parTitle: string …...
LeetCode117_填充每个结点的下一个右侧结点指针Ⅱ
LeetCode117_填充每个结点的下一个右侧结点指针Ⅱ 标签:#树 #深度优先遍历 #广度优先遍历 #链表 #二叉树Ⅰ. 题目Ⅱ. 示例 0. 个人方法 标签:#树 #深度优先遍历 #广度优先遍历 #链表 #二叉树 Ⅰ. 题目 给定一个二叉树: struct Node {int v…...
Qt enabled + geometry 属性(2)
文章目录 enabled属性可用与禁用的概念API接口代码演示 阐述说明1. 先简单描述下要如何演示出上面两个接口的效果(思路)2. 事先规范按钮对象的命名3. 定义两个按钮对象的槽函数 动图演示效果4. widget.cpp geometry属性预备知识API接口上下左右移动 ta…...
OpenHarmony外设驱动使用 (十),Sensor
OpenHarmony外设驱动使用 (十) Sensor 概述 功能简介 Sensor驱动模型屏蔽硬件器件差异,为上层Sensor服务系统提供稳定的Sensor基础能力接口,包括Sensor列表查询、Sensor启停、Sensor订阅及取消订阅,Sensor参数配置等…...
(2025小白全踩坑版)【OpenHarmony】移植 3.1 版本系统到 STM32F407ZG开发板
在上stm32课程,有这样一道要求: 参考了大佬的文章之后,发现出现了liteos_m.mk文件找不到的情况,于是只能另寻他路 VSCode 搭建 STM32 开发环境_vscode stm32仿真-CSDN博客 【OpenHarmony】移植 3.1 版本系统到 STM32_openharm…...
【HTML-4】HTML段落标签:构建内容结构的基础
在网页开发中,段落标签<p>是最基础也是最重要的HTML元素之一。这篇博客将深入探讨段落标签的用法、最佳实践以及相关技术细节。 1. 段落标签的基本用法 HTML段落标签用于定义文本段落,浏览器会自动在段落前后添加一定的空白(margin&a…...
深度学习+Flask 打包一个AI模型接口并部署上线
🚀 深度学习 + Flask 打包一个 AI 模型接口并部署上线(实战教程) 深度学习模型训练完毕后,我们该如何部署上线让它“动起来”?本篇带你手把手用 Flask 将训练好的 PyTorch 模型封装成 Web 接口,实现一个轻量、可访问的在线 AI 服务。 🧠 一、为什么要部署模型? 训练…...
C++类与对象(二):六个默认构造函数(二)
在上篇提到了构造函数、拷贝构造函数、析构函数,这篇将会分享剩下默认构造函数:赋值运算符重载、运算符重载。当学习了这些构造函数可以实现一个日期类。 目录 运算符重载 赋值运算符重载 前置 后置 运算符重载 函数名字为:关键字operat…...
HarmonyOS NEXT应用开发实战:玩鸿蒙App客户端开发
之前学习android时候,有一个玩android的客户端项目很火,不但能够学习知识,还能够动手实践,激发学习兴趣。这里作者通过一个完整的实战项目—玩鸿蒙客户端App,一块儿深入学习如何在HarmonyOS平台上开发一个功能丰富且完…...
十六、面向对象底层逻辑-BeanPostProcessor接口设计
一、引言:Bean生命周期的精密控制 在Spring容器的Bean实例化过程中,BeanPostProcessor接口是开发者介入对象初始化阶段的核心扩展点。作为Spring框架最强大的扩展机制之一,该接口提供了对Bean实例化过程的原子级控制能力,支撑了A…...
在线免费图片处理工具-传道软件图片工具
在线免费图片处理工具-传道软件图片工具 在线免费图片处理工具,无需注册与登录,用完即走。 官网链接: https://www.chdaoai.com/image.html 功能有: Favicon图标生成,图片颜色拾取器,屏幕颜色拾取&…...
JS进阶学习04
一、深浅拷贝 1.浅拷贝 首先浅拷贝和深拷贝只针对引用类型 浅拷贝:拷贝的是地址 常见方法: 1. 拷贝对象:Object.assgin() / 展开运算符 {...obj} 拷贝对象 2. 拷贝数组:Array.prototype.concat() 或者 [...arr] >如果是简…...
CSS、SCSS 和 SASS 的语法差异
CSS、SCSS 和 SASS 的语法差异 CSS (Cascading Style Sheets) 标准样式表语言,所有浏览器原生支持语法特点: 使用大括号 {} 包裹规则使用分号 ; 结束声明简单的选择器-属性-值结构 .container {width: 100%;margin: 0 auto; }SCSS (Sassy CSS) CSS的…...
ThreadPoolTaskExecutor 和 ThreadPoolExecutor 的使用场景
在Spring Boot项目中,ThreadPoolTaskExecutor 和 ThreadPoolExecutor 的使用场景不同,但大部分开发者会更倾向于用 ThreadPoolTaskExecutor。我来给你拆解清楚,面试时直接甩这个答案! 1️⃣ 核心区别 ThreadPoolExecutor…...
打卡31天
文件的规范拆分和写法 知识点回顾 规范的文件命名 规范的文件夹管理 机器学习项目的拆分 编码格式和类型注解 作业:尝试针对之前的心脏病项目,准备拆分的项目文件,思考下哪些部分可以未来复用。 补充介绍: pyc文件的介绍 知识…...
OBOO鸥柏丨AI数字人触摸屏查询触控人脸识别语音交互一体机上市
OBOO鸥柏丨AI数字人触摸屏查询触控人脸识别语音交互一体机上市分析 OBOO鸥柏品牌推出的AI数字人触摸屏查询触控人脸识别语音交互一体机,是其在智能交互设备领域的又一创新产品。该一体机整合了触摸屏查询、AI人脸识别、AI声源定位语音麦克风,触控交互以…...
基于大模型的闭合性尺桡骨干骨折全方位诊疗研究报告
目录 一、引言 1.1 研究背景与目的 1.2 研究意义 二、大模型技术原理与应用现状 2.1 大模型基本原理 2.2 在医疗领域的应用案例 三、闭合性尺桡骨干骨折概述 3.1 骨折定义与分类 3.2 流行病学特征 3.3 临床症状与诊断方法 四、大模型在术前风险预测中的应用 4.1 数…...
Win11上安装docker
Win11上安装docker 一、安装WSL(Windows Subsystem for Linux)二、安装docker到D盘三、启动docker四、测试启动容器 一、安装WSL(Windows Subsystem for Linux) 以管理员身份打开cmd 更新WSL wsl --update3. 安装WSL wsl --ins…...
Axure项目实战:智慧运输平台后台管理端-订单管理1(多级交互)
亲爱的小伙伴,在您浏览之前,烦请关注一下,在此深表感谢!如有帮助请订阅专栏! Axure产品经理精品视频课已登录CSDN可点击学习https://edu.csdn.net/course/detail/40420 课程主题:订单管理 主要内容:条件组合、中继器筛选、表单跟随菜单拖动、审批数据互通等 应用场景…...
如何在 Android 手机和平板电脑上下载应用程序
对于Android用户来说,从Google Play Store下载应用程序并不陌生,对吧?但是,除了 Google Play 商店之外,您还可以在哪里为 Android 设备下载和安装应用程序呢?这就是我们今天要分享的内容。我们解释了 6 种下…...
C++23 新特性:允许 std::stack 与 std::queue 从迭代器对构造 (P1425R4)
文章目录 背景与动机提案内容与实现细节提案 P1425R4实现细节编译器支持 对开发者的影响提高灵活性简化代码向后兼容性 总结 C23标准带来了许多令人兴奋的新特性和改进,其中之一便是对标准容器的增强。提案P1425R4允许 std::stack 和 std::queue 直接从一对迭代器…...
在线OJ系统测试报告
在线OJ系统测试报告 项目背景项目功能管理员功能用户功能 测试计划功能测试自动化测试性能测试 项目背景 本项目为在线OJ系统,采用微服务架构以及前后端分离的方法来实现,包含用户管理、题目管理、竞赛管理、判题服务、网关服务、消息与任务调度等多个子…...
31-35【动手学深度学习】深度学习硬件
1. CPU和GPU 1.1 CPU CPU每秒钟计算的浮点运算数为0.15,GPU为12。GPU的显存很低,16GB(可能32G封顶),CPU可以一直插内存。 左边是GPU(只能做些很简单的游戏,视频处理),中…...
Dify的大语言模型(LLM) AI 应用开发平台-本地部署
前言 今天闲着,捣鼓一下 Dify 这个开源平台,在 mac 系统上,本地部署并运行 Dify 平台,下面记录个人在本地部署Dify 的过程。 Dify是什么? Dify是一个开源的大语言模型(LLM)应用开发平台&#…...
《MQTT 从 0 到 1:原理、实战与面试指南全解》
一、MQTT 是什么? MQTT(Message Queuing Telemetry Transport)是一种 轻量级、基于发布/订阅(Pub/Sub)模式的消息传输协议,适用于物联网(IoT)、实时通信等对 低带宽、高延迟、不稳定…...
SpringMVC 通过ajax 实现文件的上传
使用form表单在springmvc 项目中上传文件,文件上传成功之后往往会跳转到其他的页面。但是有的时候,文件上传成功的同时,并不需要进行页面的跳转,可以通过ajax来实现文件的上传 下面我们来看看如何来实现: 方式1&…...
图片识别(TransFormerCNNMLP)
目录 一、Transformer (一)ViT:Transformer 引入计算机视觉的里程碑 (二)Swin-Transformer:借鉴卷积改进 ViT (三)VAN:使用卷积模仿 ViT (四)…...
手术机器人行业新趋势:Kinova多机械臂协同系统如何突破复杂场景适应性瓶颈?
机器人手术历经多阶段技术演进,已成为现代医疗重要方向。其需求增长源于医疗机构对高精度低风险手术方案的需求、微创手术普及及技术进步带来的复杂场景适应性提升。Kinova 轻型机械臂凭借模块化设计与即插即用功能,可快速适配不同手术环境,为…...
国酒华夏实业酒水供应链:全品类覆盖打造一站式购销平台
在消费升级与供应链效率双重驱动的酒水行业变革中,国酒华夏实业凭借全品类覆盖与数字化赋能,构建起集采购、品鉴、文化传播于一体的新型酒水供应链体系。其“一站式购销平台”模式不仅重塑了传统酒水流通链路,更通过精准服务与品质保障&#…...
【Qt】:设置hover属性,没有适应到子控件中
#ButtonStyle:hover 是一个 ID 选择器,仅对设置了 objectName"ButtonStyle" 的控件本身生效,不会自动应用到其子控件(如 QLabel 和 QWidget)。 在ButtonForm中,有一个Qwidget控件,在这个Qwidget中…...
缺乏经验的 PCB 过孔建模方法
您是一名背板设计人员,被指派设计一种新的高速、多千兆位串行链路架构,从多个线卡到背板上的多个交换矩阵交换卡。这些链路必须在第一天以 6GB/s 的速度运行,并且为 10GB/s (IEEE 802.3KR) 做好产品演进的准备。时间表很紧,您需要提出一个背板架构,以允许程序的其余部分…...
搭建人工智能RAG知识库的主流平台与特点概述
在2022年末chatgpt和2024年末deepseek的推动下,人工智能应用如雨后春笋,层出不穷,日新月异。现推荐一些截至目前比较主流的用来搭建RAG的平台。 1. Haystack 特点: 模块化架构:支持端到端问答系统构建,集…...
【QT】在界面A打开界面B时,界面A隐藏,界面B关闭时,界面A复现
在Qt6中,可以通过信号与槽机制实现界面A在关闭界面B时重新显示。以下是具体的实现步骤: 方法一:使用自定义关闭信号 在界面B中定义关闭信号:当界面B关闭时发射该信号。连接信号到界面A的显示槽:在界面A中创建界面B时…...
捡漏岗位:国考报名数据和岗位特征分析
2025 年国考官方数据及权威分析,报录比低于 10:1 的岗位主要集中在中西部艰苦边远地区、特殊专业技术岗位及定向招录岗位。 岗位名称招录机关地区招录人数报名人数报录比报考条件示例一级警长及以下(三)新疆出入境边防检查总站新疆3124:1男性…...
qt---命名规范
1、命名规范 1) 类名:单词首字母大写,单词和单词之间直接连接,无需连接字符 如:MyClass,QPushButton class MainWindow { };2) Qt中内置的类型,头文件和类命名同名。 如: #include <QStri…...
信息系统项目管理师考前练习3
项目组合管理 企业战略调整后,项目组合经理应优先: A. 终止所有不符合新战略的项目 B. 重新评估项目优先级与资源分配 C. 要求所有项目加快交付进度 D. 合并相似项目以减少成本 答案:B 解析:项目组合管理的核心是动态对齐战略,优先重新评估项目价值与资源匹配(第5版强调…...
【算法创新+设计】灰狼算法GWO+扰动算子,求解大规模TSP问题利器
目录 1.灰狼算法GWO原理2.连续空间到离散空间3.核心公式处理4.结果展示5.代码获取6.读者交流 1.灰狼算法GWO原理 【智能算法】灰狼算法(GWO)原理及实现 2.连续空间到离散空间 GWO算法是针对连续空间问题设计的优化方法,而旅行商问题&#…...
GPU P-State 模式说明
在 NVIDIA GPU 上,“P-State”(Performance State)用来表示显卡当前的性能/功耗等级,P0 代表最高性能(最高核心频率、最大功耗),数字越大性能越低、功耗越小。不同 P-State 的主要区…...
真实世界中的贝叶斯网络:Bootstrap、模型平均与非齐次动态的科研应用
在生态与环境科学领域,揭示变量间因果机制是理解复杂系统运行规律的核心挑战。传统实验方法受限于高昂成本与生态扰动风险,而经典统计模型仅能刻画变量相关性,难以突破"相关非因果"的认知瓶颈。贝叶斯网络作为融合图论与概率论的前…...
.NET外挂系列:4. harmony 中补丁参数的有趣玩法(上)
一:背景 1. 讲故事 前面几篇我们说完了 harmony 的几个注入点,这篇我们聚焦注入点可接收的几类参数的解读,非常有意思,在.NET高级调试 视角下也是非常重要的,到底是哪些参数,用一张表格整理如下ÿ…...
【VLNs篇】03:VLMnav-端到端导航与视觉语言模型:将空间推理转化为问答
栏目内容论文标题End-to-End Navigation with Vision-Language Models: Transforming Spatial Reasoning into Question-Answering (端到端导航与视觉语言模型:将空间推理转化为问答)核心问题如何利用大型视觉语言模型(VLM)实现端到端的机器人…...
云原生攻防4(Kubernetes基础补充)
什么是K8S? Kubernetes 是做什么的? 什么是 Docker? 什么是容器编排? Kubernetes 一词来自希腊语,意思是“飞行员”或“舵手”。这个名字很贴切,Kubernetes 可以帮助你在波涛汹涌的容器海洋中航行。 Kubernetes 是 Google 基于 Borg 开源的容器编排调度引擎,作为 CNCF最…...
redis--redisJava客户端:Jedis详解
在Redis官网中提供了各种语言的客户端,地址: https://redis.io/docs/latest/develop/clients/ Jedis 以Redis命令做方法名称,学习成本低,简单实用,但是对于Jedis实例是线程不安全的(即创建一个Jedis实例&a…...
SpringBoot-SpringBoot源码解读
SpringBoot-SpringBoot源码解读 一、Spring Boot启动过程概述 Spring Boot通过一系列的类和机制,简化了Spring应用的启动流程。当你执行SpringApplication.run()时,Spring Boot会自动完成应用的初始化、环境配置、组件加载、自动配置等任务,…...
黑马程序员C++2024新版笔记 第4章 函数和结构体
1.结构体的基本应用 结构体struct是一种用户自定义的复合数据类型,可以包含不同类型的成员。例如: struct Studet {string name;int age;string gender; } 结构体的声明定义和使用的基本语法: struct 结构体类型 {成员1类型 成员1名称;成…...