当前位置: 首页 > news >正文

RabbitMQ基础篇

文章目录

  • 1 RabbitMQ概述
    • 1.1 消息队列
    • 1.2 RabbitMQ体系结构
  • 2 RabbitMQ工作模式
    • 2.1 简单模式(Simple Queue)
    • 2.2 工作队列模式(Work Queues)
    • 2.3 发布/订阅模式(Publish/Subscribe)
    • 2.4 路由模式(Routing)
    • 2.5 通配符模式(Topics)
  • 3 RabbitMQ整合SpringBoot
    • 3.1 @RabbitListener注解属性
    • 3.2 消费者工程
    • 3.3 生产者工程
  • 4 消息可靠性投递
    • 4.1 什么是消息可靠投递?
    • 4.2 消息的可靠发送
      • 4.2.1 消息确认机制
        • ①模块准备
        • ②配置类说明
        • ③配置类示例
        • ④测试代码
      • 4.2.2 备用交换机
        • ①备用交换机配置
        • ②备用交换机测试
    • 4.3 消息的可靠存储
      • 4.3.1 非持久化交换机和队列
      • 4.3.2 持久化交换机和消息队列
        • ①@Queue注解分析
        • ②@Exchange注解分析
    • 4.4 消息的可靠消费
      • 4.4.1 模块准备
      • 4.4.2 手动确认思路
        • ①basicAck()方法
        • ②basicNack()方法
        • ③basicReject()方法
      • 4.4.3 可靠消费代码
    • 4.5 消息可靠性投递架构
  • 5 消费端限流
    • 5.1 未设置prefetch
    • 5.2 设置prefetch
  • 6 消息超时
    • 6.1 队列层面设置过期时间
    • 6.2 消息层面设置过期时间
  • 7 死信和死信队列
    • 7.1 准备工作
      • 7.1.1 正常交换机和正常消息队列
      • 7.1.2 死信交换机和死信队列
      • 7.1.3 常量声明
    • 7.2 死信--拒绝
    • 7.3 死信--超时和溢出
  • 8 延时队列
    • 8.2 延迟插件的使用
      • 8.2.1 生产者端
      • 8.2.2 消费者端
        • ① ui界面创建延迟交换机和队列
        • ②代码创建延迟交换机和队列
      • 8.2.3 效果展示
  • 9 事务消息
    • 9.1 什么是事务消息?
    • 9.2 Springboot发送事务消息
      • 9.2.1 准备工作
      • 9.2.2 没有事务消息
      • 9.2.3 使用事务消息
    • 9.3 Channel发送事务消息
  • 10 惰性队列
    • 10.1 队列策略设定
    • 10.3 `queue.declare`参数设定
  • 11 优先级队列
    • 11.1 准备工作
    • 11.2 使用优先级队列

1 RabbitMQ概述

RabbitMQ简易安装教程

# 拉取镜像
docker pull rabbitmq:3.13-management# -d 参数:后台运行 Docker 容器
# --name 参数:设置容器名称
# -p 参数:映射端口号,格式是“宿主机端口号:容器内端口号”。5672供客户端程序访问,15672供后台管理界面访问
# -v 参数:卷映射目录
# -e 参数:设置容器内的环境变量,这里我们设置了登录RabbitMQ管理后台的默认用户和密码
docker run -d \
--name rabbitmq \
-p 5672:5672 \
-p 15672:15672 \
-v rabbitmq-plugin:/plugins \
-e RABBITMQ_DEFAULT_USER=guest \
-e RABBITMQ_DEFAULT_PASS=123456 \
rabbitmq:3.13-managementdocker exec -it 5129c41ad3d8 /bin/bash # 数字是rabbitmq的id,可以通过docker ps查看rabbitmq-plugins enable rabbitmq_management #启用 RabbitMQ Management 插件,使得你可以轻松地监控和管理 RabbitMQ 服务器

访问登录:http://192.168.145.130:15672,账号密码就是上面指定的

1.1 消息队列

消息队列是实现应用程序之间通信的中间件

消息队列的好处

  • 消息的发送者和接收者进行异步通信
  • 流量高峰保证服务稳定,消息队列可以暂存大量消息,达到流量削峰
  • 扩展性高,可以水平扩展以支持更多的发送者和接收者,相应地增加或减少资源处理(功能处理)
  • 解耦:消息的发送者和接收者只专注于消息,无需关系彼此细节

主流MQ对比

image-20240526195043655

1.2 RabbitMQ体系结构

image-20240527160657626
  • Channel(信道):信道是生产者消费者和RabbitMQ服务器之间通信的桥梁。所有的消息发布和消费都由信道来完成的

    • 建立在TCP连接上的虚拟连接,允许在单个TCP连接上建立多个信道,从而实现多线程处理
    • 每个线程对应一个信道,信道在RabbitMQ中具有唯一的ID,保证了信道的私有性
    • 引入信道的概念是为了减少建立和销毁TCP连接的开销,提高系统性能
  • Exchange(交换机):负责接收消息并根据路由键将消息转发到绑定的队列

  • Queue(队列):队列是RabbitMQ中用于存储消息的容器,消息按照先进先出的顺序进行处理

  • Virtual Host(虚拟主机):是RabbitMQ中的命名空间(理解为分组),用于隔离不同的环境或应用程序。每个虚拟主机都有自己的队列、交换机和绑定关系

  • Broker(代理服务器):指RabbitMQ服务器本身,多个Broker组合成一个RabbitMQ集群

2 RabbitMQ工作模式

image-20240527195355740

  • 简单模式:生产者向默认交换机发送消息,默认交换机将消息放到队列中,消费者监听并消费消息
  • 工作队列模式:生产者向默认交换机发送消息,默认交换机将消息放到消息队列,多个消费者监听消息队列竞争消息(其实就是简单模式的升级版)
  • 发布/订阅模式:扇出交换机接收消息并将消息发送给所有订阅了该交换机的队列
  • 消息生产者将消息发送给Direct交换机,Direct交换机根据routing key路由键将消息发送到指定队列(用的最多)
  • 消息生产者将消息发送给Topic交换机,Topic交换机根据通配符形式的路由键将消息发送到指定队列

项目导入依赖:采用原生的方式,开发中都是集成框架的

<dependencies><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.20.0</version></dependency>
</dependencies>

封装连接工具类:

import com.rabbitmq.client.Connection;  
import com.rabbitmq.client.ConnectionFactory;  public class ConnectionUtil {  public static final String HOST_ADDRESS = "192.168.145.160";public static Connection getConnection() throws Exception {  // 定义连接工厂  ConnectionFactory factory = new ConnectionFactory();// 设置服务地址  factory.setHost(HOST_ADDRESS);// 端口  factory.setPort(5672);//设置账号信息,用户名、密码、vhost  factory.setVirtualHost("/");  factory.setUsername("guest");  factory.setPassword("123456");// 通过工程获取连接  Connection connection = factory.newConnection();return connection;  }
}

2.1 简单模式(Simple Queue)

生产者向默认交换机发送消息,默认交换机将消息放到队列中,消费者监听并消费消息

image-20240527195355740

生产者:发送消息

public class Producer {  public static void main(String[] args) throws Exception {// 获取连接Connection connection = ConnectionUtil.getConnection();// 创建频道  Channel channel = connection.createChannel();  // 声明(创建)队列  // queue      参数1:队列名称  // durable    参数2:是否定义持久化队列,当 MQ 重启之后还在  // exclusive  参数3:是否独占本次连接。若独占,只能有一个消费者监听这个队列且 Connection 关闭时删除这个队列  // autoDelete 参数4:是否在不使用的时候自动删除队列,也就是在没有Consumer时自动删除  // arguments  参数5:队列其它参数  channel.queueDeclare("simple_queue", true, false, false, null);  // 要发送的信息  String message = "你好;小兔子!";  // 参数1:交换机名称,如果没有指定则使用默认Default Exchange  // 参数2:路由key,简单模式可以传递队列名称  // 参数3:配置信息  // 参数4:消息内容  channel.basicPublish("", "simple_queue", null, message.getBytes());  System.out.println("已发送消息:" + message);  // 关闭资源  channel.close();  connection.close();}
}

运行效果:新增队列:simple_queue

image-20240527200148062

image-20240527200606545

消费者

public class Consumer {  public static void main(String[] args) throws Exception {// 获取连接Connection connection = ConnectionUtil.getConnection();// 创建ChannelChannel channel = connection.createChannel();  // 创建队列// 如果没有一个名字叫simple_queue的队列,则会创建该队列,如果有则不会创建  // 参数1. queue:队列名称  // 参数2. durable:是否持久化。如果持久化,则当MQ重启之后还在  // 参数3. exclusive:是否独占。  // 参数4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉  // 参数5. arguments:其它参数。  channel.queueDeclare("simple_queue",true,false,false,null);  // 接收消息  DefaultConsumer consumer = new DefaultConsumer(channel){// 回调方法,当收到消息后,会自动执行该方法  // 参数1. consumerTag:标识  // 参数2. envelope:获取一些信息,交换机,路由key...  // 参数3. properties:配置信息  // 参数4. body:数据  @Override  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("consumerTag:"+consumerTag);  System.out.println("Exchange:"+envelope.getExchange());  System.out.println("RoutingKey:"+envelope.getRoutingKey());  System.out.println("properties:"+properties);  System.out.println("body:"+new String(body));}};// 参数1. queue:队列名称  // 参数2. autoAck:是否自动确认,类似咱们发短信,发送成功会收到一个确认消息  // 参数3. callback:回调对象  // 消费者类似一个监听程序,主要是用来监听消息  channel.basicConsume("simple_queue",true,consumer);}
}

控制台打印:

image-20240527202647372

消息被消费掉了,所以RabbitMQ服务器上没有了

image-20240527201201641

2.2 工作队列模式(Work Queues)

生产者向默认交换机发送消息,默认交换机将消息放到消息队列,多个消费者监听消息队列竞争消息(其实就是简单模式的升级版)

生产者

public class Producer {  public static final String QUEUE_NAME = "work_queue";  public static void main(String[] args) throws Exception {  Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();  channel.queueDeclare(QUEUE_NAME,true,false,false,null);  for (int i = 1; i <= 10; i++) {String body = i+"hello rabbitmq~~~";channel.basicPublish("",QUEUE_NAME,null,body.getBytes());}  channel.close();connection.close();}
}

发送消息:

image-20240527204259210

消费者1:

public class Consumer1 {  static final String QUEUE_NAME = "work_queue";  public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME,true,false,false,null);  Consumer consumer = new DefaultConsumer(channel){@Override  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("Consumer1 body:"+new String(body));}};channel.basicConsume(QUEUE_NAME,true,consumer);}
}

