RabbitMQ消息队列的笔记
Rabbit与Java相结合
- 引入依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
- 在配置文件中编写关于rabbitmq的配置
rabbitmq:host: 192.168.190.132 //这个那个Linxu的端口号port: 5672 //端口号 ,固定的username: admin //这个是那个用户名password: 123546 //密码virtual-host: powernode //配置我们发消息发到那个虚拟主机上
- 编写一个配置类
1装RabbitMQ之前对Linux的更改
1.1 按照上传下载工具
yum install lrzsz -y
1.2 查看主机名字
more /etc/hostname
1.3 修改主机名
hostnamectl set-hostname 要修改主机名
4
2 链式编程
- 第一步:就是set方法中的返回值全部改为对象本身
public Student setAdderess(String adderess) {
this.adderess = adderess;
return this;
}
- 第二步:就可以使用链式编程了,因为当我们执行这个student.setId(20)的时候,我们在set方法中设置的返回值是一个Student对象。所以我们还可以继续.setName()
public class Chainlearn {public static void main(String[] args) {Student student = new Student();student.setId(20).setName("lisi").setAge("20").setAdderess("shanxi0");System.out.println(student);}
- 使用lombok生成链式编程
@Accessors(chain = true) //生成链式编程
-
- 第一步就是添加依赖lomback依赖
<dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.28</version></dependency>
@Data
@AllArgsConstructor
@NoArgsConstructor
@Accessors(chain = true) //生成链式编程
public class Teacher implements Serializable {private Integer id;private String name;private String age;private String adderess;}
3 建造者模式
建造者模式就是一步一步的建造一个复杂的是对象 。它一般是将一个复杂的对象分解为多个简单的对象,然后一步一步构造而成
3.1 使用lombok就可以给我们生曾建造者模式
就是使用@Bilder
生成建造者模式代码
建造者模式就是用来创建对象的。
实现过程
- 在使用lombok注解的情况下,我们在类上定义一个注解
@Builder //生成建造者模式
@Data
@AllArgsConstructor
@NoArgsConstructor
@Accessors(chain = true) //生成链式编程
@Builder //生成建造者模式
public class Teacher implements Serializable {private Integer id;private String name;private String age;private String adderess;}
- 创建对象并赋值就是使用对象.budiler.属性名
// 用建造者创建对象Teacher t = Teacher.builder().id(20).name("wangwu").age("10").adderess("真的").build();}
4 系统的启动任务
就是SpringBoot工程启动时,就会执行的任务
实现步骤:
就是在springBoot的启动类上实现ApplicationRunner
接口,并实现里面的方法,完后在方法中就可以输出东西。(我们应该用@Slf4j
)来输出东西,不要使用System.out.println("你好");
@SpringBootApplication
@Slf4j
public class Application implements ApplicationRunner {public static void main(String[] args) {SpringApplication.run(Application.class,args);}@Overridepublic void run(ApplicationArguments args) throws Exception {log.info("系统已启动,我就会运行");System.out.println("你好");}
}
5 RabbitMQ的定义
其中MQ的全称:message queue
消息队列
5.1 消息中间件
简单来说,消息中间件就是指保存数据的一个容器(服务器)
,可以用于两个系统之间的数据传递
消息中间件一般有三个主要角色:生产者(producer),消费者(consumer),消息代理(消息队列、消息服务器)
生产者发送消息到消息服务器,然后消费者从消息代理(消息队列)中获取数据并进行处理。
6 使用场景
6.1 异步处理
- 不使用MQ时,
-
- 下订单:1、执行下订单业务---->2、再处理加积分业务----->3、发红包业务---->4、发送手机短信业务
- 使用MQ时,加快执行速度
-
- 只执行下订单的业务 ,完后直接返回,完后向MQ发送消息---->积分系统处理积分业务后加积分,红包系统处理发红包业务后发红包、手机短信系统执行发送短信的业务后发短信
6.2 系统解耦
- 未使用MQ时
-
- 是2A系统调用B系统,再调用C系统......
- 使用MQ时
-
- A系统往MQ系统发送一条消息,B系统接受到消息处理,C系统接收到消息处理,其中不同的系统可以使用不同的语言来写。
6.3 流量削峰
就是双十一时,大量订单,发送到系统A,则此时系统A可以把消息发送到MQ中,MQ再匀速的发送给系统B,完后由系统B去操作数据库。
7 RabbitMQ运行环境搭建
7.1 RabbitMQ是由Erlang语言开发的 ,所以需要先下载安装Erlang
- 下载Erlang
- 安装erlang前先安装Linux依赖库
-
yum -y install make gcc gcc-c++ kernel-devel m4 ncurses-devel openssl-devel
- 解压erlang压缩包文件
-
tar -zxvf otp_src_25.1.1.tar.gz
- 进入otp_src_25.1.1 目录 完后配置
-
cd otp_src_25.1.1
./configure
- 编译
-
make
- 安装,就是把这个放在path目录下,这样在任何的目录就都可以使用了。
-
make install
- 安装好了erlang后可以将解压的文件夹删除:
-
rm -rf otp_src_25.1.1
- 验证erlang是否安装成功
-
- 在命令行输入:erl如果进入了编程命令行则表示安装成功,然后按ctrl + z 退出编程命令行;
7.2 安装RabbitMQ以及启动和停止
- 解压RabbitMQ的压缩包,即安装完成,无需再编译
-
tar -xvf rabbitmq-server-generic-unix-3.10.11.tar.xz -C /usr/local/
说明 -C 选项是指定解压目录,如果不指定会解压到当前目录
此时rabbitmq就安装好了;
- 启动RabbitMQ
-
- 首先切换到我们的安装目录下
/usr/local
---->cd rabbitmq_server-3.10.11/
- 首先切换到我们的安装目录下
--->完后再切换到那个sbin/
目录下
-
- 就执行命令启动
./rabbitmq-server -detached
- 当配置完环境变量以后,我们就可以在任何目录下启动
rabbitmq-server -detached
- 就执行命令启动
说明:
-detached 将表示在后台启动运行rabbitmq;不加该参数表示前台启动;
rabbitmq的运行日志存放在安装目录的var目录下;
现在的目录是:/usr/local/rabbitmq_server-3.10.11/var/log/rabbitmq
- 停止RabbitMQ
-
- 切换到sbin目录下执行: 执行
./rabbitmqctl shutdown
- 切换到sbin目录下执行: 执行
7.3 查看MQ的状态
- 切换到sbin目录下执行:执行
说明:-n rabbit 是指定节点名称为rabbit,目前只有一个节点,节点名默认为rabbit
此处-n rabbit 也可以省略
7.4 配置path环境变量
- 编辑这个文件
vi /etc/profile
- 在这个文件中写入以下语句
RABBIT_HOME=/usr/local/rabbitmq_server-3.10.11
PATH=$PATH:$RABBIT_HOME/sbinexport RABBIT_HOME PATH
- 刷新环境变量 执行
source /etc/profile
8 RabbitMQ的管理命令
8.1 用户管理
用户管理包括增加用户,删除用户,查看用户列表,修改用户密码。
这些操作都是通过rabbitmqctl管理命令来实现完成。
- 查看当前用户列表
-
rabbitmqctl list_users
- 新增一个用户
-
- 语法:
rabbitmqctl add_user Username Password
- 语法:
- 设置用户角色
-
rabbitmqctl set_user_tags 用户名 角色名
- 设置用户权限
-
rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"
-
-
".*" ".*" ".*"
这几个分别是读、写、配置的权限
-
- 查看用户权限
-
rabbitmqctl list_permissions
9 RabbitMQ的 web管理后台
- 几个步骤:
-
- Rabbitmq有一个web管理后台,这个管理后台是以插件的方式提供的,启动后台web管理功能,切换到sbin目录下执行以下几步:
-
-
-
- 查看rabbitmq 的插件列表,插件的最前面如果是一个空的[ ]代表没有启用
-
-
-
-
-
-
rabbitmq-plugins list
-
-
-
-
-
-
- 启用
-
-
-
-
-
-
rabbitmq-plugins enable rabbitmq_management
-
-
-
-
-
-
- 禁用
-
-
-
-
-
-
rabbitmq-plugins disable rabbitmq_management
-
-
-
-
- 下一步就是防火墙操作
systemctl status firewalld
--检查防火墙状态
systemctl stop firewalld
--关闭防火墙,Linux重启之后会失效
systemctl disable firewalld
--防火墙置为不可用,Linux重启后,防火墙服务不自动启动,依然是不可用
-
- 访问
-
-
-
- http://192.168.190.132:15672/
-
-
用户名/密码为我们上面创建的admin/123456
注意上面改成你的虚拟主机的ip地址
备注:如果使用默认用户guest、密码guest登录,会提示User can only log in via localhost
说明guest用户只能从localhost本机登录,所以不要使用该用户。
9.1 通过web页面新建虚拟主机
- 第一步:点击那个admin
- 第二步:点击那个User完后哪里就可以创建用户
- 第三步:点击那个Virtual Hosts就可以添加虚拟主机
10 .RabbitMQ工作模型
broker(服务器) 相当于mysql服务器,virtual host相当于数据库(可以有多个数据库)queue相当于表,消息相当于记录。
消息队列有三个核心要素: 消息生产者、消息队列、消息消费者
- 生产者(Producer):发送消息的应用;(java程序,也可能是别的语言写的程序)
- 消费者(Consumer):接收消息的应用;(java程序,也可能是别的语言写的程序)
- 代理(Broker):就是消息服务器,RabbitMQ Server就是Message Broker;
- 信道(Channel):连接中的一个虚拟通道,消息队列发送或者接收消息时,都是通过信道进行的;
- 虚拟主机(Virtual host):一个虚拟分组,在代码中就是一个字符串,当多个不同的用户使用同一个RabbitMQ服务时,可以划分出多个Virtual host,每个用户在自己的Virtual host创建exchange/queue等;(分类比较清晰、相互隔离)
- 交换机(Exchange):交换机负责从生产者接收消息,并根据交换机类型分发到对应的消息队列中,起到一个路由的作用;
- 路由键(Routing Key):交换机根据路由键来决定消息分发到哪个队列,路由键是消息的目的地址;
- 绑定(Binding):绑定是队列和交换机的一个关联连接(关联关系);
- 队列(Queue):存储消息的缓存;
- 消息(Message):由生产者通过RabbitMQ发送给消费者的信息;(消息可以任何数据,字符串、user对象,json串等等)
11 .RabbitMQ交换机类型
Exchange(X)可翻译成交换机/交换器/路由器
11.1 RabbitMQ交换器 (Exchange)类型
Fanout Exchange(扇形) |
Direct Exchange(直连) |
Topic Exchange(主题) |
Headers Exchange(头部) |
11.2 Fanout Exchange(扇形交换机/器)
Fanout 扇形的,散开的;扇形交换机
投递到所有绑定的队列,不需要路由键,不需要进行路由键的匹配,相当于广播、群发;
上面P代表product(生产者) X代表交换机/器 红色的代表队列
我们的生产者发消息只能发送到交换机上,当消息发送到交换机以后,只要队列和交换机绑定了,那么就把消息分散的发送到这两个队列当中。(如果绑定10个队列,那么就会分散的发送到这10个队列当中)
11.3 例子【Fanout Exchange(扇形交换机/器)】
- 实现步骤:
-
- 第一步:添加依赖
<!-- rabbitMQ的依赖--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
-
- 第二步: 添加配置信息
rabbitmq: host: 192.168.190.132 //地址port: 5672 //端口号username: admin //用户名password: 123546 //密码virtual-host: powernode //设置发消息发送到那个虚拟主机上
-
- 第三步: 创建一个Rabbit的配置类
-
-
- 在这个配置类中有一个三部曲
-
-
-
-
- 第一步:定义交换机
-
-
@Bean
//FanoutExchange 说明就是定义的一个扇形交换机
public FanoutExchange fanoutExchange(){//括号里面就是给交换机起个名字return new FanoutExchange("exchange.fanout");
}
-
-
-
- 第二步定义队列(多个队列)
-
-
@Beanpublic Queue queueA(){ //队列Areturn new Queue("queue.famout.a");}@Beanpublic Queue queueB(){ //队列B return new Queue("queue.famout.b");}
-
-
-
- 绑定交换机和队列(扇形交换机不需要指定key)
-
-
-
-
-
-
- 通过binding这个方法,其中参数就是把交换机传进来,以及把要绑定的队列传进来
- 调用BindingBuilder的.bind()完后把那个队列传进来。再使用to()括号里面是交换机的名字
-
-
-
@Beanpublic Binding bindingA(FanoutExchange fanoutExchange ,Queue queueA){return BindingBuilder.bind(queueA).to(fanoutExchange);}@Beanpublic Binding bindingB(FanoutExchange fanoutExchange ,Queue queueB){return BindingBuilder.bind(queueB).to(fanoutExchange);}
-
- 第四步: 发送消息
-
-
- 第一步:注入
rabbitTemplate
- 第二步:创建一个方法来定义消息,完后再使用
Message()
这个类来把消息封装了.
- 第一步:注入
-
//定义消息String msg ="hello world";//这里Message中需要的参数是byte ,我们需要getBytes,将String转换为byte//需要使用这个将消息封装一下Message message = new Message(msg.getBytes());
-
-
- 第三步:就是使用
rabbitTemplate
这个类中convertAndSend
方法来发送消息.其中括号中参数分别是:
- 第三步:就是使用
-
-
-
-
-
- 第一个写的是我们在配置类中定义的额交换机的名字
- 第二个一般都需要路由key吗,但是扇形交换机不需要路由key
- 第三个参数就是我们封装的消息
-
-
-
rabbitTemplate.convertAndSend("exchange.fanout","",message);
扇形交换机发送消息的完整写法
public class MessageService {//先注入rabbitTemplate// 这里直接可以使用rabbitTemplate是因为springBoot中有一个自动装配的功能,我们把依赖导入@Autowiredprivate RabbitTemplate rabbitTemplate;public void sendMsg(){//定义消息String msg ="hello world";//这里Message中需要的参数是byte ,我们需要getBytes,将String转换为byte//需要使用这个将消息封装一下Message message = new Message(msg.getBytes());//发送消息,括号里面第一个写的是我们在配置类中定义的额交换机的名字//第二个一般都需要路由key吗,但是扇形交换机不需要路由key//第三个参数就是我们封装的消息rabbitTemplate.convertAndSend("exchange.fanout","",message);log.info("消息发送完毕,发送时间为:{}" ,new Date());}
}
-
- 第五步: 上面编写完方法以后,我们最后一步,调用这个方法,来发送消息,在启动上实现
ApplicationRunner
类(这个类表示程序一启动就会执行里面的那个方法),完后并重写里面的方法,完后我们把那个编写消息的那个类注入进来,完后调用那个方法即可.
public class Application implements ApplicationRunner {@Autowiredprivate MessageService messageService;public static void main( String[] args ){SpringApplication.run(Application.class,args);}/*** 程序一启动就会运行* @param args* @throws Exception*/@Overridepublic void run(ApplicationArguments args) throws Exception {messageService.sendMsg();}
上述这样就代表消息发送过去了
-
- 第六步 : 接收消息
-
-
- 一般情况都是一个模块发送消息,另一个模块接收消息
-
-
-
-
- 创建一个新的模块,来接收消息
- 第一步:还是添加rabbitmq的依赖
- 第二步:在配置文件中添加rabbitmq的相关配置。
- 第三步:编写一个接收消息的类,完后来接收消息
-
-
-
-
-
-
- 这个类编写的步骤:
-
-
-
-
-
-
-
-
- 第一步:创建一个方法,在里面传入Message的参数。
- 第二步:在这个类上标注解@RabbitListener(),其中括号里面写的就是对队列的名称。
- 第三步:就是调用message.getBody()方法,来获取消息。
- 第四步:就是将接收到消息的字节数组,转为字符串
-
-
-
-
@Slf4j
@Component
public class ReceiveMessage {/*** 接收两个队列的消息* @RabbitListener() 就是接收哪些队列的消息,就写哪些队列*/@RabbitListener(queues = {"queue.famout.a","queue.famout.b"})public void receiveMsg(Message message){byte[] body = message.getBody();//上面那个得到的是字节数据,下面转为字符串String msg = new String(body);log.info("接收的消息为: {} " ,msg);}
}
11.4 Direct Exchange(直连交换机)
根据路由键精确匹配(一模一样)进行路由消息队列;
其中P代表的是生产者 X代表的是交换机 C代表的是消费者 红色的代表队列
那个error代表的就是路由key
那个info error warning 也是路由key
原理就是 :生产者发送消息到交换机时,也会指定一个key(发送key),交换机发送到队列也需要一个key(路由key) ,当发送key和路由key相同时,才会将交换机中的消息发送到队列中
【例子】:当发消息的时候指定的路由key是hello,那么到交换机以后,则消息不会进入任何一个队列
当发消息的时候指定的路由key是error,则到交换机以后,则消息会同时发送到两个队列。
11.5 例子【直连交换机的例子】:
实现步骤:
-
- 第一步:添加依赖
- 第二步:在配置文件中配置rabbitmq的配置
- 第三步:创建一个rabbitmq的配置类
-
-
- 创建步骤:
-
-
-
-
- 还是那三部曲
-
-
-
-
-
-
- 第一步:定义交换机
-
-
-
@Beanpublic DirectExchange directExchange (){return ExchangeBuilder.directExchange(exchangeName).build();}
-
-
-
-
- 定义队列
-
-
-
@Beanpublic Queue queueA(){return QueueBuilder.durable(queueAName).build();}@Beanpublic Queue queueB(){return QueueBuilder.durable(queueBName).build();}
-
-
-
-
- 交换机与队列绑定
-
-
-
@Beanpublic Binding bindingA(DirectExchange directExchange,Queue queueA){return BindingBuilder.bind(queueA).to(directExchange).with("error");}@Beanpublic Binding bindingB1(DirectExchange directExchange ,Queue queueB){return BindingBuilder.bind(queueB).to(directExchange).with("info");}@Beanpublic Binding bindingB2(DirectExchange directExchange ,Queue queueB){return BindingBuilder.bind(queueB).to(directExchange).with("error");}@Beanpublic Binding bindingB3(DirectExchange directExchange ,Queue queueB){return BindingBuilder.bind(queueB).to(directExchange).with("warning");}
完整的代码:
/*** 从配置文件中读取指定名字的属性,我们在配置文件中定义exchangeName queueAName queueBName* 那几个属性的值,完后再这里读取。*/
@ConfigurationProperties("my")
public class RabbitConfig {private String exchangeName;private String queueAName;private String queueBName;/*** 三部曲:* 第一步:定义队列*/@Beanpublic DirectExchange directExchange (){return ExchangeBuilder.directExchange(exchangeName).build();}/*** 第二步:定义队列*/@Beanpublic Queue queueA(){return QueueBuilder.durable(queueAName).build();}@Beanpublic Queue queueB(){return QueueBuilder.durable(queueBName).build();}/*** 交换机与队列绑定*/@Beanpublic Binding bindingA(DirectExchange directExchange,Queue queueA){return BindingBuilder.bind(queueA).to(directExchange).with("error");}@Beanpublic Binding bindingB1(DirectExchange directExchange ,Queue queueB){return BindingBuilder.bind(queueB).to(directExchange).with("info");}@Beanpublic Binding bindingB2(DirectExchange directExchange ,Queue queueB){return BindingBuilder.bind(queueB).to(directExchange).with("error");}@Beanpublic Binding bindingB3(DirectExchange directExchange ,Queue queueB){return BindingBuilder.bind(queueB).to(directExchange).with("warning");}
}
-
- 第四步: 创建发送消息的类
-
-
- 第一步: 创建步骤:首先注入
RabbitTemplate
- 第二步: 创建一个方法 ,并创建一个消息
- 第三步: 使用
RabbitTemplate
来发送消息,
- 第一步: 创建步骤:首先注入
-
@Slf4j
public class MessageService {@Autowiredprivate RabbitTemplate rabbitTemplate;public void sendMsg(){//创建一条消息.使用建造者模式Message message = MessageBuilder.withBody("hello world".getBytes()).build();//下面就是发送消息,括号里面分别是,参数1 交换机的名字, 参数2 路由key, 参数3 消息rabbitTemplate.convertAndSend("exchange.direct", "info",message);log.info("消息发送完毕");}
11.6 Topic Exchange(主题交换机)
通配符匹配,相当于模糊匹配;
#匹配多个单词,用来表示任意数量(零个或多个)单词
*匹配一个单词(必须有一个,而且只有一个),用 . 隔开的为一个
【例子】:beijing.# == beijing.queue.abc, beijing.queue.xyz.xxx
beijing.* == beijing.queue, beijing.xyz
发送时指定的路由键:lazy.orange.rabbit 则与队列Q1 和队列Q2都匹配。
但是虽然与Q2队列有两个都匹配的,但是,也只会进去以条消息。
11.7 例子【主题交换机】
编写步骤:
- 第一步:添加mq的依赖
- 第二步:在配置文件中添加mq的配置
- 第三步:编写一个配置类
-
- 编写配置类的步骤:
-
-
- 第一步:定义交换机
-
@Beanpublic TopicExchange topicExchange(){return ExchangeBuilder.topicExchange(exchangeName).build();}
-
-
- 第二步:定义队列
-
@Beanpublic Queue queueA(){return QueueBuilder.durable(queueAName).build();}@Beanpublic Queue queueB(){return QueueBuilder.durable(queueBName).build();}
-
-
- 交换机与队列绑定
-
@Beanpublic Binding bindingA(TopicExchange topicExchange,Queue queueA){return BindingBuilder.bind(queueA).to(topicExchange).with("*.orange.*");}@Beanpublic Binding bindingB1(TopicExchange topicExchange,Queue queueB){return BindingBuilder.bind(queueB).to(topicExchange).with("*.*.rabbit");}@Beanpublic Binding bindingB2(TopicExchange topicExchange,Queue queueB){return BindingBuilder.bind(queueB).to(topicExchange).with("lazy.#");}
- 第四步:编写发送消息
@Service
public class MessageService {@Autowiredprivate AmqpTemplate amqpTemplate;/*** 发送消息*/@Beanpublic void sendMsg(){Message message = MessageBuilder.withBody("hello world".getBytes()).build();//有几个参数第一个就是交换机的名字,第二个就是路由key,第三个就是消息amqpTemplate.convertAndSend("exchange.topic","lazy.orange.rabbit",message);}
- 第五步:让启动类实现
ApplicationRunner
并重写里面的方法(方法的作用就是程序一启动就会调用发消息的那个方法)
public class Application implements ApplicationRunner {@Autowiredprivate MessageService messageService;public static void main(String[] args) {SpringApplication.run(Application.class, args);}@Overridepublic void run(ApplicationArguments args) throws Exception {messageService.sendMsg();}
11.8 Headers Exchange(头部交换机)
基于消息内容中的headers属性进行匹配;
其中 P代表的消息的生产者 C代表消息的消费者 X代表的是交换机 红色代表队列
原理: 就是在发消息的时候,我们在消息的头部在加上匹配的值,看与那个队列匹配。
11.9 例子【头部交换机】
实现步骤:
- 第一步:引入依赖
- 第二步:在配置文件中加入配置
- 第三步:创建一个配置类
-
- 配置类的编写步骤
-
-
- 第一步:定义头部交换机
-
@Beanpublic HeadersExchange headersExchange(){return ExchangeBuilder.headersExchange(exchangeName).build();}
-
-
- 第二步:创建队列
-
@Beanpublic Queue queueA(){return QueueBuilder.durable(queueAName).build();}@Beanpublic Queue queueB(){return QueueBuilder.durable(queueBName).build();}
-
-
- 第三步:交换机与队列绑定,这个需要是头部的信息绑定,所以我们需要创建一个队列,存储每个交换机与队列的绑定。
-
@Beanpublic Binding bindingA(HeadersExchange headersExchange,Queue queueA){Map<String,Object> headerValues = new HashMap<>();headerValues.put("type","m");headerValues.put("status",1);return BindingBuilder.bind(queueA).to(headersExchange).whereAll(headerValues).match();}@Beanpublic Binding bindingB(HeadersExchange headersExchange,Queue queueB){Map<String,Object> headerValues = new HashMap<>();headerValues.put("type","s");headerValues.put("status",0);return BindingBuilder.bind(queueB).to(headersExchange).whereAll(headerValues).match();}
- 第四步:创建发送消息
public class MessageService {@Value("${my.exchangeName}")private String exchangeName;@Autowiredprivate RabbitTemplate rabbitTemplate;public void sandMsg(){//消息属性MessageProperties messageProperties = new MessageProperties();Map<String ,Object> headers = new HashMap<>();headers.put("type","s");headers.put("status",0);//设置消息头messageProperties.setHeaders(headers);//添加消息属性Message message = MessageBuilder.withBody("hello world".getBytes()).andProperties(messageProperties).build();//头部交换机,不需要只当路由keyrabbitTemplate.convertAndSend(exchangeName,"",message);}
}
12 RabbitMQ过期消息
过期消息也叫TTL消息,TTL:Time To Live
消息的过期时间有两种设置方式:(过期消息)
12 .1 设置单条消息的过期时间
就是在MessageProperties()
中有一个setExpiration()
来设置单个消息的过期时间。
设置步骤:
- 前面的引入依赖和配置文件的编写都一样,以及配置类的编写没有任何区别,所以我们这里只编写发送消息的步骤
- 发送消息的步骤:
-
- 第一步:先创建一个
MessageProperties()
,完后再调用setExpiration()
来设置消息的过期时间
- 第一步:先创建一个
MessageProperties messageProperties = new MessageProperties();
messageProperties.setExpiration("15000");//设置过期的毫秒数
-
- 第二步:通过建造者模式中的
MessageBuilder
来创建消息
- 第二步:通过建造者模式中的
Message message = MessageBuilder.withBody("hello world".getBytes()).andProperties(messageProperties).build();
-
- 第三步:通过模板方法
rabbitTemplate.convertAndSend
来发送消息
- 第三步:通过模板方法
rabbitTemplate.convertAndSend("exchange.ttl.a","info",message);
完整的代码编写
@Service
@Slf4j
public class MessageService {@Autowiredprivate RabbitTemplate rabbitTemplate;public void sendMsg(){MessageProperties messageProperties = new MessageProperties();messageProperties.setExpiration("15000");//设置过期的毫秒数Message message = MessageBuilder.withBody("hello world".getBytes()).andProperties(messageProperties).build();rabbitTemplate.convertAndSend("exchange.ttl.a","info",message);log.info("消息发送完毕");}
}
12.2 通过队列属性设置消息过期时间
设置步骤:
- 第一步:我们还是编写配置类,并在里面编写三部曲,这里只不过在设置队列的时候,来设置队列的属性
-
- 设置队列属性的步骤:
-
-
- 有两种方式:
-
-
-
-
- 第一种:就是通过new的方式,就是new Queue的时候,里面有5个参数,其中第一个是
队列名称
,第一个就是是否持久化
,还有一个Map集合,其中就是在Map集合中来设置过期时间。 - 第二步:就是创建一个集合,完后使用Map.put()方式,来向集合中放入队列的过期时间,其中队列过期时间的属性
x-message-ttl
- 第一种:就是通过new的方式,就是new Queue的时候,里面有5个参数,其中第一个是
-
-
@Beanpublic Queue queue(){//方式1,new的方式Map<String ,Object> arguments =new HashMap<>();arguments.put("x-message-ttl",15000);return new Queue(queueName,true,false,false,arguments);}
-
-
- 第二种就是:建造者模式: 这种模式,还是要提前创建一个Map集合,完后通过建造者模式,将集合传入进去。
-
-
- 完整的代码
//第二种:建造者模式
@Beanpublic Queue queue(){//方式1,new的方式Map<String ,Object> arguments =new HashMap<>();arguments.put("x-message-ttl",15000); return QueueBuilder.durable(queueName).withArguments(arguments).build();
- 第二步:发送消息的步骤:
-
- 因为这个设置了队列的过期消息,所以,这里只需要,创建消息,完后再使用rabbit模板发送消息
public class MessageService {@Autowiredprivate RabbitTemplate rabbitTemplate;public void sendMsg(){Message message = MessageBuilder.withBody("hello world".getBytes()).build();rabbitTemplate.convertAndSend("exchange.ttl.b","info",message);log.info("消息发送完毕");}
}
【注意】:如果单个消息和队列都设置了过期时间,那么消息的过期时间以二者之间较小的那个数值为准。
13 RabbitMQ的死信队列
13.1如何消费者的手动确认
-
- 第一步:在消费者端的配置文件中加入配置
listener:simple:acknowledge-mode: manual
-
- 第二步:在接收消息的一段那段配置来通知其删除
-
-
- 实现过程:
-
-
-
-
- 第一步:先编写一个方法,参数是Message Channel 俩个参数
- 第二步:获取消息的属性
-
-
//第一步:获取消息的属性MessageProperties messageProperties = message.getMessageProperties();
-
-
-
- 第三步:根据消息的属性来获取消息的唯一标识(如同身份证号)
-
-
//获取消息的唯一标识(如同身份证号)long tag = messageProperties.getDeliveryTag();
-
-
-
- 第四步:编写一个try ....catch 代码块,try { }里面编写的正常接收到消息,正常接收消息后,则告诉消费者就可以删除了
channel.basicAck(tag,false);
- 第四步:编写一个try ....catch 代码块,try { }里面编写的正常接收到消息,正常接收消息后,则告诉消费者就可以删除了
-
-
try {byte[] body = message.getBody();String str = new String(body);log.info("接收到的消息为"+ str);//正常接收后,告诉服务器可以删除了。//消费者的手动确认 tag代表确认的就是这条消息//false代表只确认当前一条channel.basicAck(tag,false);
-
-
-
- 第五步:就是在catch里面编写的,说明报错了,告诉服务器不要删除,我们需要重新接收一遍
channel.basicNack(tag,false,true);
- 第五步:就是在catch里面编写的,说明报错了,告诉服务器不要删除,我们需要重新接收一遍
-
-
}catch (Exception e){//报错了,我们没有接收到消息,告诉服务器不要删除,我们再重新接收一遍log.error("接收者出现问题");//告诉服务器不要删除 ,第一个参数:当前消息的标识//第二个参数,只代表当前消息//第三个参数:重新放回到队列里面try {channel.basicNack(tag,false,true);} catch (IOException ex) {e.printStackTrace();}throw new RuntimeException(e);}
- 完整的代码编写
package com.powernode.message;import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.io.IOException;@Component
@Slf4j
public class MessageReceive {@RabbitListener(queues = {"queue.normal.4"})public void receviceMsg(Message message , Channel channel){//第一步:获取消息的属性MessageProperties messageProperties = message.getMessageProperties();//获取消息的唯一标识(如同身份证号)long tag = messageProperties.getDeliveryTag();try {byte[] body = message.getBody();String str = new String(body);log.info("接收到的消息为"+ str);//正常接收后,告诉服务器可以删除了。//消费者的手动确认 tag代表确认的就是这条消息//false代表只确认当前一条channel.basicAck(tag,false);}catch (Exception e){//报错了,我们没有接收到消息,告诉服务器不要删除,我们再重新接收一遍log.error("接收者出现问题");//告诉服务器不要删除 ,第一个参数:当前消息的标识//第二个参数,只代表当前消息//第三个参数:重新放回到队列里面try {channel.basicNack(tag,false,true);} catch (IOException ex) {e.printStackTrace();}throw new RuntimeException(e);}}
}
13.2 什么条件下会变成死信队列?
-
- 1、消息过期
-
-
- 是通过定义消息属性来设置消息的过期时间,在正常队列哪里设置死信交换机
-
public void sendMsg(){//定义一个消息属性MessageProperties messageProperties = new MessageProperties();messageProperties.setExpiration("15000");//定义消息Message message = MessageBuilder.withBody("hello world".getBytes()).andProperties(messageProperties).build();//发消息的几个参数rabbitTemplate.convertAndSend("exchange.normal.2","order",message);
-
- 2、队列过期
-
-
- 是在定义正常队列哪里设置队列过期,以及设置死信交换机
-
@Beanpublic Queue normalQueue(){Map<String, Object> arguments =new HashMap<>();
// //设置消息过期时间arguments.put("x-message-ttl" ,20000);//设置死信交换机,一旦消息过了20秒,而且还没有消费者,则就会进入这个死信交换机。arguments.put("x-dead-letter-exchange",exchangeDlxName);//设置死信路由key,这个key就是死信交换机和死信队列绑定的呢个keyarguments.put("x-dead-letter-routing-key","error" );return QueueBuilder.durable(queueNormalName).withArguments(arguments).build();}
-
- 3、队列达到最大长度,当达到队列设置的最大长度时,如果此时还有消息进入队列,那么最开始进入队列的消息就会变成死信。
public Queue normalQueue(){Map<String, Object> arguments =new HashMap<>();//设置队列的最大长度arguments.put("x-max-length",5);//设置死信交换机,一旦消息过了20秒,而且还没有消费者,则就会进入这个死信交换机。arguments.put("x-dead-letter-exchange",exchangeDlxName);//设置死信路由key,这个key就是死信交换机和死信队列绑定的呢个keyarguments.put("x-dead-letter-routing-key","error" );return QueueBuilder.durable(queueNormalName).withArguments(arguments).build();}
-
- 4、消费者从正常的队列中接收消息,但是对消息不进行确认,并且不对消息进行重新投递(不重新入队),此时消息就进入死信队列。(就是当我们开启了手动确认以后,在发生异常之后,消费者手动不确认,并且不重新入队)
channel.basicNack(tag,false,flase);
- 4、消费者从正常的队列中接收消息,但是对消息不进行确认,并且不对消息进行重新投递(不重新入队),此时消息就进入死信队列。(就是当我们开启了手动确认以后,在发生异常之后,消费者手动不确认,并且不重新入队)
catch (Exception e){//报错了,我们没有接收到消息,告诉服务器不要删除,我们再重新接收一遍log.error("接收者出现问题");//告诉服务器不要删除 ,//第一个参数:当前消息的标识//第二个参数,只代表当前消息//第三个参数:是否重新放回到队列里面try {channel.basicNack(tag,false,flase);} catch (IOException ex) {e.printStackTrace();}throw new RuntimeException(e);
-
- 5、消费者拒绝消息
开启手动确认模式,并拒绝消息,不重新投递(不重新入队),则进入死信队列
channel.basicReject(tag,false);
}catch (Exception e){log.error("接收者出现问题");try {//消费者拒绝消息,// 第一个参数:当前消息的唯一标识//第二个参数:是否重新入队;//basicReject与basicNack的区别:basicReject只能处理一条消息channel.basicReject(tag,false);} catch (IOException ex) {e.printStackTrace();}throw new RuntimeException(e);}}
13.3 死信队列的详细解释
其中死信队列也叫做死信交换机、死信邮箱等说法。
DLX:Dead - Letter - Exchange 死信交换器、死信邮箱
过程:
第一步:生产者生产消息 ----> 消息发送到交换机 ----> 交换机根据路由key发送到队列----->当没有死信交换机之前 在规定的时间内消息过期无人接收,则消息丢了 -----> 创建一个死信交换机 -----> 当创建一个死信交换机之后,在规定的时间内无人接收的消息则进入死信交换机 。---->死信交换机根据路由key路由到另一条队列(死信队列)---->消费者也可以接收死信队列的信息。
代码的实现过程
- 第一步:引入依赖
- 第二步:在配置文件中配置
- 第三步:创建一个Rabbit的配置类(就是创建交换机等)
-
- 配置类中的第一步:就是引入配置类中的交换机和队列的名称
- 第二步:创建正常的交换机
@Beanpublic DirectExchange normalExchange(){return ExchangeBuilder.directExchange(exchangeNormalName).build();}
-
- 第三步:创建正常的队列,并在正常的队列中,设置消息的过期时间,设置死信交换机,以及设置死信路由key (就是死信交换机与死信队列绑定的那个key)
@Beanpublic Queue normalQueue(){Map<String, Object> arguments =new HashMap<>();//设置消息过期时间arguments.put("x-message-ttl" ,20000);//设置死信交换机,一旦消息过了20秒,而且还没有消费者,则就会进入这个死信交换机。arguments.put("x-dead-letter-exchange",exchangeDlxName);//设置死信路由key,这个key就是死信交换机和死信队列绑定的呢个keyarguments.put("x-dead-letter-routing-key","error" );return QueueBuilder.durable(queueNormalName).withArguments(arguments).build();}
-
- 第四步:正常的交换机与正常的队列绑定
public Binding bindingNormal(DirectExchange normalExchange ,Queue normalQueue){return BindingBuilder.bind(normalQueue).to(normalExchange).with("order");}
-
- 第五步:声明死信交换机(就和正常的交换机一样)
@Beanpublic DirectExchange dlxExchange(){return ExchangeBuilder.directExchange(exchangeDlxName).build();}
-
- 第六步:声明死信队列
@Beanpublic Queue dlxQueue(){return QueueBuilder.durable(queueDlxName).build();}
-
- 第七步:死信队列与死信交换机绑定
@Beanpublic Binding bindingDlx(DirectExchange dlxExchange ,Queue dlxQueue){return BindingBuilder.bind(dlxQueue).to(dlxExchange).with("error");}
-
- 第八步:创建消息,并发送消息
@Autowiredprivate RabbitTemplate rabbitTemplate;public void sendMsg(){//定义消息Message message = MessageBuilder.withBody("hello world".getBytes()).build();//发消息的几个参数rabbitTemplate.convertAndSend("exchange.normal.a","order",message);log.info("消息发送完毕");}
14 RabbitMQ的延迟队列
使用场景就是关闭规定时间内未支付的订单
场景:有一个订单,15分钟内如果不支付,就把该订单设置为交易关闭,那么就不能支付了,这类实现延迟任务的场景就可以采用延迟队列来实现,当然除了延迟队列来实现,也可以有一些其他办法实现;
14.1 定时任务方式
- 每隔3秒扫描一次数据库,查询过期的订单然后进行处理;
优点:
简单,容易实现;
缺点:
1、存在延迟(延迟时间不准确),如果你每隔1分钟扫一次,那么就有可能延迟1分钟;
2、性能较差,每次扫描数据库,如果订单量很大
14.2、 被动取消
当用户查询订单的时候,判断订单是否超时,超时了就取消(交易关闭)
优点:
对服务器而言,压力小;
缺点:
1、用户不查询订单,将永远处于待支付状态,会对数据统计等功能造成影响;
2、用户打开订单页面,有可能比较慢,因为要处理大量订单,用户体验少稍差;
14.3、JDK延迟队列(单体应用,不能分布式下)
DelayedQueue
无界阻塞队列,该队列只有在延迟期满的时候才能从中获取元素
优点:
实现简单,任务延迟低;
缺点:
服务重启、宕机,数据丢失;
只适合单机版,不适合集群;
订单量大,可能内存不足而发生异常; oom
14.4、 采用消息中间件(rabbitmq)
1、RabbitMQ本身不支持延迟队列,可以使用TTL(消息过期)结合DLX(死信队列)的方式来实现消息的延迟投递,即把DLX跟某个队列绑定,到了指定时间,消息过期后,就会从DLX路由到这个队列,消费者可以从这个队列取走消息。
上述 :order.ttl.queue代表的是正常队列
order.dlx.queue 代表的是死信队列
其中交换机只有一个,当生产者根据key找到交换机以后,交换机根据key找到正常队列,过期以后,正常队列指定的死信交换机还是那个交换机(即那条消息就会返回给那个交换机),完后交换机根据死信路由key,发送到死信队列中
存在问题 :如何解决消息过期时间不一致的问题?
如果先发送的消息,消息延迟时间长,会影响后面的 延迟时间段的消息的消费;
就是因为队列是先进先出,因为第一条消息如果设置的时间大于第二个消息设置的时间,那么只有等第一个消息过期以后,才能轮到第二个消息,那么这样的话,第二个消息就和设置的过期时间不一致。
解决方式:
不同延迟时间的消息要发到不同的队列上,同一个队列的消息,它的延迟时间应该一样(即相同的过期时间放到相同的队列里面)。
代码实现
- 第一步:引入依赖
- 第二步:引入配置文件的信息
- 第三步:编写交换机以及队列,并绑定,在队列哪里指定死信交换机的名称。
- 第四步:编写发送消息的类,并创建几条消息,并指定消息的过期时间。分别发送到不同时间的交换机上。
- 第五步:接收消息(接收的死信队列的消息)。
14 .5 、 使用rabbitmq-delayed-message-exchange 延迟插件
- 第一步:选择对应的版本下载 rabbitmq-delayed-message-exchange 插件,下载地址:http://www.rabbitmq.com/community-plugins.html
- 第二步:下载完成以后,解压这个插件
unzip rabbitmq_delayed_message_exchange-3.10.2.ez
如果unzip 没有安装,先安装一下
yum install unzip -y
- 第三步:先找sbin 目录, 找到这个 plugins目录,启用插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
开启插件;
原理:
安装这个插件以后,就会有一个新的交换机: 延迟交换机类似于直连交换机
消息发送后不会直接投递到队列,而是存储到 Mnesia(嵌入式数据库),检查 x-delay 时间(消息头部);
延迟插件在 RabbitMQ 3.5.7 及以上的版本才支持,依赖 Erlang/OPT 18.0 及以上运行环境;
Mnesia 是一个小型数据库,不适合于大量延迟消息的实现
解决了消息过期时间不一致出现的问题。
代码实现过程:
- 第一步:引入mq依赖
- 第二步:在配置文件中引入配置信息
- 第三步:创建一个配置类,来创建延迟交换机和队列,以及绑定(通过自定义交换机来创建)
@Configuration
public class RabbitConfig {@Value("${my.exchangeName}")private String exchangeName;@Value("${my.queueDelayName}")private String queueDelayName;/*** 创建交换机* 使用自定交换机来创建延迟交换机* @return*/@Beanpublic CustomExchange customExchange(){//自定义交换机Map<String, Object> arguments =new HashMap<>();arguments.put("x-delayed-type","direct");return new CustomExchange(exchangeName,"x-delayed-message",true,false,arguments);}/*** 创建队列*/@Beanpublic Queue queue(){return QueueBuilder.durable(queueDelayName).build();}/*** 交换机与队列绑定*/@Beanpublic Binding binding(CustomExchange customExchange ,Queue queue){return BindingBuilder.bind(queue).to(customExchange).with("plugin").noargs();}
- 第四步 :发送消息
@Service
@Slf4j
public class MessageService {@Autowiredprivate RabbitTemplate rabbitTemplate;/*** 发送消息*/@Beanpublic void sendMsg() {{MessageProperties messageProperties = new MessageProperties();messageProperties.setHeader("x-delay", 25000); //第一条消息,设置延迟时间Message message = MessageBuilder.withBody("hello world".getBytes()).andProperties(messageProperties).build();rabbitTemplate.convertAndSend("exchange.delay.4", "plugin", message);log.info("发送消息完毕");}{MessageProperties messageProperties = new MessageProperties();messageProperties.setHeader("x-delay", 15000); //第二天消息,设置延迟时间Message message = MessageBuilder.withBody("hello world111".getBytes()).andProperties(messageProperties).build();rabbitTemplate.convertAndSend("exchange.delay.4", "plugin", message);log.info("发送消息完毕");}}
- 第五步 :接收消息
@Component
@Slf4j
public class ReceiveMessage {/*** 接收延迟队列的消息,使用的是插件* @param message*/@RabbitListener(queues = {"queue.delay.4"})public void receiveMsg(Message message){String body = new String(message.getBody());log.info("接收到消息的为: "+body);}
15 消息的可靠性投递
消息的可靠性投递就是要保证消息投递过程中每一个环节都要成功,那么这肯定会牺牲一些性能,性能与可靠性是无法兼得的;
如果业务实时一致性要求不是特别高的场景,可以牺牲一些可靠性来换取性能。
① 代表消息从生产者发送到Exchange; ----使用RabbitMQ消息Confirm模式(生产者确认回调)
来解决
② 代表消息从Exchange路由到Queue---使用RabbitMQ消息Return模式 备用交换机模式
来解决
③ 代表消息在Queue中存储;------- 通过设置交换机的持久化,以及队列的持久化、每条消息的持久化
④ 代表消费者监听Queue并消费消息;----自动确认改成手动确认
15 .1 RabbitMQ消息Confirm模式(生产者确认回调)----可能因为网络或者Broker的问题导致①失败,而此时应该让生产者知道消息是否正确发送到了Broker的exchange中;
消息的confirm确认机制,是指生产者投递消息后,到达了消息服务器Broker里面的exchange交换机,则会给生产者一个应答,生产者接收到应
答,用来确定这条消息是否正常的发送到Broker的exchange中,这也是消息可靠性投递的重要保障;
代码的具体实现过程(如何实现生产者确认回调)
- 第一步: 引入依赖
- 第二步:在配置文件中配置信息
publisher-confirm-type: correlated
开启确认模式
rabbitmq:host: 192.168.190.132port: 5672username: adminpassword: 123456virtual-host: powernode #pei zhi fa xioa xi fa dao na ge xu ni zhu ji shangpublisher-confirm-type: correlated #开启生产者的确认模式,设置成关联模式
- 第三步:编写一个类实现
RabbitTemplate.ConfirmCallback
类,并实现里面的方法,在方法里面有三个参数,第一个参数是关联数据
第二个参数是判断消息是否到达交换机
,第三个是原因
,这个就是不管消息有没有到达交换机,都会回调这个类,给与我们提示。【这个类也可以直接在发消息的时候直接实现】
@Component
@Slf4j
public class MyConfirmCallBack implements RabbitTemplate.ConfirmCallback {/**** @param correlationData correlation data for the callback. 关联数据* @param ack true for ack, false for nack 真 到达交换机 或者假 没有到达交换机* @param cause An optional cause, for nack, when available, otherwise null. 原因*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {log.info(correlationData.getId()); //获得idif (ack){//说明消息到达交换机log.info("消息正确到达交换机");return;}//说明消息没有到达交换机log.error("消息没有到达交换机,原因为{}" ,cause);}
}
@Service
@Slf4j
public class MessageService implements RabbitTemplate.ConfirmCallback{@Autowiredprivate RabbitTemplate rabbitTemplate;//这个rabbitTemplate 还没有使用我们那个回调接口@PostConstruct //构造方法后执行,如同初始化public void init(){rabbitTemplate.setConfirmCallback(this);}public void sendMsg(){Message message = MessageBuilder.withBody("hello world".getBytes()).build();//定义一个关联数据CorrelationData correlationData =new CorrelationData();correlationData.setId("order_123456"); //发送订单信息rabbitTemplate.convertAndSend("exchange.confrim.1","info",message,correlationData);}@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {log.info(correlationData.getId()); //获得idif (ack){//说明消息到达交换机log.info("消息正确到达交换机");return;}//说明消息没有到达交换机log.error("消息没有到达交换机,原因为{}" ,cause);}
}
- 第四步:在我们发消息的那个类中定义这个关联数据,并调用,完后在我们发送消息到交换机以后,不管有没有到达交换机,都会给我们一个提示
@Service
@Slf4j
public class MessageService {@Autowiredprivate RabbitTemplate rabbitTemplate;@Autowiredprivate MyConfirmCallBack myConfirmCallBack;//这个rabbitTemplate 还没有使用我们那个回调接口@PostConstruct //构造方法后执行,如同初始化public void init(){rabbitTemplate.setConfirmCallback(myConfirmCallBack);}public void sendMsg(){Message message = MessageBuilder.withBody("hello world".getBytes()).build();//定义一个关联数据CorrelationData correlationData =new CorrelationData();correlationData.setId("order_123456"); //发送订单信息rabbitTemplate.convertAndSend("exchange.confrim.1","info",message,correlationData);}
}
15.2 RabbitMQ消息Return模式(保证交换机的消息到达队列)-----可能因为路由关键字错误,或者队列不存在,或者队列名称错误导致②失败。
rabbitmq 整个消息投递的路径为:
producer —> exchange —> queue —> consumer
>> 消息从 producer 到 exchange 则会返回一个 confirmCallback
>> 消息从 exchange –> queue 投递失败则会返回一个 returnCallback;
我们可以利用这两个callback控制消息的可靠性投递
代码的实现过程
- 第一步:引入依赖
- 第二步:在配置文件配置相关信息,开启return模式
rabbitmq:host: 192.168.190.132port: 5672username: adminpassword: 123456virtual-host: powernode #pei zhi fa xioa xi fa dao na ge xu ni zhu ji shangpublisher-returns: true #开启return模式
- 第三步:编写一个类来实现
RabbitTemplate.ReturnsCallback
并实现里面的方法,(如果这个类被调用,说明没有正确的路由到队列)
@Component
@Slf4j
public class MyReturnCallBack implements RabbitTemplate.ReturnsCallback {@Overridepublic void returnedMessage(ReturnedMessage returned) {//如果这个方法被调用,说明消息没有到达队列log.error("消息从交换机没有正确的路由到队列,原因为{}",returned.getMessage());}
}
- 第四步:在发送消息的类中,来设置,使
rabbitTemplate
调用我们编写的那个类
public class MessageService {@Autowiredprivate RabbitTemplate rabbitTemplate;@Autowiredprivate MyReturnCallBack myReturnCallBack;//这个rabbitTemplate 还没有使用我们那个回调接口@PostConstruct //构造方法后执行,如同初始化public void init(){rabbitTemplate.setReturnsCallback(myReturnCallBack);}
- 第五步 : 编写发送消息
@Service
@Slf4j
public class MessageService {@Autowiredprivate RabbitTemplate rabbitTemplate;@Autowiredprivate MyReturnCallBack myReturnCallBack;//这个rabbitTemplate 还没有使用我们那个回调接口@PostConstruct //构造方法后执行,如同初始化public void init(){rabbitTemplate.setReturnsCallback(myReturnCallBack);}public void sendMsg(){Message message = MessageBuilder.withBody("hello world".getBytes()).build();rabbitTemplate.convertAndSend("exchange.return.1","info11111",message);}
}
15.3 确保消息在队列正确地存储
可能因为系统宕机、重启、关闭等等情况导致存储在队列的消息丢失,即③出现问题;
(1)、队列持久化
QueueBuilder.durable(QUEUE).build();
(2)、交换机持久化
ExchangeBuilder.directExchange(EXCHANGE).durable(true).build();
(3)、消息持久化
MessageProperties messageProperties = new MessageProperties();messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
15.4 确保消息从队列正确地投递到消费者
如果消费者收到消息后未来得及处理即发生异常,或者处理过程中发生异常,会导致④失败。
为了保证消息从队列可靠地达到消费者,RabbitMQ提供了消息确认机制(message acknowledgement);
实现步骤:
- #开启手动ack消息消费确认 ,在配置文件中配置
spring.rabbitmq.listener.simple.acknowledge-mode=manual
- #编写配置类(配置交换机、队列、以及绑定)
- 编写一个发送消息的类
- 编写一个接收消息的类 ,开启手动消息确认后,我们接收完消息后,并不会删除
@Component
@Slf4j
public class ReceiveMessage {/*** 接收延迟队列的消息,使用的是插件* @param message*/@RabbitListener(queues = {"queue.delay.4"})public void receiveMsg(Message message , Channel channel){//获取消息的唯一标识long tag = message.getMessageProperties().getDeliveryTag();String body = null;try {body = new String(message.getBody());log.info("接收到消息的为: "+body);//TODO 正确的话,我们会有插入数据库等//正确的话,就是告诉服务器,可以删除了//第一个参数是,当前消息,// 第二个参数:是否批量处理channel.basicAck(tag,false);} catch (Exception e) {log.error("消息处理出现问题");try {//三个参数分别是:当前消息//是否是批量处理//是否重新入队channel.basicNack(tag,false,true);} catch (IOException ex) {throw new RuntimeException(ex);}throw new RuntimeException(e);}
16 交换机的属性
- 具体参数(以下属性是在创建交换机的时候设置的)
-
- Name:交换机名称;就是一个字符串
- Type:交换机类型,direct, topic, fanout, headers四种
- Durability:持久化,声明交换机是否持久化,代表交换机在服务器重启后是否还存在;默认是持久化的, yes为持久化 ,flase为不持久化(没有保存到磁盘上).
- Auto delete:是否自动删除,曾经有队列绑定到该交换机,后来解绑了,那就会自动删除该交换机;
- Internal:内部使用的,如果是yes,客户端无法直接发消息到此交换机,它只能用于交换机与交换机的绑定。
- Arguments:只有一个取值alternate-exchange,表示备用交换机;当队列与交换机因为路由无法发送消息,则会发送到备用交换机上.
- 例子
@Beanpublic DirectExchange directExchange(){return ExchangeBuilder.directExchange(exchangeName).durable(true).autoDelete() //设置自动删除,默认不是自动删除的,boolean类型的默认为flase.build();}
16.1 备用交换机
正常的情况下,备用队列是不应该有消息的,当备用队列有消息时,说明出错了.提示我们及时更改代码.
主交换机是direct(直连的) 备用交换机是fanout(扇形的)
描述:就是我们生产者与交换机的key是hello ,完后交换机与队列的key是info,完后就不能进入队列里面,则就会进入备用交换机里面,从而进入备用队列。
当路由没有写错,则就会进入指定的交换机,不会进入备用交换机。
- 备用交换机的设置:
- 第一种:使用建造者模式的时候
.alternate()
public DirectExchange directExchange(){return ExchangeBuilder.directExchange(exchangeName).alternate() //设置备用交换机.build();
17 队列的详细属性
队列属性的设置都是在创建队列的时候,来设置队列的属性。
- Type:队列类型
- Name:队列名称,就是一个字符串,随便一个字符串就可以;
- Durability:声明队列是否持久化,代表队列在服务器重启后是否还存在;
- Auto delete: 是否自动删除,如果为true,当没有消费者连接到这个队列的时候,队列会自动删除;
- Exclusive:exclusive属性的队列只对首次声明它的连接可见,并且在连接断开时自动删除;基本上不设置它,设置成false
- Arguments:队列的其他属性,例如指定DLX(死信交换机等)
-
- x-expires:Number
当Queue(队列)在指定的时间未被访问,则队列将被自动删除;
-
- x-message-ttl:Number
发布的消息在队列中存在多长时间后被取消(单位毫秒);
-
- x-overflow:String
设置队列溢出行为,当达到队列的最大长度时,消息会发生什么,
有效值为Drop Head 删除头部 默认的
Reject Publish 拒绝发布 ,一旦队列满了以后,就不再接收新的消息
-
- x-max-length:Number
队列所能容下消息的最大长度,当超出长度后,新消息将会覆盖最前面的消息,类似于Redis的LRU算法;
-
- x-single-active-consumer:默认为false
激活单一的消费者,也就是该队列只能有一个消息者消费消息;
-
- x-max-length-bytes:Number
限定一定的字节装入队列,当超出后就不在装入队列;
-
- x-dead-letter-exchange:String
指定队列关联的死信交换机,有时候我们希望当队列的消息达到上限后溢出的消息不会被删除掉,而是走到另一个队列中保存起来;
-
- x-dead-letter-routing-key:String
指定死信交换机的路由键,一般和6一起定义;
-
- x-max-priority:Number
如果将一个队列加上优先级参数,那么该队列为优先级队列; 数字越大优先级越高,默认优先级为0
(1)、给队列加上优先级参数使其成为优先级队列
x-max-priority=10【0-255取值范围】
(2)、给消息加上优先级属性
通过优先级特性,将一个队列实现插队消费;
实现方式:在创建消息的时候,通过MessageProperties
中的setPriority()来设置优先级。
public void sendMsg(){MessageProperties messageProperties =new MessageProperties();messageProperties.setPriority(6);Message message = MessageBuilder.withBody("hello world".getBytes()).andProperties(messageProperties).build();rabbitTemplate.convertAndSend("exchange.return.1","info11111",message);}
18 消息的幂等性
消息消费时的幂等性(消息不被重复消费)
同一个消息,第一次接收,正常处理业务,如果该消息第二次再接收,那就不能再处理业务,否则就处理重复了;
幂等性是:对于一个资源,不管你请求一次还是请求多次,对该资源本身造成的影响应该是相同的,不能因为重复的请求而对该资源重复造成影响;
其中select 、update、delete 都是幂等的。insert是非幂等的。
在http中:get、put、delete都是幂等的。post是非幂等的。
18.1 如何避免消息的重复消费问题?
全局唯一ID + Redis
生产者在发送消息时,为每条消息设置一个全局唯一的messageId,消费者拿到消息后,将其放入redis中,并设置这个key,完后再对设置的结果进行判断,如果为true,说明这条消息不存在就可以存入数据库了 ,当一条相同的id的消息再次过来,则返回结果为flase,则不处理。
private ObjectMapper objectMapper
用于序列化和反序列化的 (json格式的)
objectMapper.writeValueAsString(传入的对象)
将对象转为字符串
objectMapper.readValue(消息,要转成那个对象)
将字符串或字节数组,转为对象
序列化----就是将对象转为字符串,或者字节数组
反序列化---就是将字符串或者字节数组转为对象
具体的代码实现过程:
消费者拿到消息以后,放入redis中,并设置这个key,完后再对设置的结果进行判断,如果为true,说明设置成功了(下一步就可以插入数据库了)
//放入redis中,并设置id
Boolean setRuslt = RedisTemplate.opsForValue().setIfAbsent(要设置的id)
//如果id不存在,说明没有这条消息,就可以放入数据库了
if(setRuslt){//就可以放入数据库了
}
//下一条消息再过来,就返回的flase,完后就不处理了。
19 RabbitMQ集群与高可用
RabbitMQ 的集群分两种模式,一种是默认集群模式,一种是镜像集群模式;
在RabbitMQ集群中所有的节点(一个节点就是一个RabbitMQ的broker服务器)被归为两类:一类是磁盘节点,一类是内存节点;
磁盘节点会把集群的所有信息(比如交换机、绑定、队列等信息)持久化到磁盘中,而内存节点只会将这些信息保存到内存中,如果该节点宕机或重启,内存节点的数据会全部丢失,而磁盘节点的数据不会丢失;
19 .1 默认集群模式
默认集群模式也叫 普通集群模式、或者 内置集群模式;
RabbitMQ默认集群模式,只会把交换机、队列、虚拟主机等元数据信息在各个节点同步,而具体队列中的消息内容不会在各个节点中同步;
接发消息的原理:
创建队列的时候,只要在任何一个节点创建,都会复制到其他的两个节点上。
接收消息的时候:
但是发消息的时候只会发送到节点1上,接消息的时候也是从节点1接收消息
发送消息的时候:
假如发消息的时候,我们连接到节点2,节点2上本身是不存储消息的,它会把请求转到节点1上,从而发送到节点1上。节点1停了,还是不能发消息的
假如接消息的时候是从节点3上接收,节点3上没有消息,它会把请求转到节点1上,从而接收到消息
元数据(除了消息本身其他的都是元数据)
队列元数据:队列名称和属性(是否可持久化,是否自动删除)
交换器元数据:交换器名称、类型和属性
绑定元数据:交换器和队列的绑定列表
(虚拟主机)vhost元数据:vhost内的相关属性,如安全属性等;
默认集群模式式
优点:
1)节省存储空间;
2)性能提高;
如果消息需要复制到集群中每个节点,网络开销不可避免,持久化消息还需要写磁盘,占用磁盘空间。
缺点:
当其中一个队列宕机以后,则内容就会全部消失。
19.2 集群的搭建
- 第一步:从已经安装好rabbitmq的机器 clone 三台机器,【注意clone完,先不要启动三台机器,三台机器均要重新生成mac地址,防止clone出的机器ip地址重复】
-
- 虚拟机的克隆步骤:
-
-
- 将虚拟机关机
- 点击管理---->点击克隆---->选中虚拟机当前的状态--->选中创建完整的虚拟机-->填写克隆的虚拟机的名称---> 点击完成--->在启动前改变其物理ip--->启动前点击网络适配器---->高级---->召见MAC地址--->点击几遍生成 ------->克隆完成
-
- 第二步:使用xshell 连接三台机器
- 第三步:修改三台机器的/etc/hosts 文件,
vim /etc/hosts
192.168.131.128 rabbit130
192.168.131.129 rabbit133
192.168.131.130 rabbit134
- 第三步:三台机器均重启网络,使节点名生效(可以直接关机重新启动)
systemctl restart network
- 第四步:三台机器的防火墙处理
systemctl status firewalld ---查看防火墙的状态
systemctl stop firewalld --关闭防火墙
systemctl disable firewalld --开机不启动防火墙
- 第五步:三台机器 .erlang.cookie文件保持一致 ,由于是clone出的三台机器,所以肯定是一样的
如果我们使用解压缩方式安装的RabbitMQ,那么该文件会在${用户名}目录下,
也就是${用户名}/.erlang.cookie;
如果我们使用rpm安装包方式进行安装,那么这个文件会在/var/lib/rabbitmq目录下;
查看隐藏文件用的ls-a
注意 .erlang.cookie的权限为400,目前已经是400
- 第六步: 分别启动
rabbitmq-server -detached
- 第七步: 使用以下命令查看集群状态
Disk Nodes 表示磁盘节点
rabbitmqctl cluster_status
- 第八步:以上的三个机器还是互不相干三个机器,并不是集群,构建集群
-
- 1、先把其他的两个rabbitmq停掉
rabbitmqctl stop_app 仅仅是停掉rabbitmq
-
- 2、把当前rabbit中的交换机和队列全部重置
rabbitmqctl reset
-
- 3、将当前rabbit加入到第一个rabbit中,并成为内存节点
--ram 参数表示让rabbitmq128成为一个内存节点,如果不带参数默认为disk磁盘节点;
rabbit@rabbit128 代表的主rabbitmq
rabbitmqctl join_cluster rabbit@rabbit128 --ram rabbit@rabbit128主rabbitmq
-
- 4、启动rabbitmq
rabbitmqctl start_app
-
- 5、查看主rabbit的状态
rabbitmqctl cluster_status
19.3 操作集群中的一个节点,添加用户和权限等
#列出用户
rabbitmqctl list_users
# 添加用户
rabbitmqctl add_user admin 123456
#查看权限
rabbitmqctl list_permissions
#设置权限
rabbitmqctl set_permissions admin ".*" ".*" ".*"
#设置角色
rabbitmqctl set_user_tags admin administrator【注意】:这个启动插件需要给集群中的每一个rabbit都单独启动
#列出插件
rabbitmq-plugins list
#启动web控制台插件
rabbitmq-plugins enable rabbitmq_management
使用web浏览器添加一个虚拟主机:powernode
实现原理
RabbitMQ底层是通过Erlang架构来实现的,所以rabbitmqctl会启动Erlang节点,并基于Erlang节点来使用Erlang系统连接RabbitMQ节点,在连接过程中需要正确的Erlang Cookie和节点名称,Erlang节点通过交换Erlang Cookie以获得认证;
19.4 springboot连接集群
只需要改变配置文件即可 ,其他接发消息是一样的。
spring:#配置rabbitmqrabbitmq:# 连接集群,使用逗号分隔addresses: 192.168.150.150:5672,192.168.150.151:5672,192.168.150.152:5672username: adminpassword: 123456virtual-host: powernode
20 镜像集群模式
镜像模式是基于默认集群模式加上一定的配置得来的;
在默认模式下的RabbitMQ集群,它会把所有节点的交换机、绑定、队列的元数据进行复制确保所有节点都有一份相同的元数据信息,但是队列数据分为两种:一种是队列的元数据信息(比如队列的最大容量,队列的名称等配置信息),另一种是队列里面的消息;
镜像模式,则是把所有的队列数据完全同步,包括元数据信息和消息数据信息,当然这对性能肯定会有一定影响,当对数据可靠性要求较高时,可以使用镜像模式;
20.1镜像模式的搭建
实现镜像模式也非常简单,它是在普通集群模式基础之上搭建而成的;
镜像队列配置命令:
./rabbitmqctl set_policy [-p Vhost] Name Pattern Definition [Priority]
- -p Vhost(虚拟主机): 可选参数,针对指定vhost下的queue进行设置;
- Name: policy的名称;(可以自己取个名字就可以)
- Pattern: queue的匹配模式(正则表达式); ----就是说想对哪些主机进行镜像
- Definition:镜像定义,包括三个部分ha-mode, ha-params, ha-sync-mode;---ha代表高可用
- 【例子】:
{“ha-mode”:”exactly”,”ha-params”:2}
-
- ha-mode:指明镜像队列的模式,有效值为
all/exactly/nodes
- ha-mode:指明镜像队列的模式,有效值为
-
-
- all:表示在集群中所有的节点上进行镜像
- exactly:表示在指定个数的节点上进行镜像,节点的个数由ha-params指定
- nodes:表示在指定的节点上进行镜像,节点名称通过ha-params指
-
-
- ha-params:ha-mode模式需要用到的参数
- ha-sync-mode:进行队列中消息的同步方式,有效值为automatic(自动)和manual(手动)
- priority:可选参数,policy的优先级;
- 【例子】:"^policy_表示以policy开头的队列
./rabbitmqctl set_policy -p powernode ha_policy "^policy_"
'{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}'
- 【例子2】:如果要在所有节点所有队列上进行镜像,则(在任意节点执行如下命令):
所有节点、所有虚拟主机、所有队列 都进行镜像
./rabbitmqctl set_policy ha-all "^" '{"ha-mode":"all"}'
- 【例子3】:针对某个虚拟主机进行镜像
rabbitmqctl set_policy -p powernode ha-all "^"
'{"ha-mode":"all","ha-sync-mode":"automatic"}'
相关文章:
RabbitMQ消息队列的笔记
Rabbit与Java相结合 引入依赖 <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId> </dependency> 在配置文件中编写关于rabbitmq的配置 rabbitmq:host: 192.168.190.132 /…...
linux不同发行版中的主要差异
一、初始化系统 Linux不同发行版中的系统初始化系统(如 System V init、Upstart 或 systemd) System V init: 历史:System V init 是最传统的 Linux 系统初始化系统,起源于 Unix System V 操作系统。运行级别ÿ…...
Elasticsearch+Kibana分布式存储引擎
1.ElaticSearch介绍 ElaticSearch ,简称为 ES , ES 是一个开源的高扩展的分布式全文检索引擎,它可以近乎实时的存储、检 索数据;本身扩展性很好,可以扩展到上百台服务器,处理 PB 级别的数据。 ES 也使用 …...
spark 分布式 原理
Apache Spark 是一个快速且通用的大数据处理引擎,它支持分布式计算。Spark 的设计旨在通过高效的内存内计算和对多种数据源的支持来简化大规模数据集的处理。以下是关于 Spark 分布式原理的详细介绍: 1. 架构概述 Driver Program(驱动程序&…...
Hadoop学习笔记(包括hadoop3.4.0集群安装)(黑马)
Hadoop学习笔记 0-前置章节-环境准备 0.1 环境介绍 配置环境:hadoop-3.4.0,jdk-8u171-linux-x64 0.2 VMware准备Linux虚拟机 0.2.1主机名、IP、SSH免密登录 1.配置固定IP地址(root权限) 开启master,修改主机名为…...
thinkphp:try-catch捕获异常
使用简单的例子,实现了一个简单的try-catch捕获异常的实例 //开始事务Db::startTrans(); try{ //有异常抛出异常 if(存在错误){ throw new \Exception("异常信息"); } // 提交事务 Db::commit(); // 返回成功信息 ... } catch (\…...
如何使用 uni-app 构建直播应用程序?
使用uni-app构建直播应用程序涉及前端和后端的开发,以及音视频处理技术的选择。下面我将概述一个典型的直播应用架构,并详细说明如何在uni-app中实现关键功能。 直播应用架构 前端(uni-app):负责用户界面展示、互动逻…...
正则表达式入门教程
正则表达式入门教程 1. 引言 正则表达式(Regular Expression,简称Regex)是一种用于处理字符串的强大工具,它允许用户通过特定的模式(pattern)来搜索、匹配、查找和替换文本中的数据。正则表达式在文本处理、数据验证、数据提取等领域有着广泛的应用。本教程将带你了解正…...
uniapp 常用的指令语句
uniapp 是一个使用 Vue.js 开发的跨平台应用框架,因此,它继承了 Vue.js 的大部分指令。以下是一些在 uniapp 中常用的 Vue 指令语句及其用途: v-if / v-else-if / v-else 条件渲染。v-if 有条件地渲染元素,v-else-if 和 v-else 用…...
【Figma_01】Figma软件初始与使用
Figma初识与学习准备 背景介绍软件使用1.1 切换主题1.2 官方社区 设计界面2.1 创建一个项目2.2 修改文件名2.3 四种模式2.4 新增界面2.5 图层2.6 工具栏2.7 属性栏section透明度和圆角改变多边形的边数渐变效果描边设置阴影等特效拖拽相同的图形 背景介绍 Ul设计:User Interfa…...
AI工具如何深刻改变我们的工作与生活
在当今这个科技日新月异的时代,人工智能(AI)已经从科幻小说中的概念变成了我们日常生活中不可或缺的一部分。从智能家居到自动驾驶汽车,从医疗诊断到金融服务,AI正以惊人的速度重塑着我们的世界。 一、工作方式的革新…...
信息安全实训室网络攻防靶场实战核心平台解决方案
一、引言 网络安全靶场,作为一种融合了虚拟与现实环境的综合性平台,专为基础设施、应用程序及物理系统等目标设计,旨在向系统用户提供全方位的安全服务,涵盖教学、研究、训练及测试等多个维度。随着网络空间对抗态势的日益复杂化…...
平方根无迹卡尔曼滤波(SR-UKF)的MATLAB例程,使用三维非线性的系统
本MATLAB 代码实现了平方根无迹卡尔曼滤波(SR-UKF)算法,用于处理三维非线性状态估计问题 文章目录 运行结果代码概述代码 运行结果 三轴状态曲线对比: 三轴误差曲线对比: 误差统计特性输出(命令行截图&…...
【从零开始入门unity游戏开发之——C#篇04】栈(Stack)和堆(Heap),值类型和引用类型,以及特殊的引用类型string,垃圾回收( GC)
文章目录 知识回顾一、栈(Stack)和堆(Heap)1、什么是栈和堆2、为什么要分栈和堆3、栈和堆的区别栈堆 4、总结 二、值类型和引用类型1、那么值类型和引用类型到底有什么区别呢?值类型引用类型 2、总结 三、特殊的引用类…...
人员离岗监测摄像机智能人员睡岗、逃岗监测 Python 语言结合 OpenCV
在安全生产领域,人员的在岗状态直接关系到生产流程的顺利进行和工作环境的安全稳定。人员离岗监测摄像机的出现,为智能人员睡岗、逃岗监测提供了高效精准的解决方案,而其中的核心技术如AI识别睡岗脱岗以及相关的算法盒子和常见的安全生产AI算…...
Linux-ubuntu点LED灯C语言版
一,C语言点灯 1.寄存器配置 设置为SVC模式,复用寄存器设置GPIO1-IO003,设置电气属性,设置为输出模式。 2.软件 汇编语言对模式设置,并且将堆栈指针指向主程序: .global _start_start: /*设置为svr模式 */mrs …...
第P3周:Pytorch实现天气识别
🍨 本文为🔗365天深度学习训练营 中的学习记录博客🍖 原作者:K同学啊 目标 读取天气图片,按文件夹分类搭建CNN网络,保存网络模型并加载模型使用保存的模型预测真实天气 具体实现 (一…...
代理IP与生成式AI:携手共创未来
目录 代理IP:网络世界的“隐形斗篷” 1. 隐藏真实IP,保护隐私 2. 突破网络限制,访问更多资源 生成式AI:创意与效率的“超级大脑” 1. 提高创作效率 2. 个性化定制 代理IP与生成式AI的协同作用 1. 网络安全 2. 内容创作与…...
函数式编程
Lambda表达式 1、什么时候可以使用Lambda表达式呢? 一般都是在简化匿名内部类,当这个函数是一个接口,并且接口中只要一个方法时,就可以使用Lambda表达式 2、格式 (参数列表)->{方法体} 其中形参也不需要传,只需要传实参 只关注参数列表和方法体不关注方法啥的东西…...
抖音SEO短视频矩阵源码系统开发分享
在数字营销的前沿阵地,抖音短视频平台凭借其独特的魅力和庞大的用户基础,已成为社交媒体领域一股不可小觑的力量。随着平台影响力的持续扩大,如何有效提升视频内容的可见度与流量成为了内容创作者关注的焦点。在此背景下,一套专为…...
常见的锁与线程安全
目录 一、STL,智能指针和线程安全 STL中的容器是否是线程安全的? 智能指针是否是线程安全的? 二、其他常见的各种锁 三、自旋锁 四、读者写者问题 读写锁接口 读者优先伪代码 一、STL,智能指针和线程安全 STL中的容器是否是线程安全的? 不是 . 原因是 , STL 的设…...
java中的List、数组和set
在Java中,List、数组(Array)和Set 是三种常用的数据结构,它们各自有不同的特性、用途和实现方式。下面我们将深入探讨这三者的特点、区别以及它们在 Java 中的常见使用场景。 1. 数组(Array) 特性&#x…...
NLP-Huggingface基本使用方法
NLP的网络结构大同小异,只不过训练策略可能会不同。因为与图像cv不同,文本训练数据非常的多,cv可以使用10几张就可以获得特征向量,而文本做不到学几句话就能让计算机听得懂话。因此,我们都需要使用预训练模型ÿ…...
Liquibase结合SpringBoot使用实现数据库管理
Liquibase概述 Liquibase 是一个开源的数据库变更管理工具,用于跟踪、版本化、和管理数据库结构(如表、字段、索引等)的变更。它的目的是使数据库变更的过程更加透明、可控制、自动化,避免开发团队在多个环境中手动执行相同的数据…...
高防CDN 如何防止DDoS和CC攻击?
在数字化时代,网络安全威胁日益严重,尤其是DDoS(分布式拒绝服务)攻击和CC(Challenge Collapsar)攻击,已成为企业网站和网络服务最常见且最具破坏力的攻击手段。为了有效抵御这些攻击,…...
15.初始接口1.0 C#
这是一个用于实验接口的代码 适合初认识接口的人 【CSDN开头介绍】(文心一言AI生成) 在C#编程世界中,接口(Interface)扮演着至关重要的角色,它定义了一组方法,但不提供这些方法的实现。接口作为…...
数据结构day5:单向循环链表 代码作业
一、loopLink.h #ifndef __LOOPLINK_H__ #define __LOOPLINK_H__#include <stdio.h> #include <stdlib.h>typedef int DataType;typedef struct node {union{int len;DataType data;};struct node* next; }loopLink, *loopLinkPtr;//创建 loopLinkPtr create();//…...
利用CNN与多尺度特征、注意力机制的融合实现低分辨率人脸表情识别,并给出模型介绍与代码实现
大家好,我是微学AI,今天给大家介绍一下利用CNN与多尺度特征、注意力机制的融合实现低分辨率人脸表情识别,并给出模型介绍与代码实现。在当今社会,人脸识别技术已广泛应用,但特定场景下的低质量图像仍是一大挑战。 低分…...
spring RestTemplate使用说明
rest-template是spring对httpclient的逻辑封装,它底层还是基于httpclient,所以一些配置其实跟httpclient是强相关的。 基本配置 rest-template可以不带参数,使用默认配置,也可以指定ClientHttpRequestFactory参数,Cl…...
设置HP条UI
概述 设置常见的生命值条, 实现过程 设置UI/image作为形状 设置UI/Image作为背景 设置UI/image(healthfill)作为填充图片,层数低于背景 设置heathfill的imagetype为filled fillmethod为horizontal [SerializeField] private Im…...
常见排序算法总结 (五) - 堆排序与堆操作
堆排序(借助 API) 算法思想 利用堆能够维护数组中最大值的性质,根据数组元素建立最大堆,依次弹出元素并维护堆结构,直到堆为空。 稳定性分析 堆排序是不稳定的,因为堆本质上是完全二叉树,排…...
Linux 本地编译安装 gcc9
这里演示非sudo权限的本地linux 用户安装 gcc9 下载源代码: 可以从GCC官方网站或其镜像站点下载GCC 9的源代码压缩包。使用wget或curl命令,这通常不需要额外权限 wget https://ftp.gnu.org/gnu/gcc/gcc-9.5.0/gcc-9.5.0.tar.gz tar -xf gcc-9.5.0.tar…...
开源FreeSWITCH大模型智能客服系统的最佳实践
开源 FreeSWITCH 大模型智能客服系统的最佳实践 原作者:开源呼叫中心FreeIPCC,其Github:https://github.com/lihaiya/freeipcc 引言 开源 FreeSWITCH 大模型智能客服系统因其灵活性、成本效益和技术先进性,成为众多企业提升客户…...
大数据技术与应用——数据可视化(山东省大数据职称考试)
大数据分析应用-初级 第一部分 基础知识 一、大数据法律法规、政策文件、相关标准 二、计算机基础知识 三、信息化基础知识 四、密码学 五、大数据安全 六、数据库系统 七、数据仓库. 第二部分 专业知识 一、大数据技术与应用 二、大数据分析模型 三、数据科学 数据可视化 大…...
大数据之Hbase环境安装
Hbase软件版本下载地址: http://mirror.bit.edu.cn/apache/hbase/ 1. 集群环境 Master 172.16.11.97 Slave1 172.16.11.98 Slave2 172.16.11.99 2. 下载软件包 #Master wget http://archive.apache.org/dist/hbase/0.98.24/hbase-0.98.24-hadoop1-bin.tar.gz…...
Node.js day-01
01.Node.js 讲解 什么是 Node.js,有什么用,为何能独立执行 JS 代码,演示安装和执行 JS 文件内代码 Node.js 是一个独立的 JavaScript 运行环境,能独立执行 JS 代码,因为这个特点,它可以用来编写服务器后端…...
OpenCV相机标定与3D重建(25)计算两个三维点集之间的最优仿射变换矩阵(3x4)函数estimateAffine3D()的使用
操作系统:ubuntu22.04 OpenCV版本:OpenCV4.9 IDE:Visual Studio Code 编程语言:C11 算法描述 计算两个3D点集之间的最优仿射变换。 它计算 [ x y z ] [ a 11 a 12 a 13 a 21 a 22 a 23 a 31 a 32 a 33 ] [ X Y Z ] [ b 1 b 2 b 3 ] \beg…...
SQL 中 INNER JOIN 和 LEFT JOIN 的区别和用法
在数据库语言 SQL 中,连接 (也称进行表结合操作)是一种常见的操作,用于将多个数据表格核实关联进行查询。常见的连接类型中, INNER JOIN 和 LEFT JOIN 是最基本且最常用的。下面将给出完整的区别和用法说明。 1. 基本概念 INNER JOIN (内连…...
【计算机网络】lab2 Ethernet(链路层Ethernet frame结构细节)
🌈 个人主页:十二月的猫-CSDN博客 🔥 系列专栏: 🏀各种软件安装与配置_十二月的猫的博客-CSDN博客 💪🏻 十二月的寒冬阻挡不了春天的脚步,十二点的黑夜遮蔽不住黎明的曙光 目录 1. …...
2024 年 MySQL 8.0.40 安装配置、Workbench汉化教程最简易(保姆级)
首先到官网上下载安装包:http://www.mysql.com 点击下载,拉到最下面,点击社区版下载 windows用户点击下面适用于windows的安装程序 点击下载,网络条件好可以点第一个,怕下着下着断了点第二个离线下载 双击下载好的安装…...
提升PHP技能:18个实用高级特性
掌握PHP基础知识只是第一步。 深入了解这18个强大的PHP特性,将显著提升您的开发效率和代码质量。 1、超越 __construct() 的魔法方法 虽然 __construct() 为大多数开发者所熟知,PHP 却提供了更多强大的魔术方法,例如: class Da…...
QT数据库(三):QSqlQuery使用
QSqlQuery 简介 QSqlQuery 是能运行任何 SQL 语句的类,如 SELECT、INSERT、UPDATE、DELETE 等 SQL 语句。所以使用 QSqlQuery 几乎能进行任何操作,例如创建数据表、修改数据表的字段定义、进行数据统计等。如果运行的是 SELECT 语句,它查询…...
【机器学习】在向量的流光中,揽数理星河为衣,以线性代数为钥,轻启机器学习黎明的瑰丽诗章
文章目录 线性代数入门:机器学习零基础小白指南前言一、向量:数据的基本单元1.1 什么是向量?1.1.1 举个例子: 1.2 向量的表示与维度1.2.1 向量的维度1.2.2 向量的表示方法 1.3 向量的基本运算1.3.1 向量加法1.3.2 向量的数乘1.3.3…...
设计模式详解(十一):模板方法——Template Method
Template Method 设计模式 1. 概述 Template Method 是一种行为设计模式,它定义了一个算法的框架,并允许子类在不改变算法结构的前提下重新定义算法中的某些步骤。 在 Template Method 模式中: 父类(抽象类)定义了…...
使用 DeepSpeed 微调 OPT 基础语言模型
文章目录 OPT 基础语言模型Using OPT with DeepSpeedmain.py 解析1、导入库和模块2、解析命令行参数3、main 函数3.1 设备与分布式初始化3.2 模型与数据准备3.3 定义评估函数3.4 优化器与学习率调度器设置3.5 使用 deepspeed 进行模型等初始化3.6 训练循环3.7 模型保存 4、dsch…...
DPDK用户态协议栈-TCP Posix API 2
tcp posix api send发送 ssize_t nsend(int sockfd, const void *buf, size_t len, __attribute__((unused))int flags) {ssize_t length 0;void* hostinfo get_host_fromfd(sockfd);if (hostinfo NULL) {return -1;}struct ln_tcp_stream* stream (struct ln_tcp_stream…...
打造微信小程序中的视频播放交互体验:videoUI组件库实战
本文还有配套的精品资源,点击获取 简介:本项目介绍如何利用 videoUI 组件库在微信小程序中实现视频切换播放和全屏播放功能。涵盖微信小程序开发基础、 <video> 组件使用、视频切换逻辑、全屏播放实现以及 videoUI 库的应用。为开发者提供…...
Django REST framework(DRF)在处理不同请求方法时的完整流程
文章目录 一、POST 请求创建对象的流程二、GET 请求获取对象列表的流程三、GET 请求获取单个对象的流程四、PUT/PATCH 请求更新对象的流程五、自定义方法的流程自定义 GET 方法自定义 POST 方法 一、POST 请求创建对象的流程 请求到达视图层 方法调用: dispatch说明…...
【Hive】-- hive 3.1.3 伪分布式部署(单节点)
1、环境准备 1.1、版本选择 apache hive 3.1.3 apache hadoop 3.1.0 oracle jdk 1.8 mysql 8.0.15 操作系统:Mac os 10.151.2、软件下载 https://archive.apache.org/dist/hive/ https://archive.apache.org/dist/hadoop/ 1.3、解压 tar -zxvf apache-hive-4.0.0-bin.tar…...
unity 雷达
unity 雷达 首先去商店下载TouchScript插件 导入的时候勾选Enable TUIO 然后把预制体Cursors和TouchManager拖上 最后把TuioInput这个脚本挂上 脚本上的端口号尽量不改...