当前位置: 首页 > news >正文

RabbitMQ知识点

1.为什么需要消息队列?

 

 

 

 

 

 

 

 RabbitMQ体系结构

操作001:RabbitMQ安装

二、安装

# 拉取镜像
docker pull rabbitmq:3.13-management
​
# -d 参数:后台运行 Docker 容器
# --name 参数:设置容器名称
# -p 参数:映射端口号,格式是“宿主机端口号:容器内端口号”。5672供客户端程序访问,15672供后台管理界面访问
# -v 参数:卷映射目录
# -e 参数:设置容器内的环境变量,这里我们设置了登录RabbitMQ管理后台的默认用户和密码
docker run -d \
--name rabbitmq \
-p 5672:5672 \
-p 15672:15672 \
-v rabbitmq-plugin:/plugins \
-e RABBITMQ_DEFAULT_USER=guest \
-e RABBITMQ_DEFAULT_PASS=123456 \
rabbitmq:3.13-management

二、验证

访问后台管理界面:http://192.168.200.100:15672

使用上面创建Docker容器时指定的默认用户名、密码登录:

三、可能的问题

1、问题现象

在使用Docker拉取RabbitMQ镜像的时候,如果遇到提示:missing signature key,那就说明Docker版本太低了,需要升级

比如我目前的Docker版本如下图所示:

2、解决办法

基于CentOS7

①卸载当前Docker

更好的办法是安装Docker前曾经给服务器拍摄了快照,此时恢复快照;

如果不曾拍摄快照,那只能执行卸载操作了

yum erase -y docker \docker-client \docker-client-latest \docker-common \docker-latest \docker-latest-logrotate \docker-logrotate \docker-selinux \docker-engine-selinux \docker-engine \docker-ce

②升级yum库

yum update -y

③安装Docker最新版

yum install -y docker-ce docker-ce-cli containerd.io docker-buildx-plugin docker-compose-plugin

如果这一步看到提示:没有可用软件包 docker-ce,那就添加Docker的yum源:

yum install -y yum-utils
yum-config-manager --add-repo https://download.docker.com/linux/centos/docker-ce.repo

④设置Docker服务

systemctl start docker
systemctl enable docker

3、验证

上述操作执行完成后,再次查看Docker版本:

操作002:HelloWorld

一、目标

生产者发送消息,消费者接收消息,用最简单的方式实现

官网说明参见下面超链接:

RabbitMQ tutorial - "Hello World!" — RabbitMQ

二、具体操作

1、创建Java工程

①消息发送端(生产者)

②消息接收端(消费者)

③添加依赖

<dependencies><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.20.0</version></dependency>
</dependencies>

2、发送消息

①Java代码

不用客气,整个代码全部复制——当然,连接信息改成你自己的:

package com.atguigu.rabbitmq.simple;  import com.rabbitmq.client.Channel;  
import com.rabbitmq.client.Connection;  
import com.rabbitmq.client.ConnectionFactory;  public class Producer {  public static void main(String[] args) throws Exception {  // 创建连接工厂  ConnectionFactory connectionFactory = new ConnectionFactory();  // 设置主机地址  connectionFactory.setHost("192.168.200.100");  // 设置连接端口号:默认为 5672connectionFactory.setPort(5672);// 虚拟主机名称:默认为 /connectionFactory.setVirtualHost("/");// 设置连接用户名;默认为guest  connectionFactory.setUsername("guest");// 设置连接密码;默认为guest  connectionFactory.setPassword("123456");// 创建连接  Connection connection = connectionFactory.newConnection();  // 创建频道  Channel channel = connection.createChannel();  // 声明(创建)队列  // queue      参数1:队列名称  // durable    参数2:是否定义持久化队列,当 MQ 重启之后还在  // exclusive  参数3:是否独占本次连接。若独占,只能有一个消费者监听这个队列且 Connection 关闭时删除这个队列  // autoDelete 参数4:是否在不使用的时候自动删除队列,也就是在没有Consumer时自动删除  // arguments  参数5:队列其它参数  channel.queueDeclare("simple_queue", true, false, false, null);  // 要发送的信息  String message = "你好;小兔子!";  // 参数1:交换机名称,如果没有指定则使用默认Default Exchange  // 参数2:路由key,简单模式可以传递队列名称  // 参数3:配置信息  // 参数4:消息内容  channel.basicPublish("", "simple_queue", null, message.getBytes());  System.out.println("已发送消息:" + message);  // 关闭资源  channel.close();  connection.close();  }  }

②查看效果

3、接收消息

①Java代码

不用客气,整个代码全部复制——当然,连接信息改成你自己的:

package com.atguigu.rabbitmq.simple;  import com.rabbitmq.client.*;  import java.io.IOException;  public class Consumer {  public static void main(String[] args) throws Exception {  // 1.创建连接工厂  ConnectionFactory factory = new ConnectionFactory();  // 2. 设置参数  factory.setHost("192.168.200.100");  factory.setPort(5672);  factory.setVirtualHost("/");  factory.setUsername("guest");factory.setPassword("123456");  // 3. 创建连接 Connection        Connection connection = factory.newConnection();  // 4. 创建Channel  Channel channel = connection.createChannel();  // 5. 创建队列  // 如果没有一个名字叫simple_queue的队列,则会创建该队列,如果有则不会创建  // 参数1. queue:队列名称  // 参数2. durable:是否持久化。如果持久化,则当MQ重启之后还在  // 参数3. exclusive:是否独占。  // 参数4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉  // 参数5. arguments:其它参数。  channel.queueDeclare("simple_queue",true,false,false,null);  // 接收消息  DefaultConsumer consumer = new DefaultConsumer(channel){  // 回调方法,当收到消息后,会自动执行该方法  // 参数1. consumerTag:标识  // 参数2. envelope:获取一些信息,交换机,路由key...  // 参数3. properties:配置信息  // 参数4. body:数据  @Override  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {  System.out.println("consumerTag:"+consumerTag);  System.out.println("Exchange:"+envelope.getExchange());  System.out.println("RoutingKey:"+envelope.getRoutingKey());  System.out.println("properties:"+properties);  System.out.println("body:"+new String(body));  }  };  // 参数1. queue:队列名称  // 参数2. autoAck:是否自动确认,类似咱们发短信,发送成功会收到一个确认消息  // 参数3. callback:回调对象  // 消费者类似一个监听程序,主要是用来监听消息  channel.basicConsume("simple_queue",true,consumer);  }  }

②控制台打印

consumerTag:amq.ctag-8EB87GaZFP52LKSMcj98UA Exchange: RoutingKey:simple_queue properties:#contentHeader<basic>(content-type=null, content-encoding=null, headers=null, delivery-mode=null, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null) body:你好;小兔子!

③查看后台管理界面

因为消息被消费掉了,所以RabbitMQ服务器上没有了:

操作003:工作队列模式

Work Queues 本质上我们刚刚写的HelloWorld程序就是这种模式,只是简化到了最简单的情况:• 生产者只有一个 • 发送一个消息 • 消费者也只有一个,消息也只能被这个消费者消费所以HelloWorld也称为简单模式。 现在我们还原一下常规情况: • 生产者发送多个消息 • 由多个消费者来竞争 • 谁抢到算谁的

结论: • 多个消费者监听同一个队列,则各消费者之间对同一个消息是竞争的关系。• Work Queues工作模式适用于任务较重或任务较多的情况,多消费者分摊任务可以提高消息处理的效率。

一、生产者代码

1、封装工具类

package com.atguigu.rabbitmq.util;  import com.rabbitmq.client.Connection;  
import com.rabbitmq.client.ConnectionFactory;  public class ConnectionUtil {  public static final String HOST_ADDRESS = "192.168.200.100";  public static Connection getConnection() throws Exception {  // 定义连接工厂  ConnectionFactory factory = new ConnectionFactory();  // 设置服务地址  factory.setHost(HOST_ADDRESS);  // 端口  factory.setPort(5672);  //设置账号信息,用户名、密码、vhost  factory.setVirtualHost("/");  factory.setUsername("guest");  factory.setPassword("123456");  // 通过工程获取连接  Connection connection = factory.newConnection();  return connection;  }  public static void main(String[] args) throws Exception {  Connection con = ConnectionUtil.getConnection();  // amqp://guest@192.168.200.100:5672/  System.out.println(con);  con.close();  }  }

2、编写代码

package com.atguigu.rabbitmq.work;  import com.atguigu.rabbitmq.util.ConnectionUtil;  
import com.rabbitmq.client.Channel;  
import com.rabbitmq.client.Connection;  public class Producer {  public static final String QUEUE_NAME = "work_queue";  public static void main(String[] args) throws Exception {  Connection connection = ConnectionUtil.getConnection();  Channel channel = connection.createChannel();  channel.queueDeclare(QUEUE_NAME,true,false,false,null);  for (int i = 1; i <= 10; i++) {  String body = i+"hello rabbitmq~~~";  channel.basicPublish("",QUEUE_NAME,null,body.getBytes());  }  channel.close();  connection.close();  }  }

3、发送消息效果

img

二、消费者代码

1、编写代码

创建Consumer1和Consumer2。Consumer2只是类名和打印提示不同,代码完全一样。

package com.atguigu.rabbitmq.work;  import com.atguigu.rabbitmq.util.ConnectionUtil;  
import com.rabbitmq.client.*;  import java.io.IOException;  public class Consumer1 {  static final String QUEUE_NAME = "work_queue";  public static void main(String[] args) throws Exception {  Connection connection = ConnectionUtil.getConnection();  Channel channel = connection.createChannel();  channel.queueDeclare(QUEUE_NAME,true,false,false,null);  Consumer consumer = new DefaultConsumer(channel){  @Override  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {  System.out.println("Consumer1 body:"+new String(body));  }  };  channel.basicConsume(QUEUE_NAME,true,consumer);  }  }

注意:运行的时候先启动两个消费端程序,然后再启动生产者端程序。 如果已经运行过生产者程序,则手动把work_queue队列删掉。

2、运行效果

最终两个消费端程序竞争结果如下:

image-20231103103841644

操作004:发布订阅模式

生产者不是把消息直接发送到队列,而是发送到交换机

• 交换机接收消息,而如何处理消息取决于交换机的类型• 交换机有如下3种常见类型

• Fanout:广播,将消息发送给所有绑定到交换机的队列

• Direct:定向,把消息交给符合指定routing key的队列

• Topic:通配符,把消息交给符合routing pattern(路由模式)的队列

• 注意:Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失! 

组件之间关系:

        • 生产者把消息发送到交换机

        • 队列直接和交换机绑定

• 工作机制:消息发送到交换机上,就会以广播的形式发送给所有已绑定队列

• 理解概念:

        • Publish:发布,这里就是把消息发送到交换机上

        • Subscribe:订阅,这里只要把队列和交换机绑定,事实上就形成了一种订阅关系

一、生产者代码

package com.atguigu.rabbitmq.fanout;  import com.atguigu.rabbitmq.util.ConnectionUtil;  
import com.rabbitmq.client.BuiltinExchangeType;  
import com.rabbitmq.client.Channel;  
import com.rabbitmq.client.Connection;  
import com.rabbitmq.client.ConnectionFactory;  public class Producer {  public static void main(String[] args) throws Exception {  // 1、获取连接  Connection connection = ConnectionUtil.getConnection();  // 2、创建频道  Channel channel = connection.createChannel();  // 参数1. exchange:交换机名称  // 参数2. type:交换机类型  //     DIRECT("direct"):定向  //     FANOUT("fanout"):扇形(广播),发送消息到每一个与之绑定队列。  //     TOPIC("topic"):通配符的方式  //     HEADERS("headers"):参数匹配  // 参数3. durable:是否持久化  // 参数4. autoDelete:自动删除  // 参数5. internal:内部使用。一般false  // 参数6. arguments:其它参数  String exchangeName = "test_fanout";  // 3、创建交换机  channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT,true,false,false,null);  // 4、创建队列  String queue1Name = "test_fanout_queue1";  String queue2Name = "test_fanout_queue2";  channel.queueDeclare(queue1Name,true,false,false,null);  channel.queueDeclare(queue2Name,true,false,false,null);  // 5、绑定队列和交换机  // 参数1. queue:队列名称  // 参数2. exchange:交换机名称  // 参数3. routingKey:路由键,绑定规则  //     如果交换机的类型为fanout,routingKey设置为""  channel.queueBind(queue1Name,exchangeName,"");  channel.queueBind(queue2Name,exchangeName,"");  String body = "日志信息:张三调用了findAll方法...日志级别:info...";  // 6、发送消息  channel.basicPublish(exchangeName,"",null,body.getBytes());  // 7、释放资源  channel.close();  connection.close();  }  }

二、消费者代码

1、消费者1号

package com.atguigu.rabbitmq.fanout;  import com.atguigu.rabbitmq.util.ConnectionUtil;  
import com.rabbitmq.client.*;  
import java.io.IOException;  public class Consumer1 {  public static void main(String[] args) throws Exception {  Connection connection = ConnectionUtil.getConnection();  Channel channel = connection.createChannel();  String queue1Name = "test_fanout_queue1";  channel.queueDeclare(queue1Name,true,false,false,null);  Consumer consumer = new DefaultConsumer(channel){  @Override  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {  System.out.println("body:"+new String(body));  System.out.println("队列 1 消费者 1 将日志信息打印到控制台.....");  }  };  channel.basicConsume(queue1Name,true,consumer);  }  }

2、消费者2号

package com.atguigu.rabbitmq.fanout;  import com.atguigu.rabbitmq.util.ConnectionUtil;  
import com.rabbitmq.client.*;  
import java.io.IOException;  public class Consumer2 {  public static void main(String[] args) throws Exception {  Connection connection = ConnectionUtil.getConnection();  Channel channel = connection.createChannel();  String queue2Name = "test_fanout_queue2";  channel.queueDeclare(queue2Name,true,false,false,null);  Consumer consumer = new DefaultConsumer(channel){  @Override  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {  System.out.println("body:"+new String(body));  System.out.println("队列 2 消费者 2 将日志信息打印到控制台.....");  }  };  channel.basicConsume(queue2Name,true,consumer);  }  }

三、运行效果

还是先启动消费者,然后再运行生产者程序发送消息:

img

img

四、小结

交换机和队列的绑定关系如下图所示:

交换机需要与队列进行绑定,绑定之后;一个消息可以被多个消费者都收到。

发布订阅模式与工作队列模式的区别:

  • 工作队列模式本质上是绑定默认交换机

  • 发布订阅模式绑定指定交换机

  • 监听同一个队列的消费端程序彼此之间是竞争关系

  • 绑定同一个交换机的多个队列在发布订阅模式下,消息是广播的,每个队列都能接收到消息

操作006-路由模式

 通过『路由绑定』的方式,把交换机和队列关联起来

• 交换机和队列通过路由键进行绑定

• 生产者发送消息时不仅要指定交换机,还要指定路由键

• 交换机接收到消息会发送到路由键绑定的队列

• 在编码上与 Publish/Subscribe发布与订阅模式的区别:

        • 交换机的类型为:Direct

        • 队列绑定交换机的时候需要指定routing key。

一、生产者代码

package com.atguigu.rabbitmq.routing;  import com.atguigu.rabbitmq.util.ConnectionUtil;  
import com.rabbitmq.client.BuiltinExchangeType;  
import com.rabbitmq.client.Channel;  
import com.rabbitmq.client.Connection;  
import com.rabbitmq.client.ConnectionFactory;  public class Producer {  public static void main(String[] args) throws Exception {  Connection connection = ConnectionUtil.getConnection();  Channel channel = connection.createChannel();  String exchangeName = "test_direct";  // 创建交换机  channel.exchangeDeclare(exchangeName,BuiltinExchangeType.DIRECT,true,false,false,null);  // 创建队列  String queue1Name = "test_direct_queue1";  String queue2Name = "test_direct_queue2";  // 声明(创建)队列  channel.queueDeclare(queue1Name,true,false,false,null);  channel.queueDeclare(queue2Name,true,false,false,null);  // 队列绑定交换机  // 队列1绑定error  channel.queueBind(queue1Name,exchangeName,"error");  // 队列2绑定info error warning  channel.queueBind(queue2Name,exchangeName,"info");  channel.queueBind(queue2Name,exchangeName,"error");  channel.queueBind(queue2Name,exchangeName,"warning");  String message = "日志信息:张三调用了delete方法.错误了,日志级别warning";  // 发送消息  channel.basicPublish(exchangeName,"warning",null,message.getBytes());  System.out.println(message);  // 释放资源  channel.close();  connection.close();  }  }

二、消费者代码

1、消费者1号

package com.atguigu.rabbitmq.routing;  import com.atguigu.rabbitmq.util.ConnectionUtil;  
import com.rabbitmq.client.*;  
import java.io.IOException;  public class Consumer1 {  public static void main(String[] args) throws Exception {  Connection connection = ConnectionUtil.getConnection();  Channel channel = connection.createChannel();  String queue1Name = "test_direct_queue1";  channel.queueDeclare(queue1Name,true,false,false,null);  Consumer consumer = new DefaultConsumer(channel){  @Override  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {  System.out.println("body:"+new String(body));  System.out.println("Consumer1 将日志信息打印到控制台.....");  }  };  channel.basicConsume(queue1Name,true,consumer);  }  }

2、消费者2号

package com.atguigu.rabbitmq.routing;  import com.atguigu.rabbitmq.util.ConnectionUtil;  
import com.rabbitmq.client.*;  
import java.io.IOException;  public class Consumer2 {  public static void main(String[] args) throws Exception {  Connection connection = ConnectionUtil.getConnection();  Channel channel = connection.createChannel();  String queue2Name = "test_direct_queue2";  channel.queueDeclare(queue2Name,true,false,false,null);  Consumer consumer = new DefaultConsumer(channel){  @Override  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {  System.out.println("body:"+new String(body));  System.out.println("Consumer2 将日志信息存储到数据库.....");  }  };  channel.basicConsume(queue2Name,true,consumer);  }  }

三、运行结果

1、绑定关系

img

2、消费消息

操作006:主题模式

 • Topic类型与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key的时候使用通配符

• Routingkey一般都是由一个或多个单词组成,多个单词之间以“.”分割,例如:item.insert

• 通配符规则:

        • #:匹配零个或多个词

        • *:匹配一个词

一、生产者代码

package com.atguigu.rabbitmq.topic;  import com.atguigu.rabbitmq.util.ConnectionUtil;  
import com.rabbitmq.client.BuiltinExchangeType;  
import com.rabbitmq.client.Channel;  
import com.rabbitmq.client.Connection;  
import com.rabbitmq.client.ConnectionFactory;  public class Producer {  public static void main(String[] args) throws Exception {  Connection connection = ConnectionUtil.getConnection();  Channel channel = connection.createChannel();  String exchangeName = "test_topic";  channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC,true,false,false,null);  String queue1Name = "test_topic_queue1";  String queue2Name = "test_topic_queue2";  channel.queueDeclare(queue1Name,true,false,false,null);  channel.queueDeclare(queue2Name,true,false,false,null);  // 绑定队列和交换机  // 参数1. queue:队列名称  // 参数2. exchange:交换机名称  // 参数3. routingKey:路由键,绑定规则  //      如果交换机的类型为fanout ,routingKey设置为""  // routing key 常用格式:系统的名称.日志的级别。  // 需求: 所有error级别的日志存入数据库,所有order系统的日志存入数据库  channel.queueBind(queue1Name,exchangeName,"#.error");  channel.queueBind(queue1Name,exchangeName,"order.*");  channel.queueBind(queue2Name,exchangeName,"*.*");  // 分别发送消息到队列:order.info、goods.info、goods.error  String body = "[所在系统:order][日志级别:info][日志内容:订单生成,保存成功]";  channel.basicPublish(exchangeName,"order.info",null,body.getBytes());  body = "[所在系统:goods][日志级别:info][日志内容:商品发布成功]";  channel.basicPublish(exchangeName,"goods.info",null,body.getBytes());  body = "[所在系统:goods][日志级别:error][日志内容:商品发布失败]";  channel.basicPublish(exchangeName,"goods.error",null,body.getBytes());  channel.close();  connection.close();  }  }

二、消费者代码

1、消费者1号

消费者1监听队列1:

package com.atguigu.rabbitmq.topic;  import com.atguigu.rabbitmq.util.ConnectionUtil;  
import com.rabbitmq.client.*;  
import java.io.IOException;  public class Consumer1 {  public static void main(String[] args) throws Exception {  Connection connection = ConnectionUtil.getConnection();  Channel channel = connection.createChannel();  String QUEUE_NAME = "test_topic_queue1";  channel.queueDeclare(QUEUE_NAME,true,false,false,null);  Consumer consumer = new DefaultConsumer(channel){  @Override  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {  System.out.println("body:"+new String(body));  }  };  channel.basicConsume(QUEUE_NAME,true,consumer);  }  }

2、消费者2号

消费者2监听队列2:

package com.atguigu.rabbitmq.topic;  import com.atguigu.rabbitmq.util.ConnectionUtil;  
import com.rabbitmq.client.*;  
import java.io.IOException;  public class Consumer2 {  public static void main(String[] args) throws Exception {  Connection connection = ConnectionUtil.getConnection();  Channel channel = connection.createChannel();  String QUEUE_NAME = "test_topic_queue2";  channel.queueDeclare(QUEUE_NAME,true,false,false,null);  Consumer consumer = new DefaultConsumer(channel){  @Override  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {  System.out.println("body:"+new String(body));  }  };  channel.basicConsume(QUEUE_NAME,true,consumer);  }  }

三、运行效果

队列1:

img

队列2:

操作007:整合SpringBoot

搭建环境

• 基础设定:交换机名称、队列名称、绑定关系

• 发送消息:使用RabbitTemplate

• 接收消息:使用@RabbitListener注解

1、消费者工程

①创建module

②配置POM

<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.1.5</version>
</parent>
​
<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency>
</dependencies>

③YAML

增加日志打印的配置:

spring:rabbitmq:host: 192.168.200.100port: 5672username: guestpassword: 123456virtual-host: /
logging:level:com.atguigu.mq.listener.MyMessageListener: info

④主启动类

仿照生产者工程的主启动类,改一下类名即可

package com.atguigu.mq;
​
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
​
@SpringBootApplication
public class RabbitMQConsumerMainType {
​public static void main(String[] args) {SpringApplication.run(RabbitMQConsumerMainType.class, args);}
​
}

⑤监听器

package com.atguigu.mq.listener;
​
import lombok.extern.slf4j.Slf4j;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
​
@Component
@Slf4j
public class MyMessageListener {public static final String EXCHANGE_DIRECT = "exchange.direct.order";  public static final String ROUTING_KEY = "order";  public static final String QUEUE_NAME  = "queue.order";  @RabbitListener(bindings = @QueueBinding(value = @Queue(value = QUEUE_NAME, durable = "true"),exchange = @Exchange(value = EXCHANGE_DIRECT),key = {ROUTING_KEY}))public void processMessage(String dateString,Message message,Channel channel) {log.info(dateString);}}

2、@RabbitListener注解属性对比

①bindings属性

  • 表面作用:

    • 指定交换机和队列之间的绑定关系

    • 指定当前方法要监听的队列

  • 隐藏效果:如果RabbitMQ服务器上没有这里指定的交换机和队列,那么框架底层的代码会创建它们

②queues属性

@RabbitListener(queues = {QUEUE_ATGUIGU})
  • 作用:指定当前方法要监听的队列

  • 注意:此时框架不会创建相关交换机和队列,必须提前创建好

3、生产者工程

①创建module

②配置POM

<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.1.5</version>
</parent>
​
<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency>
</dependencies>

③YAML

spring: rabbitmq: host: 192.168.200.100port: 5672 username: guest password: 123456 virtual-host: /

④主启动类

package com.atguigu.mq;  import org.springframework.boot.SpringApplication;  
import org.springframework.boot.autoconfigure.SpringBootApplication;  @SpringBootApplication
public class RabbitMQProducerMainType {
​public static void main(String[] args) {SpringApplication.run(RabbitMQProducerMainType.class, args);  }
​
}

⑤测试程序

package com.atguigu.mq.test;import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
​
@SpringBootTest  
public class RabbitMQTest {  public static final String EXCHANGE_DIRECT = "exchange.direct.order";  public static final String ROUTING_KEY = "order";@Autowired  private RabbitTemplate rabbitTemplate;@Test  public void testSendMessage() {  rabbitTemplate.convertAndSend(  EXCHANGE_DIRECT,   ROUTING_KEY,   "Hello atguigu");  }  }

消息可靠性投递

下单操作的正常流程

 故障情况1

消息没有发送到消息队列上

后果:消费者拿不到消息,业务功能缺失,数据错误

 故障情况2

消息成功存入消息队列,但是消息队列服务器宕机了

原本保存在内存中的消息也丢失了

即使服务器重新启动,消息也找不回来了

后果:消费者拿不到消息,业务功能缺失,数据错误

故障情况3 

消息成功存入消息队列,但是消费端出现问题,

例如:宕机、抛异常等等后果:业务功能缺失,数据错误

 对症下药

• 故障情况1:消息没有发送到消息队列

        • 解决思路A:在生产者端进行确认,具体操作中我们会分别针对交换机和队列来确认,如果没有成功发送到消息队列服务器上,那就可以尝试重新发送

        • 解决思路B:为目标交换机指定备份交换机,当目标交换机投递失败时,把消息投递至备份交换机

• 故障情况2:消息队列服务器宕机导致内存中消息丢失

        • 解决思路:消息持久化到硬盘上,哪怕服务器重启也不会导致消息丢失

• 故障情况3:消费端宕机或抛异常导致消息没有成功被消费

        • 消费端消费消息成功,给服务器返回ACK信息,然后消息队列删除该消息

        • 消费端消费消息失败,给服务器端返回NACK信息,同时把消息恢复为待消费的状态,这样就可以再次取回消息,重试一次(当然,这就需要消费端接口支持幂等性

操作008-01-A:生产者端消息确认机制

一、创建module

二、搭建环境

1、配置POM

<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.1.5</version>
</parent>
​
<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency>
</dependencies>

2、主启动类

没有特殊设定:

package com.atguigu.mq;  import org.springframework.boot.SpringApplication;  
import org.springframework.boot.autoconfigure.SpringBootApplication;  @SpringBootApplication
public class RabbitMQProducerMainType {
​public static void main(String[] args) {SpringApplication.run(RabbitMQProducerMainType.class, args);  }
​
}

3、YAML

注意:publisher-confirm-type和publisher-returns是两个必须要增加的配置,如果没有则本节功能不生效

spring:rabbitmq:host: 192.168.200.100port: 5672username: guestpassword: 123456virtual-host: /publisher-confirm-type: CORRELATED # 交换机的确认publisher-returns: true # 队列的确认
logging:level:com.atguigu.mq.config.MQProducerAckConfig: info

三、创建配置类

1、目标

在这里我们为什么要创建这个配置类呢?首先,我们需要声明回调函数来接收RabbitMQ服务器返回的确认信息:

方法名方法功能所属接口接口所属类
confirm()确认消息是否发送到交换机ConfirmCallbackRabbitTemplate
returnedMessage()确认消息是否发送到队列ReturnsCallbackRabbitTemplate

然后,就是对RabbitTemplate的功能进行增强,因为回调函数所在对象必须设置到RabbitTemplate对象中才能生效。

原本RabbitTemplate对象并没有生产者端消息确认的功能,要给它设置对应的组件才可以。

而设置对应的组件,需要调用RabbitTemplate对象下面两个方法:

设置组件调用的方法所需对象类型
setConfirmCallback()ConfirmCallback接口类型
setReturnCallback()ReturnCallback接口类型

2、API说明

①ConfirmCallback接口

这是RabbitTemplate内部的一个接口,源代码如下:

    /*** A callback for publisher confirmations.**/@FunctionalInterfacepublic interface ConfirmCallback {
​/*** Confirmation callback.* @param correlationData correlation data for the callback.* @param ack true for ack, false for nack* @param cause An optional cause, for nack, when available, otherwise null.*/void confirm(@Nullable CorrelationData correlationData, boolean ack, @Nullable String cause);
​}

生产者端发送消息之后,回调confirm()方法

  • ack参数值为true:表示消息成功发送到了交换机

  • ack参数值为false:表示消息没有发送到交换机

②ReturnCallback接口

同样也RabbitTemplate内部的一个接口,源代码如下:

    /*** A callback for returned messages.** @since 2.3*/@FunctionalInterfacepublic interface ReturnsCallback {
​/*** Returned message callback.* @param returned the returned message and metadata.*/void returnedMessage(ReturnedMessage returned);
​}

注意:接口中的returnedMessage()方法仅在消息没有发送到队列时调用

ReturnedMessage类中主要属性含义如下:

属性名类型含义
messageorg.springframework.amqp.core.Message消息以及消息相关数据
replyCodeint应答码,类似于HTTP响应状态码
replyTextString应答码说明
exchangeString交换机名称
routingKeyString路由键名称

3、配置类代码

①要点1

加@Component注解,加入IOC容器

②要点2

配置类自身实现ConfirmCallback、ReturnCallback这两个接口,然后通过this指针把配置类的对象设置到RabbitTemplate对象中。

操作封装到了一个专门的void init()方法中。

为了保证这个void init()方法在应用启动时被调用,我们使用@PostConstruct注解来修饰这个方法。

关于@PostConstruct注解大家可以参照以下说明:

@PostConstruct注解是Java中的一个标准注解,它用于指定在对象创建之后立即执行的方法。当使用依赖注入(如Spring框架)或者其他方式创建对象时,@PostConstruct注解可以确保在对象完全初始化之后,执行相应的方法。

使用@PostConstruct注解的方法必须满足以下条件:

  1. 方法不能有任何参数。

  2. 方法必须是非静态的。

  3. 方法不能返回任何值。

当容器实例化一个带有@PostConstruct注解的Bean时,它会在调用构造函数之后,并在依赖注入完成之前调用被@PostConstruct注解标记的方法。这样,我们可以在该方法中进行一些初始化操作,比如读取配置文件、建立数据库连接等。

③代码

有了以上说明,下面我们就可以展示配置类的整体代码:

package com.atguigu.mq.config;
​
import jakarta.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
​
@Component
@Slf4j
public class MQProducerAckConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback{
​@Autowiredprivate RabbitTemplate rabbitTemplate;
​@PostConstructpublic void init() {rabbitTemplate.setConfirmCallback(this);rabbitTemplate.setReturnsCallback(this);}
​@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {if (ack) {log.info("消息发送到交换机成功!数据:" + correlationData);} else {log.info("消息发送到交换机失败!数据:" + correlationData + " 原因:" + cause);}}
​@Overridepublic void returnedMessage(ReturnedMessage returned) {//发送到交换机失败调用的方法log.info("消息主体: " + new String(returned.getMessage().getBody()));log.info("应答码: " + returned.getReplyCode());log.info("描述:" + returned.getReplyText());log.info("消息使用的交换器 exchange : " + returned.getExchange());log.info("消息使用的路由键 routing : " + returned.getRoutingKey());}
}

四、发送消息

package com.atguigu.mq.test;import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
​
@SpringBootTest  
public class RabbitMQTest {  public static final String EXCHANGE_DIRECT = "exchange.direct.order";public static final String ROUTING_KEY = "order";@Autowired  private RabbitTemplate rabbitTemplate;@Test  public void testSendMessage() {  rabbitTemplate.convertAndSend(  EXCHANGE_DIRECT,   ROUTING_KEY,   "Hello atguigu");  }  }

通过调整代码,测试如下三种情况:

  • 交换机正确、路由键正确

  • 交换机正确、路由键不正确,无法发送到队列

  • 交换机不正确,无法发送到交换机

操作008-01-B:备份交换机

一、创建备份交换机

1、创建备份交换机

注意:备份交换机一定要选择fanout类型,因为原交换机转入备份交换机时并不会指定路由键

 

2、创建备份交换机要绑定的队列

①创建队列

②绑定交换机

注意:这里是要和备份交换机绑定

3、针对备份队列创建消费端监听器

    public static final String EXCHANGE_DIRECT_BACKUP = "exchange.direct.order.backup";public static final String QUEUE_NAME_BACKUP  = "queue.order.backup";
​@RabbitListener(bindings = @QueueBinding(value = @Queue(value = QUEUE_NAME_BACKUP, durable = "true"),exchange = @Exchange(value = EXCHANGE_DIRECT_BACKUP),key = {""}))public void processMessageBackup(String dateString,Message message,Channel channel) {log.info("BackUp: " + dateString);}

二、设定备份关系

1、原交换机删除

·

2、重新创建原交换机

3、原交换机重新绑定原队列

三、测试

  • 启动消费者端

  • 发送消息,但是路由键不对,于是转入备份交换机

操作008-03:消费端消息确认

一、ACK

ACK是acknowledge的缩写,表示已确认

二、默认情况

默认情况下,消费端取回消息后,默认会自动返回ACK确认消息,所以在前面的测试中消息被消费端消费之后,RabbitMQ得到ACK确认信息就会删除消息

但实际开发中,消费端根据消息队列投递的消息执行对应的业务,未必都能执行成功,如果希望能够多次重试,那么默认设定就不满足要求了

所以还是要修改成手动确认

三、创建消费端module

1、配置POM

<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.1.5</version>
</parent>
​
<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency>
</dependencies>

2、YAML

增加针对监听器的设置:

spring:rabbitmq:host: 192.168.200.100port: 5672username: guestpassword: 123456virtual-host: /listener:simple:acknowledge-mode: manual # 把消息确认模式改为手动确认

3、主启动类

没有特殊设定:

package com.atguigu.mq;
​
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
​
@SpringBootApplication
public class RabbitMQConsumerMainType {
​public static void main(String[] args) {SpringApplication.run(RabbitMQConsumerMainType.class, args);}
​
}

四、消费端监听器

1、创建监听器类

package com.atguigu.mq.listener;
​
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.stereotype.Component;
​
@Component
public class MyMessageListener {
​public static final String EXCHANGE_DIRECT = "exchange.direct.order";public static final String ROUTING_KEY = "order";public static final String QUEUE_NAME  = "queue.order";
​public void processMessage(String dataString, Message message, Channel channel) {
​}
​
}

2、在接收消息的方法上应用注解

// 修饰监听方法
@RabbitListener(// 设置绑定关系bindings = @QueueBinding(
​// 配置队列信息:durable 设置为 true 表示队列持久化;autoDelete 设置为 false 表示关闭自动删除value = @Queue(value = QUEUE_NAME, durable = "true", autoDelete = "false"),
​// 配置交换机信息:durable 设置为 true 表示队列持久化;autoDelete 设置为 false 表示关闭自动删除exchange = @Exchange(value = EXCHANGE_DIRECT, durable = "true", autoDelete = "false"),
​// 配置路由键信息key = {ROUTING_KEY}
))
public void processMessage(String dataString, Message message, Channel channel) {
​
}

3、接收消息方法内部逻辑

  • 业务处理成功:手动返回ACK信息,表示消息成功消费

  • 业务处理失败:手动返回NACK信息,表示消息消费失败。此时有两种后续操作供选择:

    • 把消息重新放回消息队列,RabbitMQ会重新投递这条消息,那么消费端将重新消费这条消息——从而让业务代码再执行一遍

    • 不把消息放回消息队列,返回reject信息表示拒绝,那么这条消息的处理就到此为止

4、相关API

先回到PPT理解“deliveryTag:交付标签机制”

下面我们探讨的三个方法都是来自于com.rabbitmq.client.Channel接口

①basicAck()方法

  • 方法功能:给Broker返回ACK确认信息,表示消息已经在消费端成功消费,这样Broker就可以把消息删除了

  • 参数列表:

参数名称含义
long deliveryTagBroker给每一条进入队列的消息都设定一个唯一标识
boolean multiple取值为true:为小于、等于deliveryTag的消息批量返回ACK信息 取值为false:仅为指定的deliveryTag返回ACK信息

②basicNack()方法

  • 方法功能:给Broker返回NACK信息,表示消息在消费端消费失败,此时Broker的后续操作取决于参数requeue的值

  • 参数列表:

参数名称含义
long deliveryTagBroker给每一条进入队列的消息都设定一个唯一标识
boolean multiple取值为true:为小于、等于deliveryTag的消息批量返回ACK信息 取值为false:仅为指定的deliveryTag返回ACK信息
boolean requeue取值为true:Broker将消息重新放回队列,接下来会重新投递给消费端 取值为false:Broker将消息标记为已消费,不会放回队列

③basicReject()方法

  • 方法功能:根据指定的deliveryTag,对该消息表示拒绝

  • 参数列表:

参数名称含义
long deliveryTagBroker给每一条进入队列的消息都设定一个唯一标识
boolean requeue取值为true:Broker将消息重新放回队列,接下来会重新投递给消费端 取值为false:Broker将消息标记为已消费,不会放回队列
  • basicNack()和basicReject()有啥区别?

    • basicNack()有批量操作

    • basicReject()没有批量操作

5、完整代码示例

package com.atguigu.mq.listener;
​
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
​
import java.io.IOException;
​
@Component
@Slf4j
public class MyMessageListener {
​public static final String EXCHANGE_DIRECT = "exchange.direct.order";public static final String ROUTING_KEY = "order";public static final String QUEUE_NAME  = "queue.order";
​// 修饰监听方法@RabbitListener(// 设置绑定关系bindings = @QueueBinding(
​// 配置队列信息:durable 设置为 true 表示队列持久化;autoDelete 设置为 false 表示关闭自动删除value = @Queue(value = QUEUE_NAME, durable = "true", autoDelete = "false"),
​// 配置交换机信息:durable 设置为 true 表示队列持久化;autoDelete 设置为 false 表示关闭自动删除exchange = @Exchange(value = EXCHANGE_DIRECT, durable = "true", autoDelete = "false"),
​// 配置路由键信息key = {ROUTING_KEY}))public void processMessage(String dataString, Message message, Channel channel) throws IOException {
​// 1、获取当前消息的 deliveryTag 值备用long deliveryTag = message.getMessageProperties().getDeliveryTag();
​try {// 2、正常业务操作log.info("消费端接收到消息内容:" + dataString);// System.out.println(10 / 0);
​// 3、给 RabbitMQ 服务器返回 ACK 确认信息channel.basicAck(deliveryTag, false);} catch (Exception e) {
​// 4、获取信息,看当前消息是否曾经被投递过Boolean redelivered = message.getMessageProperties().getRedelivered();
​if (!redelivered) {// 5、requeu :如果没有被投递过,那就重新放回队列,重新投递,再试一次channel.basicNack(deliveryTag, false, true);} else {// 6、如果已经被投递过,且这一次仍然进入了 catch 块,那么返回拒绝且不再放回队列channel.basicReject(deliveryTag, false);}
​}}
​
}

五、要点总结

  • 要点1:把消息确认模式改为手动确认

  • 要点2:调用Channel对象的方法返回信息

    • ACK:Acknowledgement,表示消息处理成功

    • NACK:Negative Acknowledgement,表示消息处理失败

    • Reject:拒绝,同样表示消息处理失败

  • 要点3:后续操作

    • requeue为true:重新放回队列,重新投递,再次尝试

    • requeue为false:不放回队列,不重新投递

  • 要点4:deliveryTag 消息的唯一标识,查找具体某一条消息的依据

六、流程梳理

七、多啰嗦一句

消费端如果设定消息重新放回队列,Broker重新投递消息,那么消费端就可以再次消费消息,这是一种“重试”机制,这需要消费端代码支持“幂等性”——这属于前置知识,不展开了。

操作009:Prefetch

一、思路

  • 生产者发送100个消息

  • 对照两种情况:

    • 消费端没有设置prefetch参数:100个消息被全部取回

    • 消费端设置prefetch参数为1:100个消息慢慢取回

二、生产者端代码

@Test  
public void testSendMessage() {for (int i = 0; i < 100; i++) {rabbitTemplate.convertAndSend(EXCHANGE_DIRECT,ROUTING_KEY,"Hello atguigu" + i);}
}

三、消费者端代码

// 2、正常业务操作
log.info("消费端接收到消息内容:" + dataString);
​
// System.out.println(10 / 0);
TimeUnit.SECONDS.sleep(1);
​
// 3、给 RabbitMQ 服务器返回 ACK 确认信息
channel.basicAck(deliveryTag, false);

四、测试

1、未使用prefetch

  • 不要启动消费端程序,如果正在运行就把它停了

  • 运行生产者端程序发送100条消息

  • 查看队列中消息的情况:

  • 说明:

    • Ready表示已经发送到队列的消息数量

    • Unacked表示已经发送到消费端但是消费端尚未返回ACK信息的消息数量

    • Total未被删除的消息总数

  • 接下来启动消费端程序,再查看队列情况:

  • 能看到消息全部被消费端取走了,正在逐个处理、确认,说明有多少消息消费端就并发处理多少

2、设定prefetch

①YAML配置

spring:rabbitmq:host: 192.168.200.100port: 5672username: guestpassword: 123456virtual-host: /listener:simple:acknowledge-mode: manualprefetch: 1 # 设置每次最多从消息队列服务器取回多少消息

②测试流程

  • 停止消费端程序

  • 运行生产者端程序发送100条消息

  • 查看队列中消息的情况:

  • 接下来启动消费端程序,持续观察队列情况:

  • 能看到消息不是一次性全部取回的,而是有个过程

操作010:消息超时

一、队列层面设置

1、设置

别忘了设置绑定关系:

2、测试

  • 不启动消费端程序

  • 向设置了过期时间的队列中发送100条消息

  • 等10秒后,看是否全部被过期删除

二、消息层面设置

1、设置

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
​
@Test  
public void testSendMessageTTL() {  // 1、创建消息后置处理器对象  MessagePostProcessor messagePostProcessor = (Message message) -> {  // 设定 TTL 时间,以毫秒为单位message.getMessageProperties().setExpiration("5000");  return message;};// 2、发送消息  rabbitTemplate.convertAndSend(    EXCHANGE_DIRECT,     ROUTING_KEY,     "Hello atguigu", messagePostProcessor);    
}

2、查看效果

这次我们是发送到普通队列上:

操作011:死信

• 概念:当一个消息无法被消费,它就变成了死信。

• 死信产生的原因大致有下面三种:

        • 拒绝:消费者拒接消息,basicNack()/basicReject(),并且不把消息重新放入原目标队列,requeue=false

        • 溢出:队列中消息数量到达限制。比如队列最大只能存储10条消息,且现在已经存储了10条,此时如果再发送一条消息进来,根据先进先出原则,队列中最早的消息会变成死信

         • 超时:消息到达超时时间未被消费

• 死信的处理方式大致有下面三种:

         • 丢弃:对不重要的消息直接丢弃,不做处理

        • 入库:把死信写入数据库,日后处理

        • 监听:消息变成死信后进入死信队列,我们专门设置消费端监听死信队列,做后续处理(通常采用

一、测试相关准备

1、创建死信交换机和死信队列

常规设定即可,没有特殊设置:

  • 死信交换机:exchange.dead.letter.video

  • 死信队列:queue.dead.letter.video

  • 死信路由键:routing.key.dead.letter.video

2、创建正常交换机和正常队列

注意:一定要注意正常队列有诸多限定和设置,这样才能让无法处理的消息进入死信交换机

  • 正常交换机:exchange.normal.video

  • 正常队列:queue.normal.video

  • 正常路由键:routing.key.normal.video

全部设置完成后参照如下细节:

3、Java代码中的相关常量声明

public static final String EXCHANGE_NORMAL = "exchange.normal.video";  
public static final String EXCHANGE_DEAD_LETTER = "exchange.dead.letter.video";  public static final String ROUTING_KEY_NORMAL = "routing.key.normal.video";  
public static final String ROUTING_KEY_DEAD_LETTER = "routing.key.dead.letter.video";  public static final String QUEUE_NORMAL = "queue.normal.video";  
public static final String QUEUE_DEAD_LETTER = "queue.dead.letter.video";

二、消费端拒收消息

1、发送消息的代码

@Test  
public void testSendMessageButReject() {  rabbitTemplate  .convertAndSend(  EXCHANGE_NORMAL,  ROUTING_KEY_NORMAL,  "测试死信情况1:消息被拒绝");  
}

2、接收消息的代码

①监听正常队列

@RabbitListener(queues = {QUEUE_NORMAL})
public void processMessageNormal(Message message, Channel channel) throws IOException {// 监听正常队列,但是拒绝消息log.info("★[normal]消息接收到,但我拒绝。");channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
}

②监听死信队列

@RabbitListener(queues = {QUEUE_DEAD_LETTER})
public void processMessageDead(String dataString, Message message, Channel channel) throws IOException {  // 监听死信队列  log.info("★[dead letter]dataString = " + dataString);log.info("★[dead letter]我是死信监听方法,我接收到了死信消息");channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}

3、执行结果

三、消息数量超过队列容纳极限

1、发送消息的代码

@Test  
public void testSendMultiMessage() {  for (int i = 0; i < 20; i++) {  rabbitTemplate.convertAndSend(  EXCHANGE_NORMAL,  ROUTING_KEY_NORMAL,  "测试死信情况2:消息数量超过队列的最大容量" + i);  }  
}

2、接收消息的代码

消息接收代码不再拒绝消息:

@RabbitListener(queues = {QUEUE_NORMAL})
public void processMessageNormal(Message message, Channel channel) throws IOException {// 监听正常队列log.info("★[normal]消息接收到。");channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}

重启微服务使代码修改生效。

3、执行效果

正常队列的参数如下图所示:

生产者发送20条消息之后,消费端死信队列接收到前10条消息:

四、消息超时未消费

1、发送消息的代码

正常发送一条消息即可,所以使用第一个例子的代码。

@Test
public void testSendMessageTimeout() {rabbitTemplate.convertAndSend(EXCHANGE_NORMAL,ROUTING_KEY_NORMAL,"测试死信情况3:消息超时");
}

2、执行效果

队列参数生效:

因为没有消费端监听程序,所以消息未超时前滞留在队列中:

消息超时后,进入死信队列:

延迟队列

 实现思路•

        方案1:借助消息超时时间+死信队列(就是刚刚我们测试的例子)

        方案2:给RabbitMQ安装插件

 基于插件的延迟队列

操作012:延迟插件

一、插件简介

  • 官网地址:GitHub - rabbitmq/rabbitmq-delayed-message-exchange: Delayed Messaging for RabbitMQ

  • 延迟极限:最多两天

二、插件安装

1、确定卷映射目录

docker inspect rabbitmq

运行结果:

        "Mounts": [{"Type": "volume","Name": "rabbitmq-plugin","Source": "/var/lib/docker/volumes/rabbitmq-plugin/_data","Destination": "/plugins","Driver": "local","Mode": "z","RW": true,"Propagation": ""},{"Type": "volume","Name": "cca7bc3012f5b76bd6c47a49ca6911184f9076f5f6263b41f4b9434a7f269b11","Source": "/var/lib/docker/volumes/cca7bc3012f5b76bd6c47a49ca6911184f9076f5f6263b41f4b9434a7f269b11/_data","Destination": "/var/lib/rabbitmq","Driver": "local","Mode": "","RW": true,"Propagation": ""}]

和容器内/plugins目录对应的宿主机目录是:/var/lib/docker/volumes/rabbitmq-plugin/_data

进入/var/lib/docker/volumes/rabbitmq-plugin/_data

2、下载延迟插件

官方文档说明页地址:Community Plugins | RabbitMQ

下载插件安装文件:

wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/v3.13.0/rabbitmq_delayed_message_exchange-3.13.0.ez
mv rabbitmq_delayed_message_exchange-3.13.0.ez /var/lib/docker/volumes/rabbitmq-plugin/_data

3、启用插件

# 登录进入容器内部
docker exec -it rabbitmq /bin/bash
​
# rabbitmq-plugins命令所在目录已经配置到$PATH环境变量中了,可以直接调用
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
​
# 退出Docker容器
exit
​
# 重启Docker容器
docker restart rabbitmq

4、确认

确认点1:查看当前节点已启用插件的列表:

确认点2:如果创建新交换机时可以在type中看到x-delayed-message选项,那就说明插件安装好了

三、创建交换机

rabbitmq_delayed_message_exchange插件在工作时要求交换机是x-delayed-message类型才可以,创建方式如下:

关于x-delayed-type参数的理解:

原本指定交换机类型的地方使用了x-delayed-message这个值,那么这个交换机除了支持延迟消息之外,到底是direct、fanout、topic这些类型中的哪一个呢?

这里就额外使用x-delayed-type来指定交换机本身的类型

四、代码测试

1、生产者端代码

@Test
public void testSendDelayMessage() {rabbitTemplate.convertAndSend(EXCHANGE_DELAY,ROUTING_KEY_DELAY,"测试基于插件的延迟消息 [" + new SimpleDateFormat("hh:mm:ss").format(new Date()) + "]",messageProcessor -> {
​// 设置延迟时间:以毫秒为单位messageProcessor.getMessageProperties().setHeader("x-delay", "10000");
​return messageProcessor;});
}

2、消费者端代码

①情况A:资源已创建

package com.atguigu.mq.listener;
​
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
​
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;  @Component  
@Slf4j
public class MyDelayMessageListener {public static final String QUEUE_DELAY = "queue.delay.video";@RabbitListener(queues = {QUEUE_DELAY})public void process(String dataString, Message message, Channel channel) throws IOException {  log.info("[生产者]" + dataString);log.info("[消费者]" + new SimpleDateFormat("hh:mm:ss").format(new Date()));channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}
​
}

②情况B:资源未创建

package com.atguigu.mq.listener;  
​
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;  
import org.springframework.amqp.rabbit.annotation.*;  
import org.springframework.stereotype.Component;  
​
import java.io.IOException;  
import java.text.SimpleDateFormat;  
import java.util.Date;  @Component  
@Slf4j
public class MyDelayMessageListener {  public static final String EXCHANGE_DELAY = "exchange.delay.video";public static final String ROUTING_KEY_DELAY = "routing.key.delay.video";public static final String QUEUE_DELAY = "queue.delay.video";@RabbitListener(bindings = @QueueBinding(  value = @Queue(value = QUEUE_DELAY, durable = "true", autoDelete = "false"),  exchange = @Exchange(  value = EXCHANGE_DELAY,   durable = "true",   autoDelete = "false",   type = "x-delayed-message",   arguments = @Argument(name = "x-delayed-type", value = "direct")),  key = {ROUTING_KEY_DELAY}  ))  public void process(String dataString, Message message, Channel channel) throws IOException {  log.info("[生产者]" + dataString);  log.info("[消费者]" + new SimpleDateFormat("hh:mm:ss").format(new Date()));  channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);  }  
​
}

3、执行效果

①交换机类型

②生产者端效果

注意:使用rabbitmq_delayed_message_exchange插件后,即使消息成功发送到队列上,也会导致returnedMessage()方法执行

③消费者端效果

操作013:事务消息之生产者端

 

 

一、测试代码

1、引入依赖

<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.1.5</version>
</parent>
​
<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency>
</dependencies>

2、yaml配置

spring:rabbitmq:host: 192.168.200.100port: 5672username: guestpassword: 123456virtual-host: /

3、主启动类

package com.atguigu.mq;
​
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
​
@SpringBootApplication
public class RabbitMQProducerMainType {
​public static void main(String[] args) {SpringApplication.run(RabbitMQProducerMainType.class, args);}
​
}

4、相关配置

package com.atguigu.mq.config;
​
import lombok.Data;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.transaction.RabbitTransactionManager;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
​
@Configuration
@Data
public class RabbitConfig {
​@Beanpublic RabbitTransactionManager transactionManager(CachingConnectionFactory connectionFactory) {return new RabbitTransactionManager(connectionFactory);}
​@Beanpublic RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);rabbitTemplate.setChannelTransacted(true);return rabbitTemplate;}
}

5、测试代码

package com.atguigu.mq.test;
​
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.test.context.SpringBootTest;
​
@SpringBootTest
@Slf4j
public class RabbitMQTest {
​public static final String EXCHANGE_NAME = "exchange.tx.dragon";public static final String ROUTING_KEY = "routing.key.tx.dragon";
​@Resourceprivate RabbitTemplate rabbitTemplate;
​@Testpublic void testSendMessageInTx() {// 1、发送第一条消息rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY, "I am a dragon(tx msg ~~~01)");
​// 2、抛出异常log.info("do bad:" + 10 / 0);
​// 3、发送第二条消息rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY, "I am a dragon(tx msg ~~~02)");}
​
}

二、执行测试

1、未使用事务

抛出异常前的消息发送了,抛异常后的消息没有发送:

为了不影响后续操作,我们直接在管理界面这里把这条消息消费掉:

2、使用事务

①说明

因为在junit中给测试方法使用@Transactional注解默认就会回滚,所以回滚操作需要使用@RollBack注解操控

②测试提交事务的情况

@Test
@Transactional
@Rollback(value = false)
public void testSendMessageInTx() {// 1、发送第一条消息rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY, "I am a dragon(tx msg [commit] ~~~01)");
​// 2、发送第二条消息rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY, "I am a dragon(tx msg [commit] ~~~02)");
}

③测试回滚事务的情况

@Test
@Transactional
@Rollback(value = true)
public void testSendMessageInTx() {// 1、发送第一条消息rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY, "I am a dragon(tx msg [rollback] ~~~01)");// 2、抛出异常log.info("do bad:" + 10 / 0);
​// 3、发送第二条消息rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY, "I am a dragon(tx msg [rollback] ~~~02)");
}

 

操作014:惰性队列

惰性队列:未设置惰性模式时队列的持久化机制

• 创建队列时,在Durability这里有两个选项可以选择

        • Durable:持久化队列,消息会持久化到硬盘上

        • Transient:临时队列,不做持久化操作,broker重启后消息会丢失

 惰性队列:未设置惰性模式时队列的持久化机制

• 那么Durable队列在存入消息之后,是否是立即保存到硬盘呢?

答:队列满了,或者mq服务器关闭的时候就会将消息存储到硬盘当中去

 

一、创建惰性队列

1、官网说明

 惰性队列:短时间提升缓存消息的能力

使用惰性队列的主要原因之一是支持非常长的队列(数百万条消息)。由于各种原因,排队可能会变得很长:

        • 消费者离线/崩溃/停机进行维护

        • 突然出现消息进入高峰,生产者的速度超过了消费者

        • 消费者比正常情况慢

 • 比较下面两个说法是否是相同的意思:

        • 立即移动到硬盘

        • 尽早移动到硬盘

• 我认为不一样:

        • 立即:消息刚进入队列时

        • 尽早:服务器不繁忙时

队列可以创建为默认惰性模式,模式指定方式是:

  • 使用队列策略(建议)

  • 设置queue.declare参数

如果策略和队列参数同时指定,那么队列参数有更高优先级。如果队列模式是在声明时通过可选参数指定的,那么只能通过删除队列再重新创建来修改。

2、基于策略方式设定

# 登录Docker容器
docker exec -it rabbitmq /bin/bash
​
# 运行rabbitmqctl命令
rabbitmqctl set_policy Lazy "^lazy-queue$" '{"queue-mode":"lazy"}' --apply-to queues

命令解读:

  • rabbitmqctl命令所在目录是:/opt/rabbitmq/sbin,该目录已配置到Path环境变量

  • set_policy是子命令,表示设置策略

  • Lazy是当前要设置的策略名称,是我们自己自定义的,不是系统定义的

  • "^lazy-queue$"是用正则表达式限定的队列名称,凡是名称符合这个正则表达式的队列都会应用这里的设置

  • '{"queue-mode":"lazy"}'是一个JSON格式的参数设置指定了队列的模式为"lazy"

  • –-apply-to参数指定该策略将应用于队列(queues)级别

  • 命令执行后,所有名称符合正则表达式的队列都会应用指定策略,包括未来新创建的队列

如果需要修改队列模式可以执行如下命令(不必删除队列再重建):

rabbitmqctl set_policy Lazy "^lazy-queue$" '{"queue-mode":"default"}' --apply-to queues

3、在声明队列时使用参数设定

  • 参数名称:x-queue-mode

  • 可用参数值:

    • default

    • lazy

  • 不设置就是取值为default

Java代码原生API设置方式:

Map<String, Object> args = new HashMap<String, Object>();
args.put("x-queue-mode", "lazy");
channel.queueDeclare("myqueue", false, false, false, args);

Java代码注解设置方式:

@Queue(value = QUEUE_NAME, durable = "true", autoDelete = "false", arguments = {@Argument(name = "x-queue-mode", value = "lazy")
})

二、实操演练

1、生产者端代码

①配置POM

    <parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.1.5</version></parent>
​<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency></dependencies>

②配置YAML

spring:rabbitmq:host: 192.168.200.100port: 5672username: guestpassword: 123456virtual-host: /

③主启动类

package com.atguigu.mq;
​
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
​
@SpringBootApplication
public class RabbitMQLazyProducer {
​public static void main(String[] args) {SpringApplication.run(RabbitMQLazyProducer.class, args);}
​
}

④发送消息

package com.atguigu.mq.test;
​
import jakarta.annotation.Resource;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.test.context.SpringBootTest;
​
@SpringBootTest
public class RabbitMQTest {
​public static final String EXCHANGE_LAZY_NAME = "exchange.atguigu.lazy";public static final String ROUTING_LAZY_KEY = "routing.key.atguigu.lazy";
​@Resourceprivate RabbitTemplate rabbitTemplate;
​@Testpublic void testSendMessage() {rabbitTemplate.convertAndSend(EXCHANGE_LAZY_NAME, ROUTING_LAZY_KEY, "I am a message for test lazy queue.");}
​
}

2、消费者端代码

①配置POM

    <parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.1.5</version></parent>
​<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency></dependencies>

②配置YAML

spring:rabbitmq:host: 192.168.200.100port: 5672username: guestpassword: 123456virtual-host: /

③主启动类

package com.atguigu.mq;
​
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
​
@SpringBootApplication
public class RabbitMQLazyConsumerMainType {
​public static void main(String[] args) {SpringApplication.run(RabbitMQLazyConsumerMainType.class, args);}}

④监听器

package com.atguigu.mq.listener;
​
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
​
@Component
@Slf4j
public class MyLazyMessageProcessor {
​public static final String EXCHANGE_LAZY_NAME = "exchange.atguigu.lazy";public static final String ROUTING_LAZY_KEY = "routing.key.atguigu.lazy";public static final String QUEUE_LAZY_NAME = "queue.atguigu.lazy";
​@RabbitListener(bindings = @QueueBinding(value = @Queue(value = QUEUE_LAZY_NAME, durable = "true", autoDelete = "false", arguments = {@Argument(name = "x-queue-mode", value = "lazy")}),exchange = @Exchange(value = EXCHANGE_LAZY_NAME, durable = "true", autoDelete = "false"),key = {ROUTING_LAZY_KEY}))public void processMessageLazy(String data, Message message, Channel channel) {log.info("消费端接收到消息:" + data);}
​
}

三、测试

  • 先启动消费端

  • 基于消费端@RabbitListener注解中的配置,自动创建了队列

image-20231110201151470

  • 发送消息

操作015:优先级队列

• 默认情况:基于队列先进先出的特性,通常来说,先入队的先投递

• 设置优先级之后:优先级高的消息更大几率先投递

• 关键参数:x-max-priority

RabbitMQ允许我们使用一个正整数给消息设定优先级

        • 消息的优先级数值取值范围:1~255

        • RabbitMQ官网建议在1~5之间设置消息的优先级(优先级越高,占用CPU、内存等资源越多)

一、创建相关资源

1、创建交换机

exchange.test.priority

2、创建队列

queue.test.priority

x-max-priority

3、队列绑定交换机

二、生产者发送消息

1、配置POM

<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.1.5</version>
</parent>
​
<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency>
</dependencies>

2、配置YAML

spring:rabbitmq:host: 192.168.200.100port: 5672username: guestpassword: 123456virtual-host: /

3、主启动类

package com.atguigu.mq;
​
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
​
@SpringBootApplication
public class RabbitMQPriorityProducer {
​public static void main(String[] args) {SpringApplication.run(RabbitMQPriorityProducer.class, args);}
​
}

4、发送消息

  • 不要启动消费者程序,让多条不同优先级的消息滞留在队列中

  • 第一次发送优先级为1的消息

  • 第二次发送优先级为2的消息

  • 第三次发送优先级为3的消息

  • 先发送的消息优先级低,后发送的消息优先级高,将来看看消费端是不是先收到优先级高的消息

①第一次发送优先级为1的消息

package com.atguigu.mq.test;
​
import jakarta.annotation.Resource;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.test.context.SpringBootTest;
​
@SpringBootTest
public class RabbitMQTest {
​public static final String EXCHANGE_PRIORITY = "exchange.test.priority";public static final String ROUTING_KEY_PRIORITY = "routing.key.test.priority";
​@Resourceprivate RabbitTemplate rabbitTemplate;
​@Testpublic void testSendMessage() {rabbitTemplate.convertAndSend(EXCHANGE_PRIORITY, ROUTING_KEY_PRIORITY, "I am a message with priority 1.", message->{message.getMessageProperties().setPriority(1);return message;});}
​
}

②第二次发送优先级为2的消息

@Test
public void testSendMessage() {rabbitTemplate.convertAndSend(EXCHANGE_PRIORITY, ROUTING_KEY_PRIORITY, "I am a message with priority 2.", message->{message.getMessageProperties().setPriority(2);return message;});
}

③第三次发送优先级为3的消息

@Test
public void testSendMessage() {rabbitTemplate.convertAndSend(EXCHANGE_PRIORITY, ROUTING_KEY_PRIORITY, "I am a message with priority 3.", message->{message.getMessageProperties().setPriority(3);return message;});
}

三、消费端接收消息

1、配置POM

<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.1.5</version>
</parent>
​
<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency>
</dependencies>

2、配置YAML

spring:rabbitmq:host: 192.168.200.100port: 5672username: guestpassword: 123456virtual-host: /

3、主启动类

package com.atguigu.mq;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class RabbitMQPriorityConsumer {public static void main(String[] args) {SpringApplication.run(RabbitMQPriorityConsumer.class, args);}}

4、监听器

package com.atguigu.mq.listener;import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;@Slf4j
@Component
public class MyMessageProcessor {public static final String QUEUE_PRIORITY = "queue.test.priority";@RabbitListener(queues = {QUEUE_PRIORITY})public void processPriorityMessage(String data, Message message, Channel channel) throws IOException {log.info(data);channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}}

5、测试效果

对于已经滞留服务器的消息,只要消费端一启动,就能够收到消息队列的投递,打印效果如下:

 

相关文章:

RabbitMQ知识点

1.为什么需要消息队列&#xff1f; RabbitMQ体系结构 操作001&#xff1a;RabbitMQ安装 二、安装 # 拉取镜像 docker pull rabbitmq:3.13-management ​ # -d 参数&#xff1a;后台运行 Docker 容器 # --name 参数&#xff1a;设置容器名称 # -p 参数&#xff1a;映射端口号&…...

Windsuf 连接失败问题:[unavailable] unavailable: dial tcp...

问题描述 3月6日&#xff0c;在使用Windsuf 时&#xff0c;遇到以下网络连接错误&#xff1a; [unavailable] unavailable: dial tcp 35.223.238.178:443: connectex: A connection attempt failed because the connected party did not properly respond after a period of…...

Spark 3.0核心新特性解析与行业应用展望

Spark 3.0核心新特性解析与行业应用展望 一、自适应查询执行(Adaptive Query Execution, AQE) 作为Spark 3.0最具突破性的优化,AQE通过运行时动态调整执行计划,解决了传统静态优化的局限性。其核心技术突破体现在三方面: 1. 动态分区合并(Dynamically Coalescing Shuf…...

基于 harbor 构建docker私有仓库

仓库&#xff08;Repository&#xff09;是集中存放镜像的地方&#xff0c;又分公共仓库和私有仓库。 有时候容易把仓库与注册服务器&#xff08;Registry&#xff09;混淆。 实际上注册服务器是存放仓库的具体服务器&#xff0c; 一个注册服务器上可以有多个仓库&#xff0c;…...

MySQL基本建表操作

目录 1&#xff0c;创建数据库db_ck 1.1创建表 1.2 查看创建好的表 2,创建表t_hero 2.1 先进入数据库Db_Ck 2.1.1 这里可以看是否进入数据库: 2.2 创建表t_Hero 2.2.1 我们可以先在文本文档里面写好然后粘贴进去&#xff0c;因为直接写的话&#xff0c;错了要重新开始 …...

低空经济快速发展,无人机人才培养及校企实验室共建技术详解

随着低空经济的快速发展&#xff0c;无人机作为该领域的关键技术载体&#xff0c;其应用范围和市场需求正在迅速扩大。为了满足这一趋势&#xff0c;无人机人才的培养以及校企实验室的共建成为了推动技术进步和产业升级的重要途径。以下是对无人机人才培养及校企实验室共建技术…...

电脑网络出现问题!简单的几种方法解除电脑飞行模式

在某些情况下&#xff0c;您可能需要关闭电脑上的飞行模式以便重新连接到 Wi-Fi、蓝牙或其他无线网络。本教程中简鹿办公将指导您如何在 Windows 和 macO S操作系统上解除飞行模式。 一、Windows 系统下解除飞行模式 通过快捷操作中心 步骤一&#xff1a;点击屏幕右下角的通知…...

Docker入门篇1:搜索镜像、拉取镜像、查看本地镜像列表、删除本地镜像

大家好我是木木&#xff0c;在当今快速发展的云计算与云原生时代&#xff0c;容器化技术蓬勃兴起&#xff0c;Docker 作为实现容器化的主流工具之一&#xff0c;为开发者和运维人员带来了极大的便捷 。下面我们一起开始入门第一篇&#xff1a;搜索镜像、拉取镜像、查看本地镜像…...

网络初级复习作业

作业要求&#xff1a; 1,学校内部的HTTP客户端可以正常通过域名www.baidu.com访问到白度网络中的HTTP服务器 2&#xff0c;学校网络内部网段基于192.168.1.0/24划分&#xff1a;PC1可以正常访问3.3.3.0/24网段&#xff0c;但是PC2不允许 3,学校内部路由使用静态路由&#xf…...

Spring Boot 调用DeepSeek API的详细教程

目录 前置准备步骤1&#xff1a;创建Spring Boot项目步骤2&#xff1a;配置API参数步骤3&#xff1a;创建请求/响应DTO步骤4&#xff1a;实现API客户端步骤5&#xff1a;创建控制器步骤6&#xff1a;异常处理步骤7&#xff1a;测试验证单元测试示例Postman测试请求 常见问题排查…...

rpc和proto

rpc全称远程过程控制&#xff0c;说白了是一种对信息发送和接收的规则编写方法&#xff0c;来自google&#xff0c;这些规则会以protobuf代码存到proto文件里。我以autoGen中agent_worker.proto为例&#xff0c;大概长这样 syntax "proto3";package agents;option …...

我的两个医学数据分析技术思路

我的两个医学数据分析技术思路 从临床上获得的或者公共数据库数据这种属于观察性研究&#xff0c;是对临床诊疗过程中自然产生的数据进行分析而获得疾病发生发展的规律等研究成果。再细分&#xff0c;可以分为独立危险因素鉴定和预测模型构建两种。 独立危险因素鉴定是一直以…...

GitHub上传项目

总结&#xff08;有基础的话直接执行这几步&#xff0c;就不需要再往下看了&#xff09;&#xff1a; git init 修改git的config文件&#xff1a;添加:[user]:name你的github用户名 email你注册github的用户名 git branch -m master main git remote add origin 你的URL gi…...

汇编点亮LED

目录 一、ARM常用汇编指令 二、汇编点亮LED 2.1 GPIO简述 2.2 GPIO相关寄存器 2.3 LED原理图 2.4 汇编点亮LED 一、ARM常用汇编指令 常用汇编格式: label : instruction @ comment label:标号 instruction:具体汇编指令 comment:注释内容 常…...

VS Code C++ 开发环境配置

VS Code 是当前非常流行的开发工具. 本文讲述如何配置 VS Code 作为 C开发环境. 本文将按照如下步骤来介绍如何配置 VS Code 作为 C开发环境. 安装编译器安装插件配置工作区 第一个步骤的具体操作会因为系统不同或者方案不同而有不同的选择. 环境要求 首先需要立即 VS Code…...

Python深度学习算法介绍

一、引言 深度学习是机器学习的一个重要分支&#xff0c;它通过构建多层神经网络结构&#xff0c;自动从数据中学习特征表示&#xff0c;从而实现对复杂模式的识别和预测。Python作为一门强大的编程语言&#xff0c;凭借其简洁易读的语法和丰富的库支持&#xff0c;成为深度学…...

hadoop集群HDFS读写性能测试

一、写测试命令 hadoop jar $HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-client-jobclient-3.1.4-tests.jar TestDFSIO -write -nrFiles 10 -size 10MB二、读测试命令 hadoop jar $HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-client-jobclient-3.1.4-t…...

HTTPS加密原理详解

目录 HTTPS是什么 加密是什么 HTTPS的工作流程 1.使用对称加密 2.引入非对称加密 3.引入证书机制 客户端验证证书真伪的过程 签名的加密流程 整体工作流程 总结 HTTPS是什么 HTTPS协议也是一个应用程协议&#xff0c;是在HTTP的基础上加入了一个加密层&#xff0c;由…...

react基本功

useLayoutEffect useLayoutEffect 用于在浏览器重新绘制屏幕之前同步执行代码。它与 useEffect 相同,但执行时机不同。 主要特点 执行时机:useLayoutEffect 在 DOM 更新完成后同步执行,但在浏览器绘制之前。这使得它可以在浏览器渲染之前读取和修改 DOM,避免视觉上的闪烁…...

计算机视觉|3D 点云处理黑科技:PointNet++ 原理剖析与实战指南

一、引言 在当今数字化与智能化快速发展的时代&#xff0c;3D 点云处理技术在多个前沿领域中发挥着重要作用。特别是在自动驾驶和机器人视觉等领域&#xff0c;这项技术已成为实现智能化的关键支撑。 以自动驾驶为例&#xff0c;车辆需要实时感知周围复杂的环境信息&#xff…...

【VUE2】第三期——样式冲突、组件通信、异步更新、自定义指令、插槽

目录 1 scoped解决样式冲突 2 data写法 3 组件通信 3.1 父子关系 3.1.1 父向子传值 props 3.1.2 子向父传值 $emit 3.2 非父子关系 3.2.1 event bus 事件总线 3.2.2 跨层级共享数据 provide&inject 4 props 4.1 介绍 4.2 props校验完整写法 5 v-model原理 …...

WebAssembly技术及应用了解

WebAssembly&#xff08;Wasm&#xff09;是一种为Web设计的高效、低级的二进制指令格式&#xff0c;旨在提升Web应用的性能并支持多种编程语言。以下是对其核心概念、优势、应用场景及开发流程的系统介绍&#xff1a; 1. 核心概念 二进制格式&#xff1a;Wasm采用紧凑的二进制…...

工程化与框架系列(26)--前端可视化开发

前端可视化开发 &#x1f4ca; 引言 前端可视化是现代Web应用中不可或缺的一部分&#xff0c;它能够以直观的方式展示复杂的数据和信息。本文将深入探讨前端可视化开发的关键技术和最佳实践&#xff0c;包括图表绘制、数据处理、动画效果等方面。 可视化技术概述 前端可视化…...

ESP32的IDF开发学习-WiFi的开启、配置与连接

前言 本章节将实现如何使用ESP32的WiFi功能&#xff0c;尽可能的详细地介绍 简介 ESP32中的wifi支持双工作模式 Station&#xff08;STA&#xff09;模式&#xff1a;连接到路由器或其他AP设备&#xff0c;可通过esp_wifi_set_mode(WIFI_MODE_STA)设置。SoftAP模式&#xf…...

2025-3-9 一周总结

目前来看本学期上半程汇编语言,编译原理,数字电路和离散数学是相对重点的课程. 在汇编语言和编译原理这块,个人感觉黑书内知识点更多,细节更到位,体系更完整,可以在老师讲解之前进行预习 应当及时复习每天的内容.第一是看书,然后听课,在一天结束后保证自己的知识梳理完整,没有…...

【网络编程】事件选择模型

十、基于I/O模型的网络开发 10.9 事件选择模型 10.0.1 基本概念 事件选择(WSAEventSelect) 模型是另一个有用的异步 I/O 模型。和 WSAAsyncSelect 模 型类似的是&#xff0c;它也允许应用程序在一个或多个套接字上接收以事件为基础的网络事件通知&#xff0c;最 主要的差别在…...

Java核心语法:从变量到控制流

一、变量与数据类型&#xff08;对比Python/C特性&#xff09; 1. 变量声明三要素 // Java&#xff08;强类型语言&#xff0c;需显式声明类型&#xff09; int age 25; String name "CSDN"; // Python&#xff08;动态类型&#xff09; age 25 name …...

信息安全与网络安全的区别_信息安全与网络安全之差异探析

在当今数字化时代&#xff0c;信息安全与网络安全成为了人们关注的热点话题。尽管这两个概念经常被提及&#xff0c;但它们之间存在着明显的区别。本文旨在探讨信息安全与网络安全的定义、范畴及应对策略&#xff0c;以帮助读者更好地理解和应对相关挑战。 一、定义与范畴的差…...

http协议的三次握手机制

HTTP协议是基于TCP协议的&#xff0c;因此HTTP的三次握手机制实际上就是TCP的三次握手机制。TCP&#xff08;传输控制协议&#xff09;是一种面向连接的、可靠的、基于字节流的传输层通信协议。为了确保通信的可靠性&#xff0c;TCP在建立连接时需要进行三次握手。下面我们详细…...

探秘沃尔什-哈达玛变换(WHT)原理

沃尔什-哈达玛变换&#xff08;WHT&#xff09;起源 起源与命名&#xff08;20世纪早期&#xff09; 数学基础&#xff1a;该变换的理论基础由法国数学家雅克哈达玛&#xff08;Jacques Hadamard&#xff09;在1893年提出&#xff0c;其核心是哈达玛矩阵的构造。扩展与命名&…...

C++ Windows下屏幕截图

屏幕截图核心代码&#xff08;如果要求高帧率&#xff0c;请使用DxGI&#xff09;&#xff1a; // RGB到YUV的转换公式 #define RGB_TO_Y(r, g, b) ((int)((0.299 * (r)) (0.587 * (g)) (0.114 * (b)))) #define RGB_TO_U(r, g, b) ((int)((-0.169 * (r)) - (0.331 * (g)) …...

【python爬虫】酷狗音乐爬取练习

注意&#xff1a;本次爬取的音乐仅有1分钟试听&#xff0c;仅作学习爬虫的原理&#xff0c;完整音乐需要自行下载客户端。 一、 初步分析 登陆酷狗音乐后随机选取一首歌&#xff0c;在请求里发现一段mp3文件&#xff0c;复制网址&#xff0c;确实是我们需要的url。 复制音频的…...

电路的一些设计经验

这个C37在这里位于AMS1117-3.3稳压器的输入端。这个是作为输入滤波电容&#xff0c;有助于平滑输入电压&#xff0c;减少输入电压的纹波和噪声&#xff0c;从而提高稳压器LDO的稳定性。 电容器储存电荷&#xff0c;当输入电压出现小的拨动或者纹波时&#xff0c;电容器可以释放…...

Windows编译环境搭建(MSYS2\MinGW\cmake)

我的音视频/流媒体开源项目(github) 一、基础环境搭建 1.1 MSYS2\MinGW 参考&#xff1a;1. 基于MSYS2的Mingw-w64 GCC搭建Windows下C开发环境_msys2使用mingw64编译 在Widndows系统上&#xff0c;使用gcc工具链&#xff08;g&#xff09;进行C程序开发&#xff1f;可以的&a…...

Vue 框架深度解析:源码分析与实现原理详解

文章目录 一、Vue 核心架构设计1.1 整体架构流程图1.2 模块职责划分 二、响应式系统源码解析2.1 核心类关系图2.2 核心源码分析2.2.1 数据劫持实现2.2.2 依赖收集过程 三、虚拟DOM与Diff算法实现3.1 Diff算法流程图3.2 核心Diff源码 四、模板编译全流程剖析4.1 编译流程图4.2 编…...

前端 | CORS 跨域问题解决

问题&#xff1a;Access to fetch at http://localhost:3000/save from origin http://localhost:5174 has been blocked by CORS policy: Response to preflight request doesnt pass access control check: No Access-Control-Allow-Origin header is present on the request…...

《白帽子讲 Web 安全》之文件操作安全

目录 引言 &#xff08;一&#xff09;文件上传与下载漏洞概述 1.文件上传的常见安全隐患 1.1前端校验的脆弱性与服务端脚本执行危机在文件上传流程中&#xff0c;部分开发者可能会在前端使用 JavaScript 代码对文件后缀名进行简单校验&#xff0c;试图以此阻止非法文件上传…...

【AI】AI开源IDE:CLine源码分析报告

1. 源码位置&#xff1a; CLine 是一个开源的 VSCode 插件&#xff0c;其完整源码托管在 GitHub 的 cline/cline 仓库中。这个仓库包含 CLine 的核心逻辑&#xff08;TypeScript 编写&#xff09;&#xff0c;包括与 LLM 的对话控制、工具调用接口&#xff0c;以及 VSCode 插件…...

使用数据库和缓存的时候,是如何解决数据不一致的问题的?

1.缓存更新策略 1.1. 缓存旁路模式&#xff08;Cache Aside&#xff09; 在应用里负责管理缓存&#xff0c;读取时先查缓存&#xff0c;如果命中了则返回缓存&#xff0c;如果未命中就查询数据库&#xff0c;然后返回缓存&#xff0c;返回缓存的同时把数据给写入缓存中。更新…...

docker compose 以redis为例

常见docker compose 命令 》》注意这个是旧版本的&#xff0c;新版本 docker 与compose 之间没有 - 新版本的 docker compose 把 version 取消了 &#xff0c;redis 默认是没有配置文件的 &#xff0c;nginx&#xff0c;mysql 默认是有的 services:redis:image: redis:lat…...

基于Kubernetes部署MySQL主从集群

以下是一个基于Kubernetes部署MySQL主从集群的详细YAML示例&#xff0c;包含StatefulSet、Service、ConfigMap和Secret等关键配置。MySQL主从集群需要至少1个主节点和多个从节点&#xff0c;这里使用 StatefulSet 初始化脚本 实现主从自动配置。 1. 创建 Namespace (可选) ap…...

VMware中安装配置Ubuntu(2024最新版 超详细)

目录 一、安装虚拟机软件 二、VMware虚拟机 三、 Ubuntu 下载 &#xff08;1&#xff09;官网下载 &#xff08;2&#xff09;清华镜像网站下载 四、创建虚拟机 五、Ubuntu 系统安装过程的配置 六、更换国内镜像源 七、环境搭建完毕 全篇较长&#xff0c;请慢慢观看 一…...

【Linux】信号处理以及补充知识

目录 一、信号被处理的时机&#xff1a; 1、理解&#xff1a; 2、内核态与用户态&#xff1a; 1、概念&#xff1a; 2、重谈地址空间&#xff1a; 3、处理时机&#xff1a; 补充知识&#xff1a; 1、sigaction&#xff1a; 2、函数重入&#xff1a; 3、volatile&…...

如何在rust中解析 windows 的 lnk文件(快捷方式)

一、从标题二开始看&#x1f601; 这些天在使用rust写一个pc端应用程序&#xff0c;需要解析lnk文件获取lnk的图标以及原程序地址&#xff0c;之前并没有过pc端应用程序开发的经验&#xff0c; 所以在广大的互联网上游荡了两天。额&#x1f97a; 今天找到了这个库 lnk_parse很…...

大模型系列课程学习-基于Vllm/Ollama/Ktransformers完成Deepseek推理服务部署

1.机器配置及实验说明 基于前期搭建的双卡机器装机教程&#xff0c;配置如下&#xff1a; 硬件名称参数备注CPUE5-2680V42 *2&#xff08;线程28个&#xff09;无GPU2080TI-22G 双卡魔改卡系统WSL Unbuntu 22.04.5 LTS虚拟机 本轮实验目的&#xff1a;基于VLLM/Ollama/ktran…...

Unity Shader学习总结

1.帧缓冲区和颜色缓冲区区别 用于存储每帧每个像素颜色信息的缓冲区 帧缓冲区包括&#xff1a;颜色缓冲区 深度缓冲区 模板缓冲区 自定义缓冲区 2.ImageEffectShader是什么 后处理用的shader模版 3.computerShader 独立于渲染管线之外&#xff0c;在显卡上运行&#xff0c;大量…...

Java多线程与高并发专题——什么是阻塞队列?

引入 阻塞队列&#xff08;Blocking Queue&#xff09;是一种线程安全的队列数据结构&#xff0c;它的主要特点是&#xff1a; 线程安全&#xff1a;多个线程可以安全地同时访问队列。阻塞操作&#xff1a;当队列为空时&#xff0c;从队列中获取元素的操作会被阻塞&#xff0…...

【Recon】CTF Web类题目主要类型

CTF Web类题目主要类型 1. 信息搜集类2. 注入类漏洞3. 文件处理漏洞4. 身份验证与会话漏洞5. 服务端漏洞6. 客户端漏洞7. 代码审计与PHP特性8. 业务逻辑漏洞总结 CTF&#xff08;Capture The Flag&#xff09;竞赛中的Web类题目主要考察参赛者对Web应用漏洞的识别与利用能力&am…...

comfyui(python)下载insightface失败

使用comfyui时&#xff0c;安装插件zenid、instantid、ip-adapter等换脸插件时&#xff0c;因为依赖insightface安装失败&#xff0c;导致插件中的节点无法正常使用&#xff0c;需要单独安装insightface。 下载insightface到本地&#xff0c;下载地址 选择与自己python版本一致…...

《DataWorks 深度洞察:量子机器学习重塑深度学习架构,决胜复杂数据战场》

在数字化浪潮汹涌澎湃的当下&#xff0c;大数据已然成为推动各行业发展的核心动力。身处这一时代洪流&#xff0c;企业对数据的处理与分析能力&#xff0c;直接关乎其竞争力的高低。阿里巴巴的DataWorks作为大数据领域的扛鼎之作&#xff0c;凭借强大的数据处理与分析能力&…...