消费者2:

public class Consumer2 {static final String QUEUE_NAME = "work_queue";  public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME,true,false,false,null);Consumer consumer = new DefaultConsumer(channel){@Override  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("Consumer2 body:"+new String(body));}};channel.basicConsume(QUEUE_NAME,true,consumer);}
}

注意:运行的时候先启动两个消费端程序,然后再启动生产者端程序

运行结果:两个消费者竞争消息队列中消息

image-20240527204854307

2.3 发布/订阅模式(Publish/Subscribe)

rabbitmq消息通讯过程:消息生产者将消息发送给交换机,由交换机处理消息。Exchange(交换机)只负责转发消息,不存储消息,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!

常见的交换机类型

  • Fanout Exchange(扇出交换机),将消息发送给所有绑定到交换机的队列
  • Direct Exchange(直连交换机),把消息交给符合指定routing key的队列
  • Topic Exchange(主题交换机),把消息交给符合routing pattern(路由模式)的队列
  • Default Exchange(默认交换机),把消息发送给指定队列

发布/订阅模式:扇出交换机接收消息并将消息发送给所有订阅了该交换机的队列

生产者:

public class Producer {public static void main(String[] args) throws Exception {// 1、获取连接  Connection connection = ConnectionUtil.getConnection();// 2、创建频道  Channel channel = connection.createChannel();  // 参数1. exchange:交换机名称  // 参数2. type:交换机类型  //     DIRECT("direct"):定向  //     FANOUT("fanout"):扇形(广播),发送消息到每一个与之绑定队列。  //     TOPIC("topic"):通配符的方式  //     HEADERS("headers"):参数匹配  // 参数3. durable:是否持久化  // 参数4. autoDelete:自动删除  // 参数5. internal:内部使用。一般false  // 参数6. arguments:其它参数  String exchangeName = "test_fanout";  // 3、创建交换机  channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT,true,false,false,null);  // 4、创建队列  String queue1Name = "test_fanout_queue1";String queue2Name = "test_fanout_queue2";channel.queueDeclare(queue1Name,true,false,false,null);  channel.queueDeclare(queue2Name,true,false,false,null);  // 5、绑定队列和交换机  // 参数1. queue:队列名称  // 参数2. exchange:交换机名称  // 参数3. routingKey:路由键,绑定规则  //     如果交换机的类型为fanout,routingKey设置为""  channel.queueBind(queue1Name,exchangeName,"");  channel.queueBind(queue2Name,exchangeName,"");  String body = "日志信息:张三调用了findAll方法...日志级别:info...";  // 6、发送消息  channel.basicPublish(exchangeName,"",null,body.getBytes());  // 7、释放资源  channel.close();  connection.close();}
}

消费者1:

public class Consumer1 {  public static void main(String[] args) throws Exception {  Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();String queue1Name = "test_fanout_queue1";  channel.queueDeclare(queue1Name,true,false,false,null);Consumer consumer = new DefaultConsumer(channel){@Override  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("body:"+new String(body));  System.out.println("队列 1 消费者 1 将日志信息打印到控制台.....");}  };channel.basicConsume(queue1Name,true,consumer);}
}

消费者2

public class Consumer2 {  public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();String queue2Name = "test_fanout_queue2";channel.queueDeclare(queue2Name,true,false,false,null);  Consumer consumer = new DefaultConsumer(channel){@Override  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("body:"+new String(body));  System.out.println("队列 2 消费者 2 将日志信息打印到控制台.....");}};channel.basicConsume(queue2Name,true,consumer);}
}

先启动两个消费者,再启动生产者发送消息

image-20240527211334592

交换机和队列的绑定关系如下图所示:

image-20240527211442292

发布订阅模式与工作队列模式的区别:

  • 工作队列模式消息由默认交换机处理,发布订阅模式消息由指定交换机处理
  • 监听同一个队列的消费端程序彼此之间是竞争关系
  • 绑定同一个交换机的多个队列在发布订阅模式下,消息是广播的,每个队列都能接收到消息

2.4 路由模式(Routing)

消息生产者将消息发送给Direct交换机,Direct交换机根据routing key路由键将消息发送到指定队列(用的最多)

当Direct交换机用相同的路由键routing key绑定多个队列,就会有广播效果(类似发布订阅)

生产者:

public class Producer {public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();String exchangeName = "test_direct";// 创建交换机  channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT, true, false, false, null);// 创建队列  String queue1Name = "test_direct_queue1";String queue2Name = "test_direct_queue2";// 声明(创建)队列  channel.queueDeclare(queue1Name, true, false, false, null);channel.queueDeclare(queue2Name, true, false, false, null);// 队列绑定交换机  // 队列1绑定error  channel.queueBind(queue1Name, exchangeName, "error");// 队列2绑定info error warning  channel.queueBind(queue2Name, exchangeName, "info");channel.queueBind(queue2Name, exchangeName, "error");channel.queueBind(queue2Name, exchangeName, "warning");String message = "日志信息:张三调用了delete方法.错误了,日志级别warning";// 发送消息  channel.basicPublish(exchangeName, "warning", null, message.getBytes());System.out.println(message);// 释放资源  channel.close();connection.close();}
}

消费者1:

public class Consumer1 {  public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();String queue1Name = "test_direct_queue1";channel.queueDeclare(queue1Name,true,false,false,null);  Consumer consumer = new DefaultConsumer(channel){@Override  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("body:"+new String(body));  System.out.println("Consumer1 将日志信息打印到控制台.....");}};channel.basicConsume(queue1Name,true,consumer);}
}

消费者2:

public class Consumer2 {  public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();String queue2Name = "test_direct_queue2";channel.queueDeclare(queue2Name,true,false,false,null);  Consumer consumer = new DefaultConsumer(channel){@Override  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {  System.out.println("body:"+new String(body));  System.out.println("Consumer2 将日志信息存储到数据库.....");}};channel.basicConsume(queue2Name,true,consumer);}
}

先启动两个消费者,再启动生产者

绑定关系:

image-20240527214607379

消费者2接受到消息,消费者1没有消息

image-20240527214651865

2.5 通配符模式(Topics)

消息生产者将消息发送给Topic交换机,Topic交换机根据通配符形式的路由键将消息发送到指定队列

(通配符规则:#:匹配零个或多个词,*:匹配一个词)

生产者:

public class Producer {public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();String exchangeName = "test_topic";channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC, true, false, false, null);String queue1Name = "test_topic_queue1";String queue2Name = "test_topic_queue2";channel.queueDeclare(queue1Name, true, false, false, null);channel.queueDeclare(queue2Name, true, false, false, null);// 绑定队列和交换机  // 参数1. queue:队列名称  // 参数2. exchange:交换机名称  // 参数3. routingKey:路由键,绑定规则  //      如果交换机的类型为fanout ,routingKey设置为""  // routing key 常用格式:系统的名称.日志的级别。  // 需求: 所有error级别的日志存入数据库,所有order系统的日志存入数据库  channel.queueBind(queue1Name, exchangeName, "#.error");channel.queueBind(queue1Name, exchangeName, "order.*");channel.queueBind(queue2Name, exchangeName, "*.*");// 分别发送消息到队列:order.info、goods.info、goods.error  String body = "[所在系统:order][日志级别:info][日志内容:订单生成,保存成功]";channel.basicPublish(exchangeName, "order.info", null, body.getBytes());body = "[所在系统:goods][日志级别:info][日志内容:商品发布成功]";channel.basicPublish(exchangeName, "goods.info", null, body.getBytes());body = "[所在系统:goods][日志级别:error][日志内容:商品发布失败]";channel.basicPublish(exchangeName, "goods.error", null, body.getBytes());channel.close();connection.close();}
}

消费者1监听消息队列1

public class Consumer1 {  public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();String QUEUE_NAME = "test_topic_queue1";channel.queueDeclare(QUEUE_NAME,true,false,false,null);  Consumer consumer = new DefaultConsumer(channel){@Override  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("body:"+new String(body));}};channel.basicConsume(QUEUE_NAME,true,consumer);}
}

消费者2监听消息队列2

public class Consumer2 {  public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();String QUEUE_NAME = "test_topic_queue2";channel.queueDeclare(QUEUE_NAME,true,false,false,null);  Consumer consumer = new DefaultConsumer(channel){@Override  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("body:"+new String(body));}};channel.basicConsume(QUEUE_NAME,true,consumer);}
}

先启动两个消费者,接着启动生产者发送消息

image-20240527215241465

3 RabbitMQ整合SpringBoot

项目基本四步骤基本步骤:建module,改POM,写YAML,主启动

3.1 @RabbitListener注解属性

  • bindings属性:指定交换机和队列之间的绑定关系,指定当前方法要监听的队列

    隐藏效果:如果RabbitMQ服务器上没有这里指定的交换机和队列,那么框架底层的代码会创建它们

  • queues属性

    @RabbitListener(queues = {QUEUE_LINZHUOWEI})
    
    • 作用:指定当前方法要监听的队列
    • 此时框架不会创建相关交换机和队列,必须提前创建好

