掌握RabbitMQ:全面知识点汇总与实践指南
前言
RabbitMQ 是基于 AMQP 高级消息队列协议的消息队列技术。
特点:它通过发布/订阅模型,实现了服务间的高度解耦。因为消费者不需要确保提供者的存在。
作用:服务间异步通信;顺序消费;定时任务;请求削峰;
1、AMQP协议定义
AMQP(Advanced Message Queuing Protocol)高级消息队列协议,是一个高效的、跨平台的应用层协议
MQTT(Message Queuing Telemetry Transport)消息队列遥测传输
特性 | AMQP | MQTT |
---|---|---|
适用场景 | 大型企业级应用、金融交易、云服务 | 物联网、移动应用、智能家居 |
通信模式 | 生产者-消费者 | 发布-订阅 |
消息大小 | 较大,适合复杂的消息结构 | 小型,适合简单的消息 |
QoS 级别 | 支持,但不如 MQTT 精细 | 详细的 QoS 级别,特别是针对 IoT 场景 |
性能要求 | 对性能有一定要求,但更注重可靠性和安全性 | 极低的带宽消耗和资源占用 |
安全性 | 强调端到端的安全性 | 支持基本的安全特性,适用于资源受限环境 |
2、AMQP机制
1>AMQP生产者、消费者工作机制
AMQP高级消息队列协议,基于生产者消费者模式,消息基于交换器Exchange、队列Queue、绑定Binding进行路由。
- 生产者发送消息到Broker消息代理服务
- 交换器接收生产者发送的消息,根据预定义规则,分发给一个或多个队列
- 队列存储消息,直到消费者取走消息
- 消费者,读取队列中的消息
AMQP定义了严格的消息结构,使用了类型化数据表示法描述消息内容来兼容不同的系统。
类型化数据表示法(Typed Representation of Data)是指在计算机编程语言中,数据和其相关联的类型信息一起被表示的方法。
2>AMQP消息传递方式
特性 | 点对点模式 (P2P) | 发布/订阅模式 (Pub/Sub) |
---|---|---|
消息传递方式 | 每条消息仅被一个消费者处理 | 一条消息可以被多个消费者同时接收 |
队列数量 | 单个队列 | 每个消费者有自己的队列 |
生产者行为 | 直接发送到队列 | 发送到交换器,由交换器负责路由 |
消费者行为 | 从同一队列中竞争消费 | 各自独立消费自己的队列中的消息 |
适用场景 | 任务分配、工作流管理 | 广播通知、日志记录、事件驱动架构 |
扩展性 | 受限于单个队列的吞吐量 | 可以通过增加更多的消费者来提高整体吞吐量 |
复杂度 | 较低,易于理解和实现 | 需要考虑交换器类型、路由规则等因素,稍微复杂 |
- 1、点对点
生产者将消息发送到一个特定的队列中,而消费者则从该队列中获取消息
每个消息只会被一个消费者处理,即使有多个消费者监听同一个队列。
竞争消费:多个实例尝试处理同一个消息时,可能出现重复消费或消息未及时得到处理的情况。
(1)竞争消费问题
在k8s部署多实例场景下,虽然提升了系统的吞吐量,通过调度器实现了负载均衡,多个实例从一个队列中读取消息,但是并发场景客观存在竞争消费的情况,导致重复消费消息。
(2)解决建议
合理配置消息队列、业务方法幂等性设计、分布式锁控制、增加监控告警和自动恢复动作。
// 生产者代码片段
Channel channel = connection.createChannel();
channel.queueDeclare("task_queue", true, false, false, null);
String message = "Task to be processed";
channel.basicPublish("", "task_queue", null, message.getBytes());// 消费者代码片段
DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" + message + "'");// 执行任务...channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
};
channel.basicConsume("task_queue", false, deliverCallback, consumerTag -> {});
- 2、发布订阅
生产者将消息发送到一个交换器(Exchange),而不是直接发送到队列。交换器根据预定义的路由规则(Binding Key)将消息转发给一个或多个匹配的队列。每个队列可以有多个消费者订阅,所有订阅者都能收到相同的消息副本
(1)主题分区
为不同类型的时间,创建不同的主题或分区,来减少不必要的复制。实例只订阅感兴趣的主题,降低资源开销
(2)限流
避免过载,限制单位之间内消费的最大消息
// 生产者代码片段
Channel channel = connection.createChannel();
channel.exchangeDeclare("logs_exchange", "fanout");
String message = "Info log message";
channel.basicPublish("logs_exchange", "", null, message.getBytes());// 消费者代码片段
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, "logs_exchange", "");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" + message + "'");// 处理日志...
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});
3、AMQP消息只被消费一次
- 1、合理配置消息队列ACK机制
大多数消息队列都提供了手动确认(ACK)的功能,允许消费者成功处理后,主动通知消息代理
// 使用 RabbitMQ 的手动确认示例
channel.basicConsume(queueName, false, deliverCallback, consumerTag -> {});
// 处理完成后发送 ACK
channel.basicAck(envelope.getDeliveryTag(), false);
- 2、合理配置消息队列预取数量
防止一次性去除较多的未处理消息。
// 设置预取计数
channel.basicQos(prefetchCount);
- 3、消费者幂等性设计
针对消息全局唯一的ID,入库,每次收到消息时先检查是否已入库
确保同一条消息多次处理的结果是一致的,避免重复的消息执行两次结果不一致
增加补偿机制,比如退款,退积分等概念的操作 - 4、分布式锁
借助Redis 的 Redlock 算法协调多个消费者实例之间的消息处理,只有获取到锁的消息可以处理,其他的放弃或等待。 - 5、监控告警机制
监控消息队列服务健康情况,针对可能重复消费的消息及时告警到服务负责人介入处理。 - 6、事务性消息
指的是消息和业务操作,一起成功或一起失败的机制。
(1)本地事务+补偿机制
(2)二阶段提交
引入协调者和参与者的概念。
客户端向协调者发起事务请求,协调者询问各参与者是否准备好。全部准备好,则发出提交事务命令,否则全部回滚。每个参与者返回ack结果,协调者汇总执行结果,释放占用的资源。
(3)三阶段提交
针对二阶段提交完善事务性消息机制。
首先客户端向协调者发起事务请求,协调者询问各参与者是否准备好。全部准备好,则发出预执行事务命令。各参与者收到命令,执行事务,但不提交。并返回ack,等待最终命令。协调者收到全部准备好,则发出提交事务命令。
4、AMQP 消息顺序消费
- 单实例独占队列,可以保证顺序消费,但是分布式高可用场景一般都是多实例部署,独占队列无法解决消息顺序消费的问题。
- 为了保证顺序消费,通常建议针对预取消息数量Prefetch Count设置为1:
channel.basicQos(1);
- 可以使用分布式锁确保消息消费是同步操作,并发安全,在成功处理消息后,手动发送ack确认到消息代理。
- 另外使用幂等性设计来避免重复消费。
- 增加补偿机制来处理幂等性设计无法保证的场景,比如退款等操作
- 增加监控告警到服务负责人。
- 可以对消息根据业务类型或特定的前缀规则,将不同的消息分到不同的分区或队列中,每个队列和分区内部是遵循先进先出规则来保证顺序消费的。
5、AMQP消息可靠性
- 事务支持
允许一组操作作为一个整体提交或回滚。 - ACK确认机制
当消息成功投递后,接收方会向发送方发送 ACK 确认;如果发生错误,则发送 NACK 拒绝。 - 持久化选项
可以选择是否将消息保存到磁盘上,以防服务中断时丢失重要数据。
6、RabbitMQ配置ACK
1>rabbitmq.conf或rabbitmq.ini开启配置
# 启用自动恢复功能,确保在网络中断后能够自动重连
connection.cached = true
# 设置心跳检测间隔,防止长时间无通信导致连接断开
heartbeat = 60
# 启用 Publisher Confirms,允许生产者收到消息确认
publisher_confirms = on
2>消费者手动确认
声明队列,确保队列存在
设置预取计数,限制每次从队列中拉取的消息数量为 1,以避免过载
开启手动确认模式,通过 channel.basicConsume 方法中的第二个参数 false 来关闭自动确认,改为手动确认
发送 ACK 确认,在成功处理完消息后,调用 channel.basicAck 方法发送确认
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;public class RabbitMQConsumer {private final static String QUEUE_NAME = "task_queue";public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {// 声明队列,确保它存在channel.queueDeclare(QUEUE_NAME, true, false, false, null);// 设置预取计数为 1,确保每次只处理一条消息channel.basicQos(1);// 开启手动确认模式DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" + message + "'");try {// 模拟任务处理时间Thread.sleep(1000);} catch (InterruptedException _ignored) {Thread.currentThread().interrupt();} finally {System.out.println(" [x] Done");// 手动发送 ACK 确认channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}};// 开始消费消息channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {});}}
}
3>配置 Publisher Confirms和Transaction
允许生产者在发送消息后等待消息代理的确认
// 开启 Publisher Confirms 模式
channel.confirmSelect();
try {channel.basicPublish(exchangeName, routingKey, null, message.getBytes());if (!channel.waitForConfirmsOrDie(timeout)) {// 处理未确认的消息...}
} catch (Exception e) {// 处理异常情况...
}
Channel channel = connection.createChannel();
// 开启 Publisher Confirms 模式
channel.confirmSelect();
// 发送消息并等待确认
try {channel.basicPublish(exchangeName, routingKey, null, message.getBytes());if (!channel.waitForConfirmsOrDie(timeout)) {// 处理未确认的消息...}
} catch (Exception e) {// 处理异常情况...
}// 开启事务模式
channel.txSelect();
try {channel.basicPublish(exchangeName, routingKey, null, message.getBytes());channel.txCommit();
} catch (Exception e) {channel.txRollback();
}
7、RabbitMQ配置协议
1>rabbitmq.conf
RabbitMQ默认是AMQP 0-9-1协议。支持设置监听端口。
支持启用SSL认证提高安全性。
支持设置心跳保证客户端和服务端连接保持活跃。
# 设置 AMQP 0-9-1 的监听端口
listeners.tcp.default = 5672
# 确保 AMQP 插件已启用,AMQP 0-9-1 是默认启用的
enabled_plugins = [rabbitmq_amqp1_0]# 启用 SSL/TLS 支持
ssl_options.cacertfile = /path/to/ca_certificate.pem
ssl_options.certfile = /path/to/server_certificate.pem
ssl_options.keyfile = /path/to/private_key.pem
ssl_options.verify = verify_peer
ssl_options.fail_if_no_peer_cert = true
# 设置 SSL/TLS 监听端口
listeners.ssl.default = 5671# 设置心跳间隔时间为 60 秒
heartbeat = 60
8、RabbitMQ消息持久化
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class PersistentExample {private final static String QUEUE_NAME = "persistent_queue";private final static String EXCHANGE_NAME = "persistent_exchange";public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {// 创建持久化的交换器channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);// 创建持久化的队列channel.queueDeclare(QUEUE_NAME, true, false, false, null);// 绑定队列到交换器channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "routing_key");// 发送持久化的消息String message = "Persistent message!";AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().deliveryMode(2) // 2 表示持久化.build();channel.basicPublish(EXCHANGE_NAME, "routing_key", props, message.getBytes());System.out.println(" [x] Sent '" + message + "'");}}
}
- 1、持久化队列
channel.queueDeclare("durable_queue", true, false, false, null);
- 2、交换器持久化
确保在 RabbitMQ 启动时已经预声明了所有必要的交换器和队列绑定,以避免消息丢失
channel.exchangeDeclare("durable_exchange", "direct", true);
- 3、消息持久化
delivery_mode 参数:设置为 2 表示持久化消息;设置为 1(默认)则表示非持久化消息
channel.basicPublish("exchange_name", "routing_key", new AMQP.BasicProperties.Builder().deliveryMode(2).build(), message.getBytes());
9、RabbitMQ自动重连
网络中断或其他异常情况下自动重新连接到 RabbitMQ 并恢复之前的连接状态
ConnectionFactory factory = new ConnectionFactory();
factory.setAutomaticRecoveryEnabled(true);
factory.setNetworkRecoveryInterval(5000);
10、RabbitMQ组件
组件 | 名称 | 说明 |
---|---|---|
Producer | 生产者 | 负责生成并发送消息的应用程序。 |
Consumer | 消费者 | 负责接收并处理消息的应用程序。 |
Message | 消息 | 承载业务数据的基本单元,包含消息体(Body)、属性(Properties)等信息。 |
Exchange | 交换机 | 用于接收来自生产者的消息,并根据路由规则将其分发到一个或多个队列中。 |
Queue | 队列 | 存储待处理消息的地方,消费者从中拉取消息进行处理。 |
Binding | 绑定 | 定义了交换机和队列之间的关系,包括路由键等参数。 |
Virtual Host | 虚拟主机 | 类似于命名空间的概念,用于隔离不同的应用环境,每个虚拟主机都有自己独立的一套用户、权限、交换机、队列等资源。 |
11、RabbitMQ核心组件交换器和路由键
交换器(Exchange)和路由键(Routing Key)是消息传递系统的核心组件,它们共同决定了消息如何从生产者传递到正确的队列。
消息提供方生产消息,根据预定规则,路由至匹配的一个或多个队列。
消息创建时设定路由键,消息发布到交换器时,通过队列路由键,把队列绑定到交换器上。消息到达交换器后,RabbitMQ会将消息的路由键与队列的路由键进行匹配。
若队列至少有一个消费者订阅,消息将以轮询方式发给消费者。
交换器 | 说明 | 应用场景 |
---|---|---|
Direct | 精确匹配路由键 | 只有当路由键完全匹配时,消息才会被发送到对应的队列。适用于一对一的消息分发。 |
Topic | 基于通配符模式匹配路由键 | 适用于灵活的消息过滤和多条件匹配。 |
Fanout | 广播所有消息给所有绑定的队列 | 适用于需要将相同消息发送给多个消费者的场景。 |
Headers | 根据消息头属性进行路由 | 适用于复杂的消息路由需求,例如根据多个字段组合来决定消息去向。 |
1>Direct Exchange 精准匹配路由键交换器
根据路由键完全匹配队列,如果找不到匹配的队列,则消息会被丢弃。
- 生产者
// 创建 Direct Exchange
channel.exchangeDeclare("direct_logs", "direct");// 绑定队列到 Exchange,并指定 Binding Key
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, "direct_logs", "info");
channel.queueBind(queueName, "direct_logs", "warning");
channel.queueBind(queueName, "direct_logs", "error");// 发送消息时指定 Routing Key
channel.basicPublish("direct_logs", "info", null, "Info log message".getBytes());
- 消费者
import com.rabbitmq.client.*;public class DirectConsumer {private final static String EXCHANGE_NAME = "direct_logs";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {// 声明 Exchangechannel.exchangeDeclare(EXCHANGE_NAME, "direct");// 创建临时队列String queueName = channel.queueDeclare().getQueue();// 绑定队列到 Exchange,并指定 Binding Keyif (argv.length < 1) {System.err.println("Usage: DirectConsumer [info] [warning] [error]");System.exit(1);}for (String severity : argv) {channel.queueBind(queueName, EXCHANGE_NAME, severity);}System.out.println(" [*] Waiting for messages. To exit press CTRL+C");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");};channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });}}
}
2>Fanout Exchange广播交换器
广播所有消息给所有绑定的队列
- 生产者
// 创建 Fanout Exchange
channel.exchangeDeclare("logs", "fanout");// 绑定队列到 Exchange
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, "logs", "");// 发送消息时不指定 Routing Key
channel.basicPublish("logs", "", null, "Broadcast log message".getBytes());
- 消费者
import com.rabbitmq.client.*;public class FanoutConsumer {private final static String EXCHANGE_NAME = "logs";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {// 声明 Exchangechannel.exchangeDeclare(EXCHANGE_NAME, "fanout");// 创建临时队列String queueName = channel.queueDeclare().getQueue();// 绑定队列到 Exchangechannel.queueBind(queueName, EXCHANGE_NAME, "");System.out.println(" [*] Waiting for messages. To exit press CTRL+C");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" + message + "'");};channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });}}
}
3>Topic Exchange 通配符路由器
*:匹配一个单词;#:匹配零个或多个单词
- 生产者
// 创建 Topic Exchange
channel.exchangeDeclare("topic_logs", "topic");// 绑定队列到 Exchange,并指定 Binding Key 模式
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, "topic_logs", "*.orange.*");
channel.queueBind(queueName, "topic_logs", "*.*.rabbit");
channel.queueBind(queueName, "topic_logs", "lazy.#");// 发送消息时指定符合模式的 Routing Key
channel.basicPublish("topic_logs", "quick.orange.rabbit", null, "Quick orange rabbit".getBytes());
- 消费者
import com.rabbitmq.client.*;public class TopicConsumer {private final static String EXCHANGE_NAME = "topic_logs";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {// 声明 Exchangechannel.exchangeDeclare(EXCHANGE_NAME, "topic");// 创建临时队列String queueName = channel.queueDeclare().getQueue();// 绑定队列到 Exchange,并指定 Binding Key 模式if (argv.length < 1) {System.err.println("Usage: TopicConsumer [binding_key_pattern]");System.exit(1);}for (String bindingKey : argv) {channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);}System.out.println(" [*] Waiting for messages. To exit press CTRL+C");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");};channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });}}
}
4>Headers Exchange 根据消息头属性进行路由
不依赖路由键,当消息的 headers 完全匹配时,才会将消息发送到对应的队列。
- 生产者
// 创建 Headers Exchange
channel.exchangeDeclare("headers_exchange", "headers");// 绑定队列到 Exchange,并指定 Headers 匹配规则
Map<String, Object> headers = new HashMap<>();
headers.put("user_id", "12345");
headers.put("order_status", "pending");
AMQP.Queue.BindOk bindOk = channel.queueBind(queueName, "headers_exchange", "", headers);// 发送带有 Headers 的消息
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().headers(headers).build();
channel.basicPublish("headers_exchange", "", props, "Message with specific headers".getBytes());
- 消费者
import com.rabbitmq.client.*;public class HeadersConsumer {private final static String EXCHANGE_NAME = "headers_exchange";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {// 声明 Exchangechannel.exchangeDeclare(EXCHANGE_NAME, "headers");// 创建临时队列String queueName = channel.queueDeclare().getQueue();// 绑定队列到 Exchange,并指定 Headers 匹配规则Map<String, Object> headers = new HashMap<>();headers.put("user_id", "12345");headers.put("order_status", "pending");AMQP.Queue.BindOk bindOk = channel.queueBind(queueName, EXCHANGE_NAME, "", headers);System.out.println(" [*] Waiting for messages. To exit press CTRL+C");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" + message + "'");};channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });}}
}
12、RabbitMQ核心方法及参数说明
1>newConnection 创建连接工程并开启连接
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
Connection connection = factory.newConnection();
2>createChannel 创建信道
RabbitMQ 使用信道的方式来传输数据
信道是建立在真实的 TCP 连接内的虚拟连接,且每条 TCP 连接可以创建多个信道,每个信道都是独立的通信线路,可以并发地发送和接收消息。
Channel channel = connection.createChannel();
3>exchangeDeclare 交换器声明
channel.exchangeDeclare("my_exchange", "direct", true, false, null);
exchange: 交换器名称。
type: 交换器类型(如 “direct”, “fanout”, “topic”, “headers”)。
durable: 持久化标志,true 表示持久化,false 表示非持久化。
autoDelete: 自动删除标志,true 表示当最后一个队列断开时自动删除交换器。
internal: 内部交换器标志,true 表示该交换器只能被其他交换器使用,不能直接由生产者发布消息。
arguments: 其他可选参数,例如死信交换器、过期时间等。
4>queueDeclare 队列声明
// 创建临时队列
String queueName = channel.queueDeclare().getQueue();
queue: 队列名称,为空字符串时表示创建临时队列。
durable: 持久化标志,true 表示持久化,false 表示非持久化。
exclusive: 排他性标志,true 表示仅当前连接可用,连接关闭后自动删除。
autoDelete: 自动删除标志,true 表示当最后一个消费者断开时自动删除队列。
arguments: 其他可选参数,例如死信队列、过期时间等。
5>queueBind 队列绑定
将队列绑定到指定的交换器上,并提供路由键或匹配规则
channel.queueBind(queueName, "my_exchange", "routing_key");
queue: 队列名称。
exchange: 交换器名称。
routingKey: 路由键,对于某些类型的交换器(如 Direct 或 Topic),这个值是必须的;对于 Fanout 类型,通常为空字符串。
arguments: 可选参数,主要用于 Headers Exchange 的匹配规则
6>basicPublish 发布消息
向指定的交换器发布一条消息
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().contentType("text/plain").deliveryMode(2) // 2 表示持久化.build();
channel.basicPublish("my_exchange", "routing_key", props, message.getBytes());
exchange: 交换器名称。
routingKey: 路由键。
props: 消息属性,包括内容类型、编码、持久化模式等。
body: 消息体,即要发送的数据。
7>basicConsume 消费消息
费来自指定队列的消息
DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" + message + "'");
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
queue: 队列名称。
autoAck: 自动确认标志,true 表示收到消息后立即确认,false 表示手动确认。
deliverCallback: 回调函数,用于处理接收到的消息。
cancelCallback: 取消回调函数,当消费者的取消通知到达时调用
8>basicAck 消息确认
手动确认模式下,当消费者成功处理完消息后,需要调用此方法来确认消息已被消费
channel.basicAck(envelope.getDeliveryTag(), false);
9>basicNack 消息丢弃
当消费者无法处理某条消息时,可以拒绝这条消息,并决定是否重新入队或者丢弃
// 第三个参数表示是否重新入队
channel.basicNack(envelope.getDeliveryTag(), false, true);
13、RabbitMQ镜像集群模式
搭建RabbitMQ保证消息队列的高可用。
创建的 queue,无论元数据还是 queue 里的消息都会存在于多个实例上,写消息到 queue 的时候,都会自动把消息同步到多个实例的 queue。
优点:高可用,单个节点挂掉,其他节点仍可用
缺点:高负载,如果某个队列消息很重,则镜像复制的实例下也会很重,性能开销大。
参考博客:消息队列中点对点与发布订阅区别
Powered by niaonao
相关文章:
掌握RabbitMQ:全面知识点汇总与实践指南
前言 RabbitMQ 是基于 AMQP 高级消息队列协议的消息队列技术。 特点:它通过发布/订阅模型,实现了服务间的高度解耦。因为消费者不需要确保提供者的存在。 作用:服务间异步通信;顺序消费;定时任务;请求削…...
Ansys Discovery 优化模式:掌握网格划分方法
本篇博客文章介绍了 Ansys Discovery 中高级 CFD 仿真的 Refine 模式下可用的网格划分方法。上一篇文章讨论了 Explore 模式中可用的网格划分技术。 Refine 模式下的高级网格划分技术 使用 Ansys Discovery 时,Refine 模式提供的网格划分技术比 Explore 模式多。这…...
CentOS: RPM安装、YUM安装、编译安装(详细解释+实例分析!!!)
目录 1.什么是RPM 1.1 RPM软件包命名格式 1.2RPM功能 1.3查询已安装的软件:rpm -q 查询已安装软件的信息 1.4 挂载:使用硬件(光驱 硬盘 u盘等)的方法(重点!!!) 1…...
(转)rabbitmq怎么保证消息不丢失?
RabbitMQ 可以通过以下多种机制来保证消息不丢失: 生产阶段 - 持久化队列和交换器: - 在声明队列和交换器时,将 durable 参数设置为 true ,确保它们是持久化的。这样,即使 RabbitMQ 节点重新启动,队列和交…...
List ---- 模拟实现LIST功能的发现
目录 listlist概念 list 中的迭代器list迭代器知识const迭代器写法list访问自定义类型 附录代码 list list概念 list是可以在常数范围内在任意位置进行插入和删除的序列式容器,并且该容器可以前后双向迭代。list的底层是双向链表结构,双向链表中每个元素…...
电力场景输电线覆冰分割数据集labelme格式1227张2类别
数据集格式:labelme格式(不包含mask文件,仅仅包含jpg图片和对应的json文件) 图片数量(jpg文件个数):1227 标注数量(json文件个数):1227 标注类别数:2 标注类别名称:["ice","powerline"] 每个…...
springboot 日志实现
日志 日志框架可以分为 日志门面(Facade) 和 日志实现(Implementation),Spring Boot 使用了 SLF4J 作为日志门面,Logback 或 Log4j2 作为日志实现。 日志门面以及日志实现 日志门面 日志门面࿰…...
Ubuntu Server安装谷歌浏览器
背景 服务器上跑爬虫服务器需要安装谷歌浏览器 安装 wget https://dl.google.com/linux/direct/google-chrome-stable_current_amd64.deb### sudo apt install ./google-chrome-stable_current_amd64.deb...
毕业项目推荐:基于yolov8/yolov5/yolo11的动物检测识别系统(python+卷积神经网络)
文章目录 概要一、整体资源介绍技术要点功能展示:功能1 支持单张图片识别功能2 支持遍历文件夹识别功能3 支持识别视频文件功能4 支持摄像头识别功能5 支持结果文件导出(xls格式)功能6 支持切换检测到的目标查看 二、数据集三、算法介绍1. YO…...
计算机网路HTTP、TCP详解
HTTP HTTP基本概念 HTTP(超文本传输协议):HTTP是在计算机世界中两点之间传输文字、图片、视频等超文本内容数据的约束与规范。 常见状态码: 2xx:报文被收到,已经在正确处理中。 3xx:重定向…...
小程序未来趋势预测:技术革新与市场前景
一、引言 在数字化转型加速的时代,小程序作为一种轻量级、无需下载安装的应用形式,凭借其快速开发、易于使用的特点,迅速崛起并成为企业和开发者拓展业务的重要工具。随着移动互联网的进一步普及和用户对便捷应用体验的需求增长,…...
JavaEE初阶——计算机工作原理
一、什么是JavaEE JavaEE(Java Platform,Enterprise Edition)是sun公司(2009年4月20日甲骨文将其收购)推出的企业级应用程序版本。这个版本以前称为 J2EE。能够帮助我们开发和部署可移植、健壮、可伸缩且安全的服务器…...
Web前端ui框架
文章目录 Element简介 Ant Design vue简介关键特点: iview简介 Element 官网:https://element.eleme.cn/#/zh-CN/component/installation添加链接描述 简介 Elementui 一套为开发者、设计师和产品经理准备的基于 Vue 2.0 的桌面端组件库 Ant Design …...
炸弹 (boom.c)
炸弹 (boom.c) 时间限制: 800ms 内存限制: 256000KiB 进度: 57/12406 0.5% 题目描述 出题助教: Sakiyary 验题助教: Corax、XiEn、ErinwithBMQ、runz、MacGuffin、Bob 维多利亚的腐烂荒野上出现了 N 个魔物,你和小维需要抓紧时间调配炸弹对付它们。 荒野可以视…...
【C语言】可移植性陷阱与缺陷(八): 随机数的大小
在C语言编程中,随机数的生成和使用是一个常见的需求。然而,由于不同平台上的C标准库实现可能存在差异,随机数的生成和使用也可能面临可移植性问题。本文将深入探讨C语言中随机数的大小与可移植性相关的陷阱与缺陷,并提供相应的解决…...
SSL VPN
SSL VPN 是什么? 是采用SSL/TLS协议来实现远程接入的一种轻量级 VPN 技术。利用 SSL 协议提供的基于证书的身份认证、数据加密和消息完整性验证机制,可以为应用层之间的通信建立安全连接。因为 SSL 协议内置于浏览器中,使用 SSL VPN 可以免于…...
C语言:函数
目录 1.函数的解释 2.C语言函数的分类 2.1库函数 2.2自定义函数 2.2.1自定义函数的语法格式 2.2.2自定义函数的实践 (1)第一题:欢迎光临 (2)第二题 打印数字的平方 编辑 (3)第三题 计算和 3.函数的参数 3.1值传递的表现 --- 形参 3.2地址传递的表现 -…...
Vue 环境配置与项目创建指南
1. 前置要求 在开始配置 Vue 开发环境之前,需要确保以下工具已安装: Node.js:Vue 的构建工具依赖 Node.js。 npm 或 yarn:Node.js 自带 npm 包管理工具,也可以选择安装 yarn。 安装 Node.js 前往 Node.js 官网 下…...
关于物联网的基础知识(三)——物联网技术架构:连接万物的智慧之道!连接未来的万物之网!
成长路上不孤单😊😊😊😊😊😊 【14后😊///计算机爱好者😊///持续分享所学😊///如有需要欢迎收藏转发///😊】 今日分享关于物联网的基础知识(三&a…...
iOS - Tagged Pointer
1. 基本结构 // Tagged Pointer 的内存布局 union TaggedPointer {uintptr_t bits; // 完整的指针值struct {uintptr_t data : 60; // 数据部分uintptr_t tag : 4; // 类型标记};// 扩展类型struct {uintptr_t extData : 52; // 扩展数据uintptr_t extTag : …...
基于SpringBoot的音乐网站与分享平台
基于SpringBoot的音乐网站与分享平台 摘要1. 研究背景2.研究内容3.系统功能 3.1前台首页功能模块3.2在线听歌功能模块3.3后台登录功能模块3.4在线听歌管理模块 4.部分功能代码实现5.源码分享(免费获取) 需要源码联系我即可(免费获取)~ ??大家点赞、收藏、关注、评论啦 、查…...
【机器学习篇】 科技异次元的超强 “魔杖”,开启奇幻新程
一起开启这场旅行吧,关注博主,点赞支持不迷路,下面一同欣赏本篇的美妙吧!! ! 博主主页: 羑悻的小杀马特.-CSDN博客 在当今科技飞速发展的时代,机器学习宛如一把来自科技异次元的超强…...
opencv CV_TM_SQDIFF未定义标识符
opencv CV_TM_SQDIFF未定义标识符 opencv4部分命名发生变换,将CV_WINDOW_AUTOSIZE改为WINDOW_AUTOSIZE;CV_TM_SQDIFF_NORMED改为TM_SQDIFF_NORMED。...
OneFlow的简单介绍
OneFlow 是北京一流科技有限公司旗下的采用全新架构设计的开源工业级通用深度学习框架。以下是关于 OneFlow 的详细介绍: 本篇文章的目录 特点 功能 应用场景 发展历程 特点 简洁易用的接口:为深度学习相关的算法工程师提供一套简洁易用的用户接口…...
如何配置【Docker镜像】加速器+【Docker镜像】的使用
一、配置Docker镜像加速器 1. 安装/升级容器引擎客户端 推荐安装1.11.2以上版本的容器引擎客户端 2. 配置镜像加速器 针对容器引擎客户端版本大于1.11.2的用户 以root用户登录容器引擎所在的虚拟机 修改 "/etc/docker/daemon.json" 文件(如果没有…...
《OpenCV》——模版匹配
文章目录 什么是模版匹配?函数介绍实例 什么是模版匹配? 模板匹配是在一幅图像中寻找与另一幅模板图像最匹配部分的技术。OpenCV 提供了多种模板匹配的方法,它在目标检测、物体识别等众多计算机视觉任务中有广泛的应用。例如,你有…...
【网络安全技术与应用】(选修)实验3 网络侦察
一、实验目的 培养学生综合运用搜索引擎、Whois数据库等手段对目标站点进行侦查的能力,了解站点查询常用的信息源及搜索工具,熟练掌握常见搜索工具的功能及使用技巧。培养学生使用搜索引擎在互联网查找特定设备的能力,熟悉联网设备搜索引擎的功能,熟练掌握设备搜索引擎的使…...
基于XGBoost的集成学习算法
目录 一、XGBoost原理1.1 提升方法(Boosting)1.2 提升决策树 (BDT)1.3 梯度提升决策树 (GBDT)1.4 极限梯度提升(XGBoost)1.4.1 XGBoost改进1.4.2 XGBoostcsklearn实现1.4.3 XGBoost回…...
【84键矮轴无线键盘】个人使用经历
推荐: 前行者MK84键(理由:价格实惠,键位布局好,其他都是可接受程度)K3 max(理由:除了短命的续航、脚垫容易脱落,已无明显短板) 我需要一把在小巧、舒适的主力…...
基于Arduino的FPV头部追踪相机系统
构建FPV头部追踪相机:让你置身于遥控车辆之中! 在遥控车辆和模型飞行器的世界中,第一人称视角(FPV)体验一直是爱好者们追求的目标。通过FPV头部追踪相机,你可以像坐在车辆或飞行器内部一样,自由…...
Flutter:邀请海报,Widget转图片,保存相册
记录下,把页面红色区域内的内容,转成图片后保存到相册的功能 依赖 # 生成二维码 qr_flutter: ^4.1.0 # 保存图片 image_gallery_saver_plus: ^3.0.5view import package:demo/common/index.dart; import package:ducafe_ui_core/ducafe_ui_core.dart; i…...
CSS——16. nth—child序列选择器1
<!DOCTYPE html> <html><head><meta charset"UTF-8"><title>nth-child选择器</title><style type"text/css">/* 选择的是li的父元素(ul)中的第9个li子元素 */li:nth-child(9){color: red…...
【HarmonyOS 5.0】从0到1开发购物应用App(二):登录页对接口
【HarmonyOS Arkts笔记】http网络请求封装 【HarmonyOS Arkts笔记】ohos.data.preferences用户首选项实现存储信息 登录页 点击登录按钮调用login()方法 import { promptAction, router } from kit.ArkUI; import loginApi from "../../api/login" import Prefere…...
asp.net core webapi中的数据注解与数据验证
在这一课中,主要讲解了如何在 Web API 中使用数据注解(Data Annotations)和进行数据验证,以确保请求数据的有效性和完整性。 在 Web API 中,数据验证是确保客户端传递的数据符合业务规则和格式要求的关键步骤。数据注…...
VulnHub—potato-suncs
使用命令扫描靶机ip arp-scan -l 尝试访问一下ip 发现一个大土豆没什么用 尝试扫描一下子域名 没有发现什么有用的信息 尝试扫描端口 namp -A 192.168.19.137 -p- 尝试访问一下端口,发现都访问不进去 查看源代码发现了网页的标题 potato,就想着爆破一下密码 hydr…...
uniapp 微信小程序 自定义日历组件
效果图 功能:可以记录当天是否有某些任务或者某些记录 具体使用: 子组件代码 <template><view class"Accumulate"><view class"bx"><view class"bxx"><view class"plank"><…...
云架构Web端的工业MES系统设计之区分工业过程
云架构Web端的工业MES系统设计之区分工业过程 在当今数字化浪潮席卷全球的背景下,制造业作为国家经济发展的重要支柱产业,正面临着前所未有的机遇与挑战。市场需求的快速变化、客户个性化定制要求的日益提高以及全球竞争的愈发激烈,都促使制造企业必须寻求更加高效、智能的生产…...
Harbor 仓库部署(docker-compose 部署方式)
一、 安装的前提条件 硬件 资源 最低 推荐 cpu2C4C内存4G8G硬盘40G160G 软件 软件 版本 描述 dockerv17.0.6-ce 安装参考官方文档 Install Docker Engine | Docker Documentation docker-composev1.18.0 安装参考官方文档 Overview | Docker Documentation Openssllatest…...
机器学习基础-支持向量机SVM
目录 基本概念和定义 1. 超平面(Hyperplane) 2. 支持向量(Support Vectors) 3. 线性可分 4. 边界 SVM算法基本思想和分类 基本思想 间隔最大化 间隔(Margin) 软边距 SVM 核函数的概念 基本概念…...
卸载wps后word图标没有变成白纸恢复
这几天下载了个wps教育版,后头用完了删了 用习惯的2019图标 给兄弟我干没了??? 其他老哥说什么卸载关联重新下 ,而且还要什么撤销保存原来的备份什么,兄弟也是不得不怂了 后头就发现了这个半宝藏博主&…...
mongodb==安装prisma连接
官网下载mongodb,解压安装 Download MongoDB Community Server | MongoDB 修改bin/mongod.cfg # mongod.conf# for documentation of all options, see: # http://docs.mongodb.org/manual/reference/configuration-options/# Where and how to store data. storage:dbPat…...
python代码实现了一个金融数据处理和分析的功能,主要围绕国债期货及相关指数数据展开
# 忽略某些模块的提示信息 import warnings warnings.filterwarnings("ignore") # 在全局配置中添加RQData账号信息 import rqdatac as rq from typing import List import pandas as pd import numpy as np import re from datetime import datetime, timedelta,tim…...
声音是如何产生的
一、音频概述 RTMP中一般音频采用aac编码,采样率为44100HZ, 每帧1024采样,帧率43,23.2ms一帧 RTC中一般音频采用opus编码,采样率为48000HZ,每帧480采样,帧率100,10ms一帧 通道数(c…...
Matlab回归预测大合集(不定期更新)-188
截至2025-1-2更新 1.BP神经网络多元回归预测(多输入单输出) 2.RBF神经网络多元回归预测(多输入单输出) 3.RF随机森林多元回归预测(多输入单输出) 4.CNN卷积神经网络多元回归预测(多输入单输…...
CUDA编程【7】 线程束
文章目录 线程束和线程块线程束线程块 线程束的分化问题线程束分化(Warp Divergence)线程束分化的执行机制如何避免线程束的分化 线程束和线程块 线程束 线程束是SM中基本的执行单元当一个网格被启动(即一个核函数被启动)&#…...
nodejs:nodejs的技巧有哪些(2)
Node.js 是一个基于 Chrome V8 引擎的 JavaScript 运行环境,它允许开发者构建高性能的网络应用。 1. 使用异步编程,利用 async/await 来处理异步操作,使代码更清晰、易读。 const fetchData async () > { const data await getDataFrom…...
构建数字化校园:定义与意义
随着信息技术的快速发展,"数字化校园"这一概念逐渐成为教育领域内热议的话题。数字化校园是指利用先进的信息技术手段,如互联网、大数据、云计算等,对学校的教学、科研、管理和服务等方面进行全面升级和优化的过程。它不仅改变了传…...
LabVIEW语言学习过程是什么?
学习LabVIEW语言的过程可以分为几个阶段,每个阶段的重点内容逐步加深,帮助你从入门到精通。以下是一个简洁的学习过程: 1. 基础入门阶段 理解图形化编程:LabVIEW是一种图形化编程语言,与传统的文本编程语言不同&am…...
阿里云 人工智能与机器学习
阿里云的 人工智能(AI)与机器学习(ML) 服务为企业提供了全面的AI解决方案,帮助用户在多个行业实现数据智能化,提升决策效率,推动业务创新。阿里云通过先进的技术和丰富的工具,支持用…...
NUTTX移植到STM32
STM32移植NUTTX 1. Ubuntu下搭建开发环境1.1 先决条件1.2 下载 NuttX1.3 使用Make 进行编译1.4 烧录运行 2.通过NUTTX点亮LED2.1 部署操作系统2.2 修改配置文件2.3 编译运行程序 开发板:DshanMCUF407 官方开发文档:安装 — NuttX latest 文档 参考文档&…...