3.2 消费者工程

  1. 建module:module06-boot-consumer

  2. 改POM

    <parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.1.5</version>
    </parent><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><!--引入web模块为了保证项目一直运行,持久监听消息队列消息--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency>
    </dependencies>
    
  3. 写YAML

    spring:rabbitmq:host: 192.168.145.130port: 5672username: guestpassword: 123456virtual-host: /logging:level:com.linzhuowei.mq.listener.MyMessageListener: info
    
  4. 主启动:正常添加@SpringBootApplication

  5. 监听器:

    import lombok.extern.slf4j.Slf4j;
    import com.rabbitmq.client.Channel;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.Exchange;
    import org.springframework.amqp.rabbit.annotation.Queue;
    import org.springframework.amqp.rabbit.annotation.QueueBinding;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;@Component
    @Slf4j
    public class MyMessageListener {public static final String EXCHANGE_DIRECT = "exchange.direct.order";  public static final String ROUTING_KEY = "order";  public static final String QUEUE_NAME  = "queue.order";  @RabbitListener(bindings = @QueueBinding(//队列信息value = @Queue(value = QUEUE_NAME, durable = "true"),//交换机信息exchange = @Exchange(value = EXCHANGE_DIRECT),//路由键信息,赋值为字符串数组key = {ROUTING_KEY}))public void processMessage(//对应消息数据本身,形参类型需要和发送消息的数据类型对应String data,//对应发送的消息本身,可以通过message获取消息数据,包括dataMessage message,//频道对象Channel channel) {log.info(data);}}
    

运行查看后台管理:

image-20241021163148733

如图:交换机exchange.direct.order通过order路由键绑定消息队列

题外话:

  • 使用@RabbitListenerbindings属性能绑定交换机和队列的关系并监听队列消息,如果RabbitMQ服务中没有交换机和队列,则会自动创建该队列
  • 使用@RabbitListenerqueues属性,监听指定消息队列

所以如果只是单纯监听消息队列,不考虑交换机和队列的创建以及绑定(因为这些创建操作可以在后台页面点击完成嘿嘿),消费者代码也可以这样写:

@RabbitListener(queues = {QUEUE_LINZHUOWEI})
public void processMessage(//对应消息数据本身,形参类型需要和发送消息的数据类型对应String data,//对应发送的消息本身,可以通过message获取消息数据,包括dataMessage message,//频道对象Channel channel) {log.info(data);
}

但建议还是写第一种

3.3 生产者工程

  1. 新建模块:module05-boot-producer

  2. 改POM

    <parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.1.5</version>
    </parent><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency>
    </dependencies>
    
  3. 写YAML

    spring: rabbitmq: host: 192.168.145.130port: 5672 username: guest password: 123456 virtual-host: /
    
  4. 主启动

    import org.springframework.boot.SpringApplication;  
    import org.springframework.boot.autoconfigure.SpringBootApplication;  @SpringBootApplication
    public class RabbitMQProducerMainType {public static void main(String[] args) {SpringApplication.run(RabbitMQProducerMainType.class, args);  }
    }
    
  5. 测试程序

    @SpringBootTest  
    public class RabbitMQTest {  public static final String EXCHANGE_DIRECT = "exchange.direct.order";  public static final String ROUTING_KEY = "order";@Autowired  private RabbitTemplate rabbitTemplate;@Testpublic void testSendMessage() {  rabbitTemplate.convertAndSend(  EXCHANGE_DIRECT,   ROUTING_KEY,   "Hello linzhuowei");  }  
    }
    

4 消息可靠性投递

4.1 什么是消息可靠投递?

消息可靠投递是确保消息从生产者发送到消息队列,再从消息队列消费到消费者的过程中,不丢失消息或重复处理消息

消息可靠投递主要三个方面:

  1. 消息的可靠发送(生产者 -> 消息队列)
  2. 消息的可靠存储(消息队列内部存储)
  3. 消息的可靠消费(消息队列 -> 消费者)

下面分别说这三个部分

4.2 消息的可靠发送

通过消息发送回调接口或备用交换机保证消息从生产者成功发送到消息队列中

4.2.1 消息确认机制

应答确认+ 失败重试

生产者发送消息后等待消息队列的响应,确保消息成功送达,如果发送失败可以尝试重新发送

①模块准备
  1. 新建模块:module07-confirm-producer

  2. 改POM

    <parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.1.5</version>
    </parent><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency>
    </dependencies>
    
  3. 主启动

  4. 写YAML:启用消息确认机制

    spring:rabbitmq:host: 192.168.145.130port: 5672username: guestpassword: 123456virtual-host: /publisher-confirm-type: CORRELATED # 交换机的确认publisher-returns: true # 队列的确认
    logging:level:com.linzhuowei.mq.config.MQProducerAckConfig: info
    
②配置类说明

通过配置类设置RabbitTemplate的回调接口,通过回调方法获取RabbitMQ服务器返回的确认信息,实现消息确认机制

代码实现过程:配置类自身实现ConfirmCallback、ReturnCallback这两个接口,然后通过this指针把配置类的对象设置到RabbitTemplate对象中

两个接口两个回调方法,是否发送到交换机和是否发送到消息队列

方法名方法功能所属接口接口所属类
confirm()确认消息是否发送到交换机ConfirmCallbackRabbitTemplate
returnedMessage()确认消息是否发送到队列ReturnsCallbackRabbitTemplate
  1. ConfirmCallback接口(RabbitTemplate内部的接口)

    /*** A callback for publisher confirmations.**/
    @FunctionalInterface
    public interface ConfirmCallback {/*** Confirmation callback.* @param correlationData correlation data for the callback.* @param ack true for ack, false for nack* @param cause An optional cause, for nack, when available, otherwise null.*/void confirm(@Nullable CorrelationData correlationData, boolean ack, @Nullable String cause);}
    

    生产者端发送消息之后,回调confirm()方法

    • ack参数值为true:表示消息成功发送到了交换机
    • ack参数值为false:表示消息没有发送到交换机
  2. ReturnCallback接口(RabbitTemplate内部的接口)

    /*** A callback for returned messages.** @since 2.3*/
    @FunctionalInterface
    public interface ReturnsCallback {/*** Returned message callback.* @param returned the returned message and metadata.*/void returnedMessage(ReturnedMessage returned);}
    

    接口中的returnedMessage()方法仅在消息没有发送到队列时调用

ReturnedMessage类中主要属性含义如下:

属性名类型含义
messageorg.springframework.amqp.core.Message消息以及消息相关数据
replyCodeint应答码,类似于HTTP响应状态码
replyTextString应答码说明
exchangeString交换机名称
routingKeyString路由键名称
③配置类示例

配置类自身实现ConfirmCallback、ReturnCallback这两个接口,然后通过this指针把配置类的对象设置到RabbitTemplate对象中

import jakarta.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;@Component
@Slf4j
public class MQProducerAckConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback{@Autowiredprivate RabbitTemplate rabbitTemplate;@PostConstructpublic void init() {rabbitTemplate.setConfirmCallback(this);rabbitTemplate.setReturnsCallback(this);}@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {if (ack) {log.info("消息发送到交换机成功!数据:" + correlationData);} else {log.info("消息发送到交换机失败!数据:" + correlationData + " 原因:" + cause);}}@Overridepublic void returnedMessage(ReturnedMessage returned) {log.info("消息发送到消息队列失败...");log.info("消息主体: " + new String(returned.getMessage().getBody()));log.info("应答码: " + returned.getReplyCode());log.info("描述:" + returned.getReplyText());log.info("消息使用的交换器 exchange : " + returned.getExchange());log.info("消息使用的路由键 routing : " + returned.getRoutingKey());}
}
④测试代码
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;@SpringBootTest  
public class RabbitMQTest {  public static final String EXCHANGE_DIRECT = "exchange.direct.order";public static final String ROUTING_KEY = "order";@Autowired  private RabbitTemplate rabbitTemplate;@Test  public void testSendMessage() {  rabbitTemplate.convertAndSend(  EXCHANGE_DIRECT,   ROUTING_KEY,   "Hello atguigu");  }  }

通过调整代码,测试如下三种情况:

  • 交换机正确、路由键正确
  • 交换机正确、路由键不正确,无法发送到队列
  • 交换机不正确,无法发送到交换机

回顾交换机作用:接收消息并路由到消息队列

4.2.2 备用交换机

①备用交换机配置

当消息在队列中未被处理时(如消息过期、消息被拒绝或达到最大重试次数,无匹配队列等),这些消息就会转发到备用交换机

本次案例模拟交换机没有匹配的消息队列,消息转至备用交换机

  1. 首先创建备用交换机(扇出类型):

    exchange.direct.order.backup
    

    image-20241022174034510

    image-20241022174127199

  2. 创建备用消息队列

    queue.order.backup
    

    image-20241022174305118

    image-20241022174317509

  3. 将备用消息队列绑定备用交换机

    image-20241022174431672\

  4. 重新创建原交换机(置顶备用交换机)

    exchange.direct.order
    

    需要删除原来的直连交换机,重新创建直连交换机,并设置备用交换机

    exchange.direct.order.backup
    

    image-20241022175031949

    image-20241022175047422

  5. 原交换机绑定原队列

    image-20241022175155189

    image-20241022175207880

    image-20241022175218262

②备用交换机测试

消息发送端:

@SpringBootTest  
public class RabbitMQTest {  public static final String EXCHANGE_DIRECT = "exchange.direct.order";//exchange.direct.order交换机绑定的路由键为order,这里order路由键错误,会转到备用交换机public static final String ROUTING_KEY = "order1";@Autowired  private RabbitTemplate rabbitTemplate;@Test  public void testSendMessage() {  rabbitTemplate.convertAndSend(  EXCHANGE_DIRECT,   ROUTING_KEY+"11","Hello 备用交换机");}  }

结果:消息发送成功,消息先发往直连交换机exchange.direct.order,由于路由键无效,没有匹配的消息队列,所以消息发往备用的扇出交换机exchange.direct.order.backup,最终发送到消息队列queue.test.backup

4.3 消息的可靠存储

通过将消息持久化到硬盘上防止消息队列宕机导致内存中消息丢失(交换机默认持久化,消息队列有指定也是默认持久化)

4.3.1 非持久化交换机和队列

即消息在内存存储,重启消息丢失

  1. 创建非持久化交换机

    image-20241022180407671

  2. 创建非持久化消息队列

    image-20241022180520671

  3. 绑定交换机和消息队列的关系

    image-20241022180609220

测试:发送消息后,队列成功收到消息。

docker restart rabbitmq

重启rabbitmq,内存的消息丢失,内存掉电设备

4.3.2 持久化交换机和消息队列

先来看卡监听消息队列的写法

@RabbitListener(bindings = @QueueBinding(value = @Queue(value = QUEUE_NAME, durable = "true"),exchange = @Exchange(value = EXCHANGE_DIRECT),key = {ROUTING_KEY}
))
public void processMessage(String dateString,Message message,Channel channel) {log.info(dateString);
}

关注@RabbitListener中,@QueueBinding中的value和exchange两个注解,分别是QueueExchange类型

①@Queue注解分析

@Queue注解抽出关注的部分

@Target({})
@Retention(RetentionPolicy.RUNTIME)
public @interface Queue {/*** Specifies if this queue should be durable.* By default if queue name is provided it is durable.* @return true if the queue is to be declared as durable.* @see org.springframework.amqp.core.Queue#isDurable()*/String durable() default "";/*** Specifies if this queue should be auto deleted when not used.* By default if queue name is provided it is not auto-deleted.* @return true if the queue is to be declared as auto-delete.* @see org.springframework.amqp.core.Queue#isAutoDelete()*/String autoDelete() default "";
}
  • durable属性:By default if queue name is provided it is durable
  • autoDelete属性:By default if queue name is provided it is not auto-deleted

翻译就是:只要消息队列指定,默认持久化且不自动删除

②@Exchange注解分析

@Exchange注解抽出有用的部分

@Target({})
@Retention(RetentionPolicy.RUNTIME)
public @interface Exchange {/*** @return false if the exchange is to be declared as non-durable.*/String durable() default TRUE;/*** @return true if the exchange is to be declared as auto-delete.*/String autoDelete() default FALSE;
}
  • durable属性默认true:false if the exchange is to be declared as non-durable
  • autoDelete属性默认false:true if the exchange is to be declared as auto-delete

交换机默认持久化

4.4 消息的可靠消费

消息确认机制

  • 自动确认:消费者接收消息后自动返回ACK确认,RabbitMQ删除消息。自动确认机制,消息处理失败会导致消息丢失(因为消息已删)
  • 手动确认:消费者处理消息成功后,显式发送ACK给消息队列,通知RabbitMQ消息成功消费删除消息,消费者处理消息失败后,显示发送NACK给消息队列,通知RabbitMQ消息消费失败,执行相应的失败策略。手动确认机制保证消息的可靠消费

4.4.1 模块准备

  1. POM

    <parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.1.5</version>
    </parent><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency>
    </dependencies>
    
  2. YAML:开启手动确认机制

    spring:rabbitmq:host: 192.168.145.130port: 5672username: guestpassword: 123456virtual-host: /listener:simple:acknowledge-mode: manual # 把消息确认模式改为手动确认
    
  3. 主启动

  4. 消息监听:其实durableautoDelete可以不设置,默认值就是这样的

    @Component
    public class MyMessageListener {public static final String EXCHANGE_DIRECT = "exchange.direct.order";public static final String ROUTING_KEY = "order";public static final String QUEUE_NAME  = "queue.order";// 修饰监听方法@RabbitListener(// 设置绑定关系bindings = @QueueBinding(// 配置队列信息:durable 设置为 true 表示队列持久化;autoDelete 设置为 false 表示关闭自动删除value = @Queue(value = QUEUE_NAME, durable = "true", autoDelete = "false"),// 配置交换机信息:durable 设置为 true 表示队列持久化;autoDelete 设置为 false 表示关闭自动删除exchange = @Exchange(value = EXCHANGE_DIRECT, durable = "true", autoDelete = "false"),// 配置路由键信息key = {ROUTING_KEY}))public void processMessage(String dataString, Message message, Channel channel) {}}
    

4.4.2 手动确认思路

未命名文件

  • 步骤1:YAML配置文件把消息确认模式改为手动确认
  • 步骤2:调用Channel对象的方法返回信息
    • ACK:Acknowledgement,表示消息处理成功
    • NACK:Negative Acknowledgement,表示消息处理失败
    • Reject:拒绝,同样表示消息处理失败
  • 步骤3:拒绝或者消息处理失败的后续操作
    • requeue为true:重新放回队列,重新投递,再次尝试
    • requeue为false:不放回队列,不重新投递
①basicAck()方法
  • 方法功能:给Broker返回ACK确认信息,表示消息已经在消费端成功消费,这样Broker就可以把消息删除了
  • 参数列表:
参数名称含义
long deliveryTagBroker给每一条进入队列的消息都设定一个唯一标识
boolean multiple取值为true:为小于、等于deliveryTag的消息批量返回ACK信息
取值为false:仅为指定的deliveryTag返回ACK信息
②basicNack()方法
  • 方法功能:给Broker返回NACK信息,表示消息在消费端消费失败,此时Broker的后续操作取决于参数requeue的值
  • 参数列表:
参数名称含义
long deliveryTagBroker给每一条进入队列的消息都设定一个唯一标识
boolean multiple取值为true:为小于、等于deliveryTag的消息批量返回ACK信息
取值为false:仅为指定的deliveryTag返回ACK信息
boolean requeue取值为true:Broker将消息重新放回队列,接下来会重新投递给消费端
取值为false:Broker将消息标记为已消费,不会放回队列
③basicReject()方法
  • 方法功能:根据指定的deliveryTag,对该消息表示拒绝
  • 参数列表:
参数名称含义
long deliveryTagBroker给每一条进入队列的消息都设定一个唯一标识
boolean requeue取值为true:Broker将消息重新放回队列,接下来会重新投递给消费端
取值为false:Broker将消息标记为已消费,不会放回队列

basicNack()和basicReject()有啥区别?

  • basicNack()有批量操作
  • basicReject()没有批量操作

Fanout交换机,同一个消息广播到不同的队列,deliveryTag会重复吗?不会,deliveryTag在Broker范围内唯一

4.4.3 可靠消费代码

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.io.IOException;@Component
@Slf4j
public class MyMessageListener {public static final String EXCHANGE_DIRECT = "exchange.direct.order";public static final String ROUTING_KEY = "order";public static final String QUEUE_NAME  = "queue.order";// 修饰监听方法@RabbitListener(// 设置绑定关系bindings = @QueueBinding(// 配置队列信息:durable 设置为 true 表示队列持久化;autoDelete 设置为 false 表示关闭自动删除value = @Queue(value = QUEUE_NAME, durable = "true", autoDelete = "false"),// 配置交换机信息:durable 设置为 true 表示队列持久化;autoDelete 设置为 false 表示关闭自动删除exchange = @Exchange(value = EXCHANGE_DIRECT, durable = "true", autoDelete = "false"),// 配置路由键信息key = {ROUTING_KEY}))public void processMessage(String dataString, Message message, Channel channel) throws IOException {// 1、获取当前消息的 deliveryTag 值备用long deliveryTag = message.getMessageProperties().getDeliveryTag();try {// 2、正常业务操作log.info("消费端接收到消息内容:" + dataString);// System.out.println(10 / 0);// 3、给 RabbitMQ 服务器返回 ACK 确认信息channel.basicAck(deliveryTag, false);} catch (Exception e) {// 4、获取信息,看当前消息是否曾经被投递过Boolean redelivered = message.getMessageProperties().getRedelivered();if (!redelivered) {// 5、如果没有被投递过,那就重新放回队列,重新投递,再试一次channel.basicNack(deliveryTag, false, true);} else {// 6、如果已经被投递过,且这一次仍然进入了 catch 块,那么返回拒绝且不再放回队列channel.basicReject(deliveryTag, false);}}}
}

4.5 消息可靠性投递架构

MQ是系统解耦利器,能很好解除消息发送者和消息接收者之间的耦合。如何保证消息可靠性投递?通过前面我们知道主要分消息可靠发送,消息可靠存储,消息可靠消费。这一小节我们用另一个角度分析

image-20250117153649131

要保证消息可靠性投递,我们分上下两个半场

  • 上半场123分别对应:1发送方调用主动API发送消息,2MQ服务端收到消息并将消息落库(持久化),3发送方收到回调ACK(确认消息成功投递到MQ服务器)

    timer起作用:步骤 3,如果发送方没有收到回调确认(比如服务端由于网络问题或者其他原因未能正确发送 ACK),则发送方会启动一个定时器,尝试重新发送消息。如果多次发送失败(超时),发送方会向业务方回调发送失败,这通常是在重试机制达到最大次数或超时后触发的

  • 下半场456分别对应:4消费端接收消息处理业务逻辑,5接收方(消费者)发送 ACK 回应消息处理成功,6MQ服务端收到ACK并将库中的消息删除

    timer起作用:步骤 5,消费者没有及时发送 ACK(比如消费者处理超时或发生了异常),MQ 服务端会启动定时器等待 ACK

    如果 MQ 服务端在规定时间内没有收到消费者的 ACK,timer 会触发重试机制,可能重新将消息投递到消费者,只到确认消息被处理并收到 ACK 后,消息才会从 MQ 服务端的持久化存储中删除,以确保消息的可靠性

上下半场均有重发,重发策略有定时重发(如每个10s重发直到超出次数)和指数退避(X秒重发,2X秒重发,4X秒重发)

综合来看关键点在于如何保证消息幂等

  • 上半场消息幂等:发送方没有收到回调ACK,会重新发送消息到MQ服务器。上半场的消息幂等性有MQ服务器完成,MQ会为每条消息生成全局唯一的message ID用作去重和幂等依据(上半场消息幂等由MQ服务器完成无需关注)
  • 下半场消息幂等:MQ服务端超时未收到ACK,导致MQ重复投递消息。业务方会收到重复消息,业务方需要保证消息幂等性。比如消息携带全局唯一id用于保证幂等,再处理消息前判断即可

5 消费端限流

利用消息队列的削峰限流,平滑流量避免大量请求涌入,限制请求数量,避免对后端服务造成过大的压力

常见的削峰限流策略有:

image-20241023191236100

通过prefetch来设置消费者**同时接收未确认的消息的数量**,每次预取的消息数量来实现流量削峰

5.1 未设置prefetch

首先向消息队列中发送100个消息

public void testSendMessage() {for (int i = 0; i < 100; i++) {rabbitTemplate.convertAndSend(EXCHANGE_DIRECT,ROUTING_KEY,"Hello atguigu" + i);}
}

image-20241024161821436

  • Ready100
  • Unack0
  • Total100

消息消费者监听对应的消息队列

@RabbitListener(bindings = @QueueBinding(value = @Queue(value = QUEUE_NAME, durable = "true"),exchange = @Exchange(value = EXCHANGE_DIRECT),key = {ROUTING_KEY}
))
public void processMessage(String dataString,Message message,Channel channel) throws InterruptedException, IOException {// 正常业务操作log.info("消费端接收到消息内容:" + dataString);//如果不睡1秒,瞬间为0TimeUnit.SECONDS.sleep(1);//手动确认ACKchannel.basicAck(deliveryTag, false);
}

显示结果:Ready直接为0,Unack和Total逐渐减少直到0

image-20241024161947405

5.2 设置prefetch

修改YAML

spring:rabbitmq:host: 192.168.145.130port: 5672username: guestpassword: 123456virtual-host: /listener:simple:acknowledge-mode: manualprefetch: 1 # 设置每次最多从消息队列服务器取回多少消息(同时接收未确认的消息的数量)

首先发送消息:

public void testSendMessage() {for (int i = 0; i < 100; i++) {rabbitTemplate.convertAndSend(EXCHANGE_DIRECT,ROUTING_KEY,"Hello atguigu" + i);}
}

image-20241024161821436

消息消费者监听对应的消息队列

@RabbitListener(bindings = @QueueBinding(value = @Queue(value = QUEUE_NAME, durable = "true"),exchange = @Exchange(value = EXCHANGE_DIRECT),key = {ROUTING_KEY}
))
public void processMessage(String dataString,Message message,Channel channel) throws InterruptedException, IOException {// 正常业务操作log.info("消费端接收到消息内容:" + dataString);//如果不睡1秒,瞬间为0TimeUnit.SECONDS.sleep(1);//手动确认ACKchannel.basicAck(deliveryTag, false);
}

效果,监听者每次只取一个消息消费,同时未确认消息只有prefetch

image-20241024164743749

6 消息超时

设置过期时间,消息超过过期时间自动删除(更准确的说超时消息会变成死信)

可通过两个层面设置过期时间

  • 队列层面:设置队列的消息过期时间,队列内的消息超出过期时间自动删除
  • 消息层面:设置具体某个消息的过期时间,消息超出过期时间自动删除

如果两个层面都有设置,以过期时间短的为准

6.1 队列层面设置过期时间

创建交换机

image-20241027013536936

创建消息队列,并设置过期时间10000毫秒

image-20241027013449844

绑定交换机

image-20241027013654535

发送消息,不启动消费端,等待消息过期

image-20241027153738779

6.2 消息层面设置过期时间

MessagePostProcessor 是 Spring Framework 的接口,在消息发送前对消息进行处理和修改。通过接口MessagePostProcessor接口在消息层面设置过期时间

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;@Test  
public void testSendMessageTTL() {  // 1、创建消息后置处理器对象  MessagePostProcessor messagePostProcessor = (Message message) -> {  // 设定 TTL 时间,以毫秒为单位message.getMessageProperties().setExpiration("5000");  return message;};// 2、发送消息  rabbitTemplate.convertAndSend(    EXCHANGE_DIRECT,     ROUTING_KEY,     "Hello linzhuowei", messagePostProcessor);    
}

原来消息队列queue.test.timeout过期时间10000毫秒,消息层面设置过期时间5000毫秒,以短的过期时间为标准,发送消息,等待消息过期

image-20241027154917714

7 死信和死信队列

无法正常被消费的消息就称为死信

死信的原因有三种(就是消息没有被正常消费):

  • 拒绝:消费者拒绝消息,basicNack()/basicReject(),并且不把消息重新放回原目标队列(requeue=false
  • 超时:消息达到超时时间未被消费
  • 溢出:队列中消息数量达到最大限制,根据队列先进先出原理,后来再进入一条消息,队列中最早的消息会变成死信

死信的处理方式大致三种:

  • 丢弃:不处理,死信直接丢弃
  • 入库:死信写入数据库,日后处理
  • 监听:死信进入死信队列,消费端监听死信队列,做后序处理(通常采用)

下面分别演示三种死信成因

7.1 准备工作

7.1.1 正常交换机和正常消息队列

  • 正常交换机:exchange.normal.video
  • 正常队列:queue.normal.video
  • 正常路由键:routing.key.normal.video
  1. 创建正常交换机

    image-20241027163756613

  2. 创建正常队列,写好死信队列和死信交换机

    image-20240318165821774

  3. 绑定正常消息队列和正常交换机

    image-20241027165215532

    完成设施后设置如下

    image-20241027165346613

7.1.2 死信交换机和死信队列

  • 死信交换机:exchange.dead.letter.video
  • 死信队列:queue.dead.letter.video
  • 死信路由键:routing.key.dead.letter.video
  1. 创建死信交换机

    image-20241027164612397

  2. 创建死信队列

    image-20241027164929494

  3. 绑定死信队列和死信交换机

    image-20241027172818810

7.1.3 常量声明

public static final String EXCHANGE_NORMAL = "exchange.normal.video";  
public static final String EXCHANGE_DEAD_LETTER = "exchange.dead.letter.video";  public static final String ROUTING_KEY_NORMAL = "routing.key.normal.video";  
public static final String ROUTING_KEY_DEAD_LETTER = "routing.key.dead.letter.video";  public static final String QUEUE_NORMAL = "queue.normal.video";  
public static final String QUEUE_DEAD_LETTER = "queue.dead.letter.video";

7.2 死信–拒绝

  1. 发送端发送消息到正常交换机

    @Test  
    public void testSendMessageButReject() {  rabbitTemplate  .convertAndSend(  EXCHANGE_NORMAL,  ROUTING_KEY_NORMAL,  "★[normal]发送消息--正常交换机--正常消息队列...");  
    }
    
  2. 消费端监听正常消息队列和死信队列

    import com.rabbitmq.client.Channel;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;import java.io.IOException;@Component
    @Slf4j
    public class DeadLetterListener {public static final String QUEUE_NORMAL = "queue.normal.video";public static final String QUEUE_DEAD_LETTER = "queue.dead.letter.video";@RabbitListener(queues = {QUEUE_NORMAL})public void processMessageNormal(Message message, Channel channel) throws IOException {// 消费端监听正常消息队列,接收并拒绝消息log.info("★[normal]接收消息,但拒绝消息且不重新放入队列...");channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);}@RabbitListener(queues = {QUEUE_DEAD_LETTER})public void processMessageDead(String dataString, Message message, Channel channel) throws IOException {// 消费端监听死信队列,接收并成功消费消息log.info("★[dead letter]监听死信队列,接收到死信消息...");log.info("★[dead letter]dataString = " + dataString);channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}}
    

先启动消费端监听死信队列和正常队列,再向正常消息队列发送消息

过程:发送端将消息发送到正常消息队列,监听正常消息队列的消费者接收消息并拒绝,消息通过死信交换机路由到死信队列,监听死信队列的消费者接收并成功消费。

正常消息队列:queue.normal.video,由于消息刚到达就被消费者接收,所以Queued messages没有变化

image-20241027222519208

同一时间,死信队列也是刚接收消息就被消费端消费,所以Queued messages没有变化

image-20241027223034059

消费端控制台打印:

[normal]接收消息,但拒绝消息且不重新放入队列...[dead letter]监听死信队列,接收到死信消息...[dead letter]dataString =[normal]发送消息--正常交换机--正常消息队列...

7.3 死信–超时和溢出

image-20241027232428910

前面创建正常消息队列时就置顶了正常消息队列最大消息数为10(x-max-length=10)且最大生存时间为10s(x-message-ttl=10000

先关闭消费者,向正常消息队列发送20条消息

@Test
public void testSendMessageButReject() {for (int i = 0; i < 20; i++) {rabbitTemplate.convertAndSend(EXCHANGE_NORMAL,ROUTING_KEY_NORMAL,"★[normal]发送消息--正常交换机--正常消息队列...");}
}

image-20241027224927913

  1. 发送者发送20条消息(m1,m2,…,m19,m20
  2. 前十条消息(m1,m2,…,m9,m10)正常进入消息队列,到达最大消息数
  3. 后十条消息(m11,m12,…,m19,m20)进入消息队列,根据队列先进先出,前十条消息(m1,m2,…,m9,m10溢出
  4. 这十条消息(m1,m2,…,m9,m10)通过死信交换机进入死信队列(对应死信队列第一个上坡)
  5. 后十条消息(m11,m12,…,m19,m20)超过10s未被消费,超时,后十条消息(m11,m12,…,m19,m20)也进入死信队列(对应死信队列第二个上坡)

消费者端省略,就还是监听然后消费…

8 延时队列

延时队列有两种实现思路

  • 借助超时时间+死信队列来实现延时队列
  • 通过RabbitMQ插件来完成延时队列

插件:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange

需要将插件放入rabbitmq中容器的?/plugins目录,我们来看看该目录映射到宿主机的哪个目录?

docker inspect rabbitmq

运行结果:

"Mounts": [{"Type": "volume","Name": "rabbitmq-plugin","Source": "/var/lib/docker/volumes/rabbitmq-plugin/_data","Destination": "/plugins","Driver": "local","Mode": "z","RW": true,"Propagation": ""},{"Type": "volume","Name": "0f14d68cf8aad70d33dfe59651cf0c05c0598d1e2223d0154cf6689a9dfb96f4","Source": "/var/lib/docker/volumes/0f14d68cf8aad70d33dfe59651cf0c05c0598d1e2223d0154cf6689a9dfb96f4/_data","Destination": "/var/lib/rabbitmq","Driver": "local","Mode": "","RW": true,"Propagation": ""}],

和容器内/plugins目录对应的宿主机目录是:/var/lib/docker/volumes/rabbitmq-plugin/_data

## 8.1 下载延迟插件

RabbitMQ社区插件:https://www.rabbitmq.com/community-plugins.html

延迟插件:

image-20241028000022313

下载插件安装文件,并移动到对应目录

wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/v3.13.0/rabbitmq_delayed_message_exchange-3.13.0.ez
mv rabbitmq_delayed_message_exchange-3.13.0.ez /var/lib/docker/volumes/rabbitmq-plugin/_data

启用插件

# 登录进入容器内部
docker exec -it rabbitmq /bin/bash# rabbitmq-plugins命令所在目录已经配置到$PATH环境变量中了,可以直接调用
rabbitmq-plugins enable rabbitmq_delayed_message_exchange# 退出Docker容器
exit# 重启Docker容器
docker restart rabbitmq

延迟插件启动成功:

image-20241028001239695

8.2 延迟插件的使用

8.2.1 生产者端

通过MessageProcessor来设置延迟时间

@Test
public void testSendDelayMessage() {rabbitTemplate.convertAndSend(EXCHANGE_DELAY,ROUTING_KEY_DELAY,"测试基于插件的延迟消息 [" + new SimpleDateFormat("hh:mm:ss").format(new Date()) + "]",messageProcessor -> {// 设置延迟时间:以毫秒为单位messageProcessor.getMessageProperties().setHeader("x-delay", "10000");return messageProcessor;});
}

8.2.2 消费者端

① ui界面创建延迟交换机和队列

使用插件创建交换机exchange.delay.happy

使用rabbitmq_delayed_message_exchange插件要求交换机type=x-delayed-message,并通过x-delayed-type设置交换机的类型(direct、fanout、topic),创建方式如下:

image-20240319163915574

创建消息队列queue.delay.video并绑定exchange.delay.happy交换机

@Component  
@Slf4j
public class MyDelayMessageListener {public static final String QUEUE_DELAY = "queue.delay.video";@RabbitListener(queues = {QUEUE_DELAY})public void process(String dataString, Message message, Channel channel) throws IOException {  log.info("[生产者]" + dataString);log.info("[消费者]" + new SimpleDateFormat("hh:mm:ss").format(new Date()));channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}}
②代码创建延迟交换机和队列
@Component  
@Slf4j
public class MyDelayMessageListener {  public static final String EXCHANGE_DELAY = "exchange.delay.video";public static final String ROUTING_KEY_DELAY = "routing.key.delay.video";public static final String QUEUE_DELAY = "queue.delay.video";@RabbitListener(bindings = @QueueBinding(  value = @Queue(value = QUEUE_DELAY, durable = "true", autoDelete = "false"),  exchange = @Exchange(  value = EXCHANGE_DELAY,   durable = "true",   autoDelete = "false",   type = "x-delayed-message",   arguments = @Argument(name = "x-delayed-type", value = "direct")),  key = {ROUTING_KEY_DELAY}  ))  public void process(String dataString, Message message, Channel channel) throws IOException {  log.info("[生产者]" + dataString);  log.info("[消费者]" + new SimpleDateFormat("hh:mm:ss").format(new Date()));  channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);  }  }

8.2.3 效果展示

前面消息可靠投递中说过,消息发送后回调confirm(),而returnMessage()只有在消息发送失败才会回调,但是使用rabbitmq_delayed_message_exchange插件后,即使消息成功发送到队列上,也会导致returnedMessage()方法执行(问题不大嘛)

消费端效果:

[生产者]测试基于插件的延迟消息 [12:41:29]
[消费者]12:41:39

9 事务消息

9.1 什么是事务消息?

生产者发送消息的操作打包成一个原子操作,要么全部成功要么全部失败,通过事务消息保证消息发送的原子性

RabbitMQ 的事务消息有点类似 Spring 的事务,分为开始事务、提交事务、回滚事务。

  • txSelect():开始事务,使用 txSelect() 开启事务。
  • txCommit():提交事务,如果 txCommit() 提交事务成功了,则消息一定会发送到 RabbitMQ
  • txRollback():回滚事务,如果在执行 txCommit() 之前 RabbitMQ 发生了异常,txRollback() 会捕获异常进行回滚。

RabbitMQ 发送事务消息流程:txSelect开启事务,消息发送到 RabbitMQ 缓存,接着 txCommit 提交事务,txCommit成功后则消息一定发送到了 RabbitMQ。

如果在 txCommit 完成前出现任何异常,我们就捕获这个异常然后执行 txRollback 进行回滚操作,整个过程跟 Spring 的事务机制没太大的区别。因此,我们可以通过 RabbitMQ 事务机制保证消息一定可以发送成功

了解了 RabbitMQ 的事务消息机制,接下来我们就分享两种方式来实现 RabbitMQ 事务消息

9.2 Springboot发送事务消息

9.2.1 准备工作

  1. 改pom.xml

    <parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.1.5</version>
    </parent><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency>
    </dependencies>
    
  2. 写yaml

    spring:rabbitmq:host: 192.168.200.100port: 5672username: guestpassword: 123456virtual-host: /
    
  3. 主启动

  4. 事务配置

    @Configuration
    @Data
    public class RabbitConfig {@Beanpublic RabbitTransactionManager transactionManager(CachingConnectionFactory connectionFactory) {return new RabbitTransactionManager(connectionFactory);}@Beanpublic RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);rabbitTemplate.setChannelTransacted(true);return rabbitTemplate;}
    }
    

9.2.2 没有事务消息

没有事务消息,无法保证消息发送原子性

@SpringBootTest
@Slf4j
public class RabbitMQTest {public static final String EXCHANGE_NAME = "exchange.tx.dragon";public static final String ROUTING_KEY = "routing.key.tx.dragon";@Resourceprivate RabbitTemplate rabbitTemplate;@Testpublic void testSendMessageInTx() {// 1、发送第一条消息rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY, "I am a dragon(tx msg ~~~01)");// 2、抛出异常log.info("do bad:" + 10 / 0);// 3、发送第二条消息rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY, "I am a dragon(tx msg ~~~02)");}}

抛出异常前的消息发送了,抛异常后的消息没有发送:

image-20241028232807095

9.2.3 使用事务消息

因为在junit中给测试方法使用@Transactional注解默认就会回滚,所以回滚操作需要使用@RollBack注解操控

@Test
@Transactional
@Rollback(value = false)
public void testSendMessageInTx() {// 1、发送第一条消息rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY, "I am a dragon(tx msg [commit] ~~~01)");log.info("do bad:" + 10 / 0);// 2、发送第二条消息rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY, "I am a dragon(tx msg [commit] ~~~02)");
}

image-20241028232918897

9.3 Channel发送事务消息

工具类

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import lombok.extern.slf4j.Slf4j;@Slf4j
public class RabbitMqUtil {public static Channel getChannel() {// 创建一个连接工厂,并设置MQ的相关信息ConnectionFactory factory = new ConnectionFactory();factory.setHost("xxxxxx");factory.setUsername("xxx");factory.setPassword("xxx");factory.setVirtualHost("/xxx");Channel channel = null;try {// 创建连接Connection connection = factory.newConnection();// 获取信道channel = connection.createChannel();} catch (Exception e) {log.error("创建 RabbitMQ Channel 失败", e);e.printStackTrace();}return channel;}
}

Channel发送事务消息

import com.rabbitmq.client.Channel;
import com.user.service.util.RabbitMqUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import java.io.IOException;
import java.nio.charset.StandardCharsets;@Slf4j
@Component
public class RabbitTransactionChannelProducer {@Transactional(rollbackFor = Exception.class, transactionManager = "rabbitTransactionManager")public void sendTransactionaChannelMessage(String message) {//获取 ChannelChannel channel = RabbitMqUtil.getChannel();try {//开启事务channel.txSelect();//发送消息channel.basicPublish("direct-transaction-exchange", "direct-transaction-exchange-routing-key", null, (message + "1").getBytes(StandardCharsets.UTF_8));channel.basicPublish("direct-transaction-exchange", "direct-transaction-exchange-routing-key", null, (message + "2").getBytes(StandardCharsets.UTF_8));//发送第三条消息之前模拟一个错误 我们看下前两条消息否回滚了//int a = 1 / 0;channel.basicPublish("direct-transaction-exchange", "direct-transaction-exchange-routing-key", null, (message + "3").getBytes(StandardCharsets.UTF_8));//提交事务channel.txCommit();} catch (Exception e) {//回滚事务try {channel.txRollback();} catch (IOException ex) {log.error("txRollback error", e);ex.printStackTrace();}e.printStackTrace();} finally {try {channel.close();} catch (Exception e) {log.error("channel close error", e);e.printStackTrace();}}}
}

10 惰性队列

创建队列分两种:

  • default默认消息队列:消息存储在内存,当队列内存限制触发才会将部分消息移到磁盘

  • lazy惰性消息队列:消息尽可能地保存在磁盘,内存中只保持必要的元数据

    惰性队列,将消息尽可能地保存在磁盘,减少内存的使用。有效防止由于队列消息过多导致的内存溢出,是处理需要处理大量消息但内存有限的场景。但是由于消息存于磁盘,生产者发送消息和消费者消费比普通队列慢,尤其在高吞吐场景

队列创建置顶模式方式有:使用队列策略(建议)和设置queue.declare参数

如果策略和队列参数同时指定,那么队列参数有更高优先级。如果队列模式是在声明时通过可选参数指定的,那么只能通过删除队列再重新创建来修改。

10.1 队列策略设定

# 登录Docker容器
docker exec -it rabbitmq /bin/bash# 运行rabbitmqctl命令
rabbitmqctl set_policy Lazy "^lazy-queue$" '{"queue-mode":"lazy"}' --apply-to queues

命令解读:

  • rabbitmqctl命令所在目录是:/opt/rabbitmq/sbin,该目录已配置到Path环境变量

  • set_policy是子命令,表示设置策略

  • Lazy是当前要设置的策略名称,是我们自己自定义的,不是系统定义的

  • "^lazy-queue$"是用正则表达式限定的队列名称,凡是名称符合这个正则表达式的队列都会应用这里的设置

  • '{“queue-mode”:“lazy”}'是一个JSON格式的参数设置指定了队列的模式为"lazy"

  • –-apply-to参数指定该策略将应用于队列(queues)级别

  • 命令执行后,所有名称符合正则表达式的队列都会应用指定策略,包括未来新创建的队列

如果需要修改队列模式可以执行如下命令(不必删除队列再重建):

rabbitmqctl set_policy Lazy "^lazy-queue$" '{"queue-mode":"default"}' --apply-to queues

10.3 queue.declare参数设定

参数x-queue-mode设定队列创建模式,lazydefault(默认)

Java代码原生API设置方式:

Map<String, Object> args = new HashMap<String, Object>();
args.put("x-queue-mode", "lazy");
channel.queueDeclare("myqueue", false, false, false, args);

Java代码注解设置方式:

@Queue(value = QUEUE_NAME, durable = "true", autoDelete = "false", arguments = {@Argument(name = "x-queue-mode", value = "lazy")
})

11 优先级队列

优先级队列允许你根据消息的优先级来处理消息。消息默认先进先出,通过设置不同的优先级值,消费者可以优先处理重要或紧急的消息,而延迟处理优先级较低的消息

11.1 准备工作

  1. 创建交换机:exchange.test.priority

  2. 创建消息队列:queue.test.priority

    image-20241029005751489

    RabbitMQ消息优先级范围 1到255 ,建议使用 1到5(数字越大优先级越高)

    通过设置 x-max-priority来指定消息队列的最大优先级,默认为0。而消息的优先级不能大于x-max-priority,所以使用优先级队列一定要指定x-max-priority,这里指定为x-max-priority=10

  3. 改POM

    <parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.1.5</version>
    </parent><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency>
    </dependencies>
    
  4. YAML

    spring:rabbitmq:host: 192.168.200.100port: 5672username: guestpassword: 123456virtual-host: /
    
  5. 主启动

11.2 使用优先级队列

不要启动消费者程序,让多条不同优先级的消息滞留在队列中

  • 第一次发送优先级为1的消息
  • 第二次发送优先级为2的消息
  • 第三次发送优先级为3的消息

先发送的消息优先级低,后发送的消息优先级高,将来看看消费端是不是先收到优先级高的消息

消息生产者:

import jakarta.annotation.Resource;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.test.context.SpringBootTest;@SpringBootTest
public class RabbitMQTest {public static final String EXCHANGE_PRIORITY = "exchange.test.priority";public static final String ROUTING_KEY_PRIORITY = "routing.key.test.priority";@Resourceprivate RabbitTemplate rabbitTemplate;@Testpublic void testSendMessage() {//第一次发送优先级为1的消息rabbitTemplate.convertAndSend(EXCHANGE_PRIORITY, ROUTING_KEY_PRIORITY, "I am a message with priority 1.", message->{message.getMessageProperties().setPriority(1);return message;});//第二次发送优先级为2的消息//rabbitTemplate.convertAndSend(EXCHANGE_PRIORITY, ROUTING_KEY_PRIORITY, "I am a message with priority 2.", message->{//    message.getMessageProperties().setPriority(2);//    return message;//});//第三次发送优先级为3的消息//rabbitTemplate.convertAndSend(EXCHANGE_PRIORITY, ROUTING_KEY_PRIORITY, "I am a message with priority 3.", message->{//    message.getMessageProperties().setPriority(3);//    return message;//});}}

image-20241029010516996

消费端:

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;@Slf4j
@Component
public class MyMessageProcessor {public static final String QUEUE_PRIORITY = "queue.test.priority";@RabbitListener(queues = {QUEUE_PRIORITY})public void processPriorityMessage(String data, Message message, Channel channel) throws IOException {log.info(data);channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}}

效果:

I am a message with priority 3.
I am a message with priority 2.
I am a message with priority 1.

相关文章:

RabbitMQ基础篇

文章目录 1 RabbitMQ概述1.1 消息队列1.2 RabbitMQ体系结构 2 RabbitMQ工作模式2.1 简单模式&#xff08;Simple Queue&#xff09;2.2 工作队列模式&#xff08;Work Queues&#xff09;2.3 发布/订阅模式&#xff08;Publish/Subscribe&#xff09;2.4 路由模式&#xff08;R…...

【springboot】Spring 官方抛弃了 Java 8!新idea如何创建java8项目

解决idea至少创建jdk17项目 问题 idea现在只能创建最少jdk17&#xff0c;不能创建java8了吗?解决 问题 idea现在只能创建最少jdk17&#xff0c;不能创建java8了吗 我本来以为是 IDEA 版本更新导致的 Bug&#xff0c;开始还没在意。 直到我今天自己初始化项目时才发现&am…...

[BrainShadow-V1] VR头戴设备统计报告

Brain-Shadow-V1 EventVR headsetsReported byXiao enDate2025/01/15Version1.0 HTC Vive Pro 2 Pro HTC Vive Pro 2 是一款高端虚拟现实头显&#xff0c;配备双 2.5K 显示屏&#xff0c;组合分辨率达到 48962448&#xff0c;提供 120 的视场角和 120Hz 的刷新率。该设备支持…...

RK3568 Android11 锁屏界面屏蔽下拉状态栏

参考文章&#xff1a; Android R锁屏界面屏蔽下拉状态栏_pulseexpansionhandler-CSDN博客 前提增加状态栏控制显隐属性&#xff0c;以下面文章为前提补充功能 RK3568 Android11 状态栏和导航栏增加显示控制功能-CSDN博客 修改文件位置&#xff1a; frameworks/base/package…...

53,【3】BUUCTF WEB october 2019 Twice SQLinjection

题目得到信息&#xff0c;2次注入&#xff0c;进入靶场 登录页面&#xff0c;很自然想到SQL 第一次注入应该是这个可以登录&#xff0c;注册&#xff0c;提交简介的页面 第二次注入应该是在info处注入&#xff0c;信息显示在简介处 我真的纯脑子有病&#xff0c;人家二次注入不…...

利用硬盘虚拟内存解决华为手机模拟器运行内存不足问题

在进行鸿蒙开发时&#xff0c;华为手机模拟器是必不可少的工具。然而&#xff0c;对于只有 8GB 物理内存的电脑来说&#xff0c;运行模拟器可能会遇到 "系统内存不足" 的提示&#xff0c;导致模拟器无法正常启动。这时&#xff0c;我们可以通过硬盘虚拟出额外的内存来…...

探秘Shortest与Stagehand:开启高效测试与自动化新篇

探秘Shortest与Stagehand&#xff1a;开启高效测试与自动化新篇 在数字化浪潮的推动下&#xff0c;网页自动化工具如同繁星般涌现&#xff0c;为众多行业带来了效率的变革。在这些工具中&#xff0c;Shortest和Stagehand凭借其出色的表现&#xff0c;成为了众多开发者、测试人…...

网络安全构成要素

一、防火墙 组织机构内部的网络与互联网相连时&#xff0c;为了避免域内受到非法访问的威胁&#xff0c;往往会设置防火墙。 使用NAT&#xff08;NAPT&#xff09;的情况下&#xff0c;由于限定了可以从外部访问的地址&#xff0c;因此也能起到防火墙的作用。 二、IDS入侵检…...

家政服务小程序,打造智慧家政新体验

春节即将来临&#xff0c;家政市场呈现出了火热的场景&#xff0c;大众对家政服务的需求持续增加。 近年来&#xff0c;家政市场开始倾向数字化、智能化&#xff0c;借助科学技术打造家政数字化平台&#xff0c;让大众在手机上就可以预约家政服务&#xff0c;减少传统家政市场…...

2.使用Spring BootSpring AI快速构建AI应用程序

Spring AI 是基于 Spring Boot3.x 框架构建&#xff0c;Spring Boot官方提供了非常便捷的工具Spring Initializr帮助开发者快速的搭建Spring Boot应用程序,IDEA也集成了此工具。本文使用的开发工具IDEASpring Boot 3.4Spring AI 1.0.0-SNAPSHOTMaven。 1.创建Spring Boot项目 …...

OpenCV实战-全景图像拼接

代码地址见文末 实现效果 1. 项目背景 随着计算机视觉技术的不断发展&#xff0c;图像拼接技术已被广泛应用于虚拟现实、地图生成、全景摄影等领域。图像拼接&#xff08;Image Stitching&#xff09;旨在将多张部分重叠的图像无缝拼接成一幅完整的全景图像。此任务要求图像处…...

h5使用video播放时关掉vant弹窗视频声音还在后台播放

现象&#xff1a; 1、点击遮罩弹窗关闭&#xff0c;弹窗的视频已经用v-if销毁&#xff0c;但是后台会自己从头开始播放视频声音。但是此时已经没有视频dom 2、定时器在打开弹窗后3秒自动关闭弹窗&#xff0c;则正常没有问题。 原来的代码&#xff1a; //页面 <a click&quo…...

解决leetcode第3418题机器人可以获得的最大金币数

3418.机器人可以获得的最大金币数 难度&#xff1a;中等 问题描述&#xff1a; 给你一个mxn的网格。一个机器人从网格的左上角(0,0)出发&#xff0c;目标是到达网格的右下角(m-1,n-1)。在任意时刻&#xff0c;机器人只能向右或向下移动。 网格中的每个单元格包含一个值coin…...

anaconda安装和环境配置

文章目录 一、Anaconda下载1.从官网直接下载&#xff1a;2.从镜像站中下载&#xff1a; 二、Anaconda安装三、检测是否有Anaconda配置anaconda环境 四、 Anaconda创建多个python环境&#xff08;方便管理项目环境&#xff09;1.查看conda有哪些环境2.创建python3.6的环境3.激活…...

Lora理解QLoRA

Parameter-Efficient Fine-Tuning (PEFT) &#xff1a;节约开销的做法&#xff0c;fine-tune少量参数&#xff0c;而不是整个模型&#xff1b; Low-Rank Adaptation (LoRA) &#xff1a;是PEFT的一种&#xff1b;冻结原参数矩阵&#xff0c;只更新2个小参数矩阵。 原文经过对比…...

嵌入式杂谈——什么是DMA?有什么用?

什么是DMA&#xff1f;——直接内存访问技术详解 在嵌入式系统和计算机体系结构中&#xff0c;DMA&#xff08;Direct Memory Access&#xff0c;直接内存访问&#xff09; 是一种重要的数据传输技术。它允许外设&#xff08;如UART、SPI、ADC等&#xff09;直接与内存进行数据…...

超标量处理器设计2-cache

1. cache 介绍 影响Cache缺失的情况有3种&#xff1a; Compulsory: 第一次被访问的指令或者数据肯定不会在cache中&#xff0c;需要通过预取来减少这种缺失Capcity: Cache容量越大&#xff0c;缺失就可以更少, 程序频繁使用的三个数据来源于3个set&#xff0c; 但是&#xff…...

使用Nginx正向代理让内网主机通过外网主机访问互联网

目录 环境概述 流程说明 在外网服务器上安装部署nginx? 安装前准备 下载nginx ?编译安装nginx 开始配置正向代理 创建systemd服务单元文件&#xff0c;用于管理Nginx服务的启动、停止和重新加载 启动nginx ?代理服务器本地验证 ?内网服务器验证 ?将代理地址添…...

蓝桥杯刷题第二天——背包问题

题目描述 有N件物品和一个容量是V的背包。每件物品只能使用一次。第i件物品的体积是Vi价值是Wi。 求解将哪些物品装入背包&#xff0c;可使这些物品的总体积不超过背包容量&#xff0c;且总价值最大。 输出最大价值。 输入格式 第一行两个整数&#xff0c;N&#xff0c;V&am…...

DM达梦启用及收集AWR报告

1.创建DBMS_WORKLOAD_REPOSITORY系统包 查看DBMS_WORKLOAD_REPOSITORY系统包启用状态 SQL> SELECT SF_CHECK_AWR_SYS;LINEID SF_CHECK_AWR_SYS ---------- ---------------- 1 0SF_CHECK_AWR_SYS 返回值 0&#xff1a;未启用&#xff1b;1&#xff1a;已启…...

【git】如何删除本地分支和远程分支?

1.如何在 Git 中删除本地分支 本地分支是您本地机器上的分支&#xff0c;不会影响任何远程分支。 &#xff08;1&#xff09;在 Git 中删除本地分支 git branch -d local_branch_name git branch 是在本地删除分支的命令。-d是一个标志&#xff0c;是命令的一个选项&#x…...

pix2pix mmgeneration通用场景黑白图片上色模型训练,Docker

https://www.dong-blog.fun/post/1924 对于机器学习和深度学习感兴趣的读者来说,OpenMMLab 提供的 MMGeneration 库是一个绝佳的选择。最近我在阅读一篇关于 MMGeneration 的博客文章,尤其是在使用 Docker 环境进行模型和算法测试方面,受益匪浅。以下是我对目标博客内容的概…...

【Redis入门到精通六】在Spring Boot中集成Redis(含配置和操作演示)

目录 Spring Boot中集成Redis 1.项目创建和环境配置 2.基本操作演示 Spring Boot中集成Redis Spring社区也自定义了一套Redis的客户端&#xff0c;与jedis的操作方式有所差异&#xff0c;Spring中把每个类型的操作都单独封装了起来。下面就让我来带大家了解如何在Spring Bo…...

js使用qrcode与canvas生成带logo的二维码

qrcode库 文档 https://www.npmjs.com/package/qrcode 安装 npm i qrcode 使用 errorCorrectionLevel: H // 容错率&#xff08;H是最高&#xff0c;其它看文档&#xff09; width: 200 // 大小 margin: 2 // 边距 import QRCode from qrcodeconst testFn async () > {c…...

【STM32】LED状态翻转函数

1.利用状态标志位控制LED状态翻转 在平常编写LED状态翻转函数时&#xff0c;通常利用状态标志位实现LED状态的翻转。如下所示&#xff1a; unsigned char led_turn_flag; //LED状态标志位&#xff0c;1-点亮&#xff0c;0-熄灭/***************************************函…...

FreeRTOS 简介

FreeRTOS 是一个小型、实时操作系统内核&#xff0c;专为嵌入式设备设计。它支持多任务操作、任务优先级、互斥机制和队列管理&#xff0c;是轻量级嵌入式开发中的热门选择。以下是其主要特点&#xff1a; 特点 实时性能&#xff1a;提供确定性的任务调度&#xff0c;适用于对…...

Java并发编程中的synchronized和volatile:用途解析与使用场景

目录 一、synchronized关键字&#xff1a;互斥与同步的保障 二、volatile关键字&#xff1a;轻量级的变量可见性保证 三、synchronized与volatile的区别与选择 四、总结 在Java并发编程中&#xff0c;synchronized和volatile是两个非常重要的关键字&#xff0c;它们在多线程…...

将 AzureBlob 的日志通过 Azure Event Hubs 发给 Elasticsearch(1)

问题 项目里使用了 AzureBlob 存储了用户上传的各种资源文件&#xff0c;近期 AzureBlob 的流量费用增长很快&#xff0c;想通过分析Blob的日志&#xff0c;获取一些可用的信息&#xff0c;所以有了这个需求&#xff1a;将存储账户的日志&#xff08;读写&#xff0c;审计&…...

程序设计:排版、检验报告的上下标解决几种办法

【啰嗦两句】 本文重点在于提供几个针对排版文档、各种检验报告系统等程序设计时&#xff0c;遇到的上下标录入、绘制展示等问题的应对办法&#xff0c;但是准确地说&#xff0c;并没有非常优秀的方案。 【上下标难题】 一般的行业或许对上下标并没有严格要求&#xff0c;多数…...

【2024年华为OD机试】 (C卷,100分)- 求字符串中所有整数的最小和(Java JS PythonC/C++)

一、问题描述 题目解析 题目描述 输入字符串 s&#xff0c;输出 s 中包含所有整数的最小和。 说明 字符串 s 只包含 a-z、A-Z、、-。合法的整数包括&#xff1a; 正整数&#xff1a;一个或多个 0-9 组成&#xff0c;如 0、2、3、002、102。负整数&#xff1a;负号 - 开头&…...

MBox20网关:数字化工厂的智能加速器

在当今这个日新月异的数字化时代&#xff0c;企业对于生产效率、数据管理和网络安全的追求已经达到了前所未有的高度。特别是在制造业领域&#xff0c;随着“工业4.0”和“智能制造”概念的深入实践&#xff0c;数字化工厂已成为产业升级的必然趋势。在这场深刻的变革中&#x…...

NodeJS | 搭建本地/公网服务器 live-server 的使用与安装

目录 介绍 安装 live-server 安装方法 安装后的验证 环境变量问题 Node.js 环境变量未配置正确 全局安装的 live-server 路径未添加到环境变量 运行测试 默认访问主界面 访问文件 报错信息与解决 问题一&#xff1a;未知命令 问题二&#xff1a;拒绝脚本 公网配置…...

用C++实现一个基于模板的观察者设计模式

观察者模式 定义 观察者模式(Observer Pattern)是一种行为型设计模式,用于定义对象间的一对多依赖关系,使得当一个对象状态发生变化时,其所有依赖它的对象都会收到通知并自动更新。 核心概念 角色定义 Subject(被观察者): 持有观察者列表,维护观察者的注册和移除。 …...

LabVIEW开发X光图像的边缘检测

在医疗影像处理中&#xff0c;X光图像的分析对于骨折、肿瘤等病变的检测非常重要。X光图像中包含许多关键信息&#xff0c;然而&#xff0c;由于图像噪声的干扰&#xff0c;直接从图像中提取有用的特征&#xff08;如骨折的边缘&#xff09;变得非常困难。边缘检测作为图像处理…...

GitEE

版本控制 cvs svn git 等等 一、团队开发过程中的问题 1、备份【Release】 2、代码还原 3、协同修改 4、多版本文件管理 5、追溯问题代码的编写人和编写时间 6、权限控制 二、版本控制 版本控制就是维护工程蓝图标准做法&#xff0c;能追踪工程蓝图从诞生一直到定案的过程…...

Ubuntu配置python环境

前言 Ubuntu22.04自带python3&#xff0c;仅需要安装pip3即可。 也可以安装Anaconda使用虚拟环境。 本地Python环境 查看python3是否已安装&#xff1a; python3 -V若已安装python3&#xff0c;继续安装pip3&#xff1a; sudo apt install python3-pip查看pip版本&#xf…...

数据库的DML

1.insert 数据库于表创建成功后&#xff0c;需要向数据库的表中插入数据。在MySQL中可以使用insert语句向数据库已有的表中插入一行或者多行元组数据 基本语法&#xff1a; insert 语句有两种语法形式&#xff0c;分别是insert…values语句和insert…set语句 insert into&l…...

什么是SSL及SSL的工作流程

什么是 SSL SSL(Secure Sockets Layer,安全套接层)是一种保护互联网通信安全的加密协议,用于确保数据在客户端和服务器之间传输时的保密性、完整性和身份验证。它已被TLS(Transport Layer Security,传输层安全协议)取代,但很多场景仍习惯称其为SSL。 SSL/TLS 的主要目…...

RabbitMQ---消息确认和持久化

&#xff08;一&#xff09;消息确认 1.概念 生产者发送消息后&#xff0c;到达消费端会有以下情况&#xff1a; 1.消息处理成功 2.消息处理异常 如果RabbitMQ把消息发送给消费者后就把消息删除&#xff0c;那么就可能会导致&#xff0c;消息处理异常想要再获取这条消息的时…...

4 AXI USER IP

前言 使用AXI Interface封装IP&#xff0c;并使用AXI Interface实现对IP内部寄存器进行读写实现控制LED的demo&#xff0c;这个demo是非常必要的&#xff0c;因为在前面的笔记中基本都需哟PS端与PL端就行通信互相交互&#xff0c;在PL端可以通过中断的形式来告知PS端一些事情&…...

windows下安装并使用node.js

一、下载Node.js 选择对应你系统的Node.js版本下载 Node.js官网下载地址 Node.js中文网下载地址??? 这里我选择的是Windows64位系统的Node.js20.18.0&#xff08;LTS长期支持版本&#xff09;版本的.msi安装包程序 官网下载&#xff1a; 中文网下载&#xff1a; 二、安…...

【报错解决】Sql server 2022连接数据库时显示证书链是由不受信任的颁发机构颁发的

SSMS 20在连接Sql server 2022数据库时有如下报错&#xff1a; A connection was successfully established with the server, but then an error occurred during the login process. (provider: SSL Provider, error: 0 - 证书链是由不受信任的颁发机构颁发的。 原因是尝试使…...

VSCode 的部署

一、VSCode部署 (1)、简介 vsCode 全称 Visual Studio Code&#xff0c;是微软出的一款轻量级代码编辑器&#xff0c;免费、开源而且功能强大。它支持几乎所有主流的程序语言的语法高亮、智能代码补全、自定义热键、括号匹配、代码片段、代码对比Diff、版本管理GIT等特性&…...

淘宝、京东联盟数字ID转加密ID接口

该接口可以将主站的数字ID转换为加密ID 例如&#xff1a;123456789 转换为 xxxxxxxxxx-xxxxxxxxx PHP示例 // 接口地址&#xff1a;https://www.haodanku.com/openapi/api_detail?id103 $app_secret 你的appSecret, //替换成自己的 $x [app_id > 你的appid, //替换成…...

【物联网】keil仿真环境设置 keilV5可以适用ARM7

文章目录 一、ARM指令模拟器环境搭建1. keil软件2. Legacy Support 二、Keil仿真环境设置1. 创建一个项目2. 编译器介绍(1)arm-none-eabi-gcc(2)arm-none-linux-gnueabi-gcc(3)arm-eabi-gcc(4)grmcc(5)aarch64-linux-gnu-gcc 3. 安装编译器(1)设置调试 一、ARM指令模拟器环境搭…...

Oracle 可观测最佳实践

简介 Oracle 数据库是一种广泛使用的商业关系数据库管理系统&#xff08;RDBMS&#xff09;&#xff0c;由甲骨文公司&#xff08;Oracle Corporation&#xff09;开发。它支持 SQL 语言&#xff0c;能够存储和管理大量数据&#xff0c;并提供高级数据管理功能&#xff0c;如数…...

上传自己的镜像到docker hub详细教程

上传自己的镜像到docker hub详细教程 本博客通B站视频一致&#xff1a; 上传自己的镜像到docker hub详细教程 1. 登录自己的hub.docker.com的账号 docker hub仓库 2. 点击Repositories&#xff0c;跳转到创建仓库页面 3. 点击Create a repository 创建repository&#xff0c…...

Python猜数小游戏

Python 实现的《猜数游戏》 介绍 本文将展示如何使用 Python 编写一个简单的《猜数游戏》。这个游戏将会生成一个1到10之间的随机数&#xff0c;用户有最多三次机会来猜测正确的数字。如果用户猜对了&#xff0c;游戏将结束并显示恭喜信息&#xff1b;如果没有猜对&#xff0…...

HackMyVM-Klim靶机的测试报告

目录 一、测试环境 1、系统环境 2、使用工具/软件 二、测试目的 三、操作过程 1、信息搜集 2、Getshell 3、提权 CVE-2008-0166 四、结论 一、测试环境 1、系统环境 渗透机&#xff1a;kali2021.1(192.168.159.127) 靶 机&#xff1a;debian(192.168.159.27) 注意事…...

MySQL中大量数据优化方案

文章目录 1 大量数据优化1.1 引言1.2 评估表数据体量1.2.1 表容量1.2.2 磁盘空间1.2.3 实例容量 1.3 出现问题的原因1.4 解决问题1.4.1 数据表分区1.4.1.1 简介1.4.1.2 分区限制和执行计划1.4.1.3 分区表的索引1.4.1.4 为什么分区键必须是主键的一部分1.4.1.5 操作分区1.4.1.5.…...