当前位置: 首页 > news >正文

4、RabbitMQ的七种工作模式介绍

目录

一、Simple(简单模式)

1.1 概念

1.2 代码实现

消费者

运行结果

二、Work Queue(工作队列)

2.1 概念

1.2 代码实现

生产者

消费者

 运行结果

三、Publish/Subscribe(发布/订阅模式)

3.1 概念

3.2 代码实现

生产者

消费者

运行结果

四、Routing(路由模式)

4.1 概念

4.2 代码实现

Constants类

生产者

消费者

运行结果

五、Topics(通配符模式)

5.1 概念

5.2 代码实现

生产者

消费者

运行结果

六、RPC(RPC通信)了解

6.1 概念

6.2 代码实现

客户端代码编写

编写服务器代码

运行结果:

七、Publish Confirms(发布确认模式)​​​​​​​

publishing Messages Individually(单独确认)

Publishing Messages in Batches(批量确认)

Handling Publisher Confirms Asynchronously(异步确认)


一、Simple(简单模式)

1.1 概念

P: 生产者,也就是要发送消息的程序

C: 消费者,消息的接受者

Queue:消息队列,图中Queue类似提个邮箱,可以缓存消息;生产者向其中投递消息,消费者从中取出消息。

特点:一个生产者P,一个消费者C,消息只能被消费一次,也称为点对点(Point-to-Point)模式

适用场景:消息只能被单个消费者处理

1.2 代码实现

消费者

package rabbitmq.simple;import com.rabbitmq.client.*;
import java.io.IOException;public class ConsumerDemo {public static void main(String[] args) throws Exception {//1.创建连接// 创建一个ConnectionFactory实例来配置RabbitMQ连接ConnectionFactory connectionFactory = new ConnectionFactory();// 设置RabbitMQ服务器的主机地址connectionFactory.setHost("8.136.108.248");// 设置RabbitMQ服务器的端口号connectionFactory.setPort(5672);// 设置登录RabbitMQ服务器的用户名connectionFactory.setUsername("pinkboy");// 设置登录RabbitMQ服务器的密码connectionFactory.setPassword("123456");// 设置RabbitMQ服务器的虚拟主机connectionFactory.setVirtualHost("/");// 使用ConnectionFactory创建一个新的连接Connection connection = connectionFactory.newConnection();//2.创建ChannelChannel channel = connection.createChannel();/***3.声明一个队列** @param channel RabbitMQ的通道,用于执行队列操作** 此处使用了queueDeclare方法来声明一个名为"hello"的队列该方法的参数分别表示:* 1. 队列名称("hello"):指定要声明的队列的名称* 2. true:表示该队列是持久化的,意味着即使RabbitMQ服务重启,队列也会被保留* 3. false:表示该队列不是排他的,意味着该队列可以被所有通道共享* 4. false:表示该队列不会在使用后自动删除,需要手动删除* 5. null:表示不设置额外的参数** 选择这些参数值的原因可能是希望创建一个持久化的、共享的队列,以便在不同的时间点和不同的消费者之间传递消息*/channel.queueDeclare("hello", true, false, false, null);// 4.开始从名为"hello"的队列中消费消息channel.basicConsume("hello", true, new DefaultConsumer(channel) {/*** 处理接收到的消息** @param consumerTag 消费者标签,用于标识消费者* @param envelope 包含消息路由信息的信封* @param properties 消息的属性,如内容类型、内容编码等* @param body 消息的主体内容,以字节数组形式表示* @throws IOException 如果处理消息时发生I/O错误*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {// 打印消息的主体内容System.out.println("body:" + new String(body));}});Thread.sleep(2000);//5.关闭资源channel.close();connection.close();}
}

运行结果

启动生产者代码:

观察消息队列

 启动消费者代码:

观察消息队列 

二、Work Queue(工作队列)

2.1 概念

一个生产者P,多个消费者C1,C2 在多个消息的情况下,WorkQueue会将消息分派给不同的消费者,每个消费者都会接收到不同的消息

特点:消息不会重复,分配各不同的消费者

适用场景:集群环境中做异步处理

举个例子:12306短息通知服务,订票成功后,订单消息会发送到RabbitMQ,短信服务从RabbitMQ中获取订单信息,并发送通知信息(在短信服务之间进行任务分配)

1.2 代码实现

工作模式就是简单模式的增强版 和简单模式的区别就是 简单模式就一个消费者,工作模式支持多个消费者接收消息,消费者之间是竟争关系 每个消息只能被一个消费者接收

和简单模式代码差不多 为了展示多个消费者竞争的关系 生产者一次生产10条消息

常量类

package rabbitmq.constant;public class Constants {public static final String HOST = "8.136.108.248";public static final Integer PORT = 5672;public static final String USER_NAME = "pinkboy";public static final String PASSWORD = "123456";public static final String VIRTUAL_HOST = "/";//工作队列模式public static final String WORK_QUEUE = "work_queue";}

生产者

package rabbitmq.work;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import rabbitmq.constant.Constants;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Producer {public static void main(String[] args) throws Exception {//1、建立连接// 创建一个ConnectionFactory实例来配置RabbitMQ连接ConnectionFactory connectionFactory = new ConnectionFactory();// 设置RabbitMQ服务器的主机地址connectionFactory.setHost(Constants.HOST);// 设置RabbitMQ服务器的端口号connectionFactory.setPort(Constants.PORT);// 设置登录RabbitMQ服务器的用户名connectionFactory.setUsername(Constants.USER_NAME);// 设置登录RabbitMQ服务器的密码connectionFactory.setPassword(Constants.PASSWORD);// 设置RabbitMQ服务器的虚拟主机connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);// 使用ConnectionFactory创建一个新的连接Connection connection = connectionFactory.newConnection();//2、创建通道Channel channel = connection.createChannel();//3、声明交换机//4、声明队列/*** 声明一个队列** @param channel RabbitMQ的通道,用于执行队列操作** 此处使用了queueDeclare方法来声明一个名为"hello"的队列该方法的参数分别表示:* 1. 队列名称("hello"):指定要声明的队列的名称* 2. true:表示该队列是持久化的,意味着即使RabbitMQ服务重启,队列也会被保留* 3. false:表示该队列不是排他的,意味着该队列可以被所有通道共享* 4. false:表示该队列不会在使用后自动删除,需要手动删除* 5. null:表示不设置额外的参数** 选择这些参数值的原因可能是希望创建一个持久化的、共享的队列,以便在不同的时间点和不同的消费者之间传递消息*/channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null);//5、发送消息// 循环发送消息到 RabbitMQ 的 "hello" 队列中for (int i = 0; i < 10; i++) {// 构造消息内容String msg = "hello work queue ..." + i;/*** 参数1 表示交换机名称,因为使用默认交换机,所以为空字符串* 参数2 表示队列名称* 参数3 :消息的属性* 参数4:消息内容*/channel.basicPublish("", Constants.WORK_QUEUE, null, msg.getBytes());}System.out.println("消息发送成功!");//6、释放资源channel.close();connection.close();}
}

消费者

两个消费者的代码是一样的

消费者1

package rabbitmq.work;import com.rabbitmq.client.*;
import rabbitmq.constant.Constants;import java.io.IOException;public class Consumer1 {public static void main(String[] args) throws Exception {//1、建立连接// 创建一个ConnectionFactory实例来配置RabbitMQ连接ConnectionFactory connectionFactory = new ConnectionFactory();// 设置RabbitMQ服务器的主机地址connectionFactory.setHost(Constants.HOST);// 设置RabbitMQ服务器的端口号connectionFactory.setPort(Constants.PORT);// 设置登录RabbitMQ服务器的用户名connectionFactory.setUsername(Constants.USER_NAME);// 设置登录RabbitMQ服务器的密码connectionFactory.setPassword(Constants.PASSWORD);// 设置RabbitMQ服务器的虚拟主机connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);// 使用ConnectionFactory创建一个新的连接Connection connection = connectionFactory.newConnection();//2.创建ChannelChannel channel = connection.createChannel();channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null);channel.basicConsume(Constants.WORK_QUEUE, true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {// 打印消息的主体内容System.out.println("body:" + new String(body));}});System.out.println("Consumer1 启动成功!");//5.关闭资源
//        channel.close();
//        connection.close();}
}

消费者2

package rabbitmq.work;import com.rabbitmq.client.*;
import rabbitmq.constant.Constants;import java.io.IOException;public class Consumer2 {public static void main(String[] args) throws Exception {//1、建立连接// 创建一个ConnectionFactory实例来配置RabbitMQ连接ConnectionFactory connectionFactory = new ConnectionFactory();// 设置RabbitMQ服务器的主机地址connectionFactory.setHost(Constants.HOST);// 设置RabbitMQ服务器的端口号connectionFactory.setPort(Constants.PORT);// 设置登录RabbitMQ服务器的用户名connectionFactory.setUsername(Constants.USER_NAME);// 设置登录RabbitMQ服务器的密码connectionFactory.setPassword(Constants.PASSWORD);// 设置RabbitMQ服务器的虚拟主机connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);// 使用ConnectionFactory创建一个新的连接Connection connection = connectionFactory.newConnection();//2.创建ChannelChannel channel = connection.createChannel();channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null);channel.basicConsume(Constants.WORK_QUEUE, true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {// 打印消息的主体内容System.out.println("body:" + new String(body));}});System.out.println("Consumer2 启动成功!");//5.关闭资源
//        channel.close();
//        connection.close();}
}

 运行结果

生产者

生产者消息队列

 

消费者

为了避免第一个启动的消费者会将10条消息消费掉 需要先启动两个消费者,再去启动生产者

消费者1

消费者2

观察消息队列 

可以看到管理页面中有两个消费者被显示

三、Publish/Subscribe(发布/订阅模式)

3.1 概念

Exchange: 交换机 (X).
作⽤: 生产者将消息发送到Exchange, 由交换机将消息按⼀定规则路由到⼀个或多个队列中(上图中⽣产 者将消息投递到队列中, 实际上这个在RabbitMQ中不会发生. )
RabbitMQ交换机有四种类型: fanout,direct, topic, headers, 不同类型有着不同的路由策略. AMQP协议里还有另外两种类型, System和⾃定义, 此处不再描述.
Fanout:广播,将消息交给所有绑定到交换机的队列( Publish/Subscribe模式)
一个生产者P,多个消费者C1,C2,X代表交换机消息复制多份,每个消费者接收相同的消息
生产者发送一条消息,经过交换机转发到多个不同的队列,多个不同的队列就有多个不同的消费者
适合场景:消息需要被多个消费者同时接收的场景.如:实时通知或者广播消息

3.2 代码实现

常量类

public class Constants {public static final String HOST = "8.136.108.248";public static final Integer PORT = 5672;public static final String USER_NAME = "pinkboy";public static final String PASSWORD = "123456";public static final String VIRTUAL_HOST = "/";//发布订阅模式public static final String FANOUT_EXCHANGE = "fanout_exchange";public static final String FANOUT_QUEUE1 = "fanout_queue1";public static final String FANOUT_QUEUE2 = "fanout_queue2";
}

这个模式需要创建交换机,并绑定队列和交换机 

//3、声明交换机
/*** 参数1:交换机名称* 参数2:交换机类型 Fanout类型 -> 广播机制* 参数3:是否持久化*/
channel.exchangeDeclare(Constants.FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT, true);

声明队列

//4、声明队列
/*** 参数1:队列名称* 参数2:是否持久化* 参数3:是否独占队列,该队列只允许在该连接中访问,如果连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建* 参数4:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除* 参数5:其他参数*/
channel.queueDeclare(Constants.FANOUT_QUEUE1, true, false, false, null);
channel.queueDeclare(Constants.FANOUT_QUEUE2, true, false, false, null);

绑定队列和交换机

//5、交换机和队列绑定
/*** 参数1:队列名称* 参数2:交换机名称* 参数3:路由键,绑定规则 如果交换机类型为fanout类型,routingKey设置为空字符串*/
channel.queueBind(Constants.FANOUT_QUEUE1, Constants.FANOUT_EXCHANGE, "");
channel.queueBind(Constants.FANOUT_QUEUE2, Constants.FANOUT_EXCHANGE, "");

生产者

package rabbitmq.fanout;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import rabbitmq.constant.Constants;public class Producer {public static void main(String[] args) throws Exception {//1、建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT);connectionFactory.setUsername(Constants.USER_NAME);connectionFactory.setPassword(Constants.PASSWORD);connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = connectionFactory.newConnection();//2、打开信道Channel channel = connection.createChannel();//3、声明交换机/*** 参数1:交换机名称* 参数2:交换机类型 Fanout类型 -> 广播机制* 参数3:是否持久化*/channel.exchangeDeclare(Constants.FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT, true);//4、声明队列/*** 参数1:队列名称* 参数2:是否持久化* 参数3:是否独占队列,该队列只允许在该连接中访问,如果连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建* 参数4:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除* 参数5:其他参数*/channel.queueDeclare(Constants.FANOUT_QUEUE1, true, false, false, null);channel.queueDeclare(Constants.FANOUT_QUEUE2, true, false, false, null);//5、交换机和队列绑定/*** 参数1:队列名称* 参数2:交换机名称* 参数3:路由键,绑定规则 如果交换机类型为fanout类型,routingKey设置为空字符串*/channel.queueBind(Constants.FANOUT_QUEUE1, Constants.FANOUT_EXCHANGE, "");channel.queueBind(Constants.FANOUT_QUEUE2, Constants.FANOUT_EXCHANGE, "");//6、发布消息String msg = "hello fanout ...";channel.basicPublish(Constants.FANOUT_EXCHANGE, "", null, msg.getBytes());System.out.println("消息发送成功!");//7、释放资源channel.close();connection.close();}
}

消费者

消费者1

package rabbitmq.fanout;import com.rabbitmq.client.*;
import rabbitmq.constant.Constants;import java.io.IOException;public class Consumer1 {public static void main(String[] args) throws Exception {//1、建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT);connectionFactory.setUsername(Constants.USER_NAME);connectionFactory.setPassword(Constants.PASSWORD);connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = connectionFactory.newConnection();//2、创建信道Channel channel = connection.createChannel();//3、声明队列channel.queueDeclare(Constants.FANOUT_QUEUE1, true, false, false, null);//4、消费消息channel.basicConsume(Constants.FANOUT_QUEUE1, true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {// 打印消息的主体内容System.out.println("body:" + new String(body));}});System.out.println("Consumer1 启动成功!");}
}

消费者2

package rabbitmq.fanout;import com.rabbitmq.client.*;
import rabbitmq.constant.Constants;import java.io.IOException;public class Consumer2 {public static void main(String[] args) throws Exception {//1、建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT);connectionFactory.setUsername(Constants.USER_NAME);connectionFactory.setPassword(Constants.PASSWORD);connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = connectionFactory.newConnection();//2、创建信道Channel channel = connection.createChannel();//3、声明队列channel.queueDeclare(Constants.FANOUT_QUEUE2, true, false, false, null);//4、消费消息channel.basicConsume(Constants.FANOUT_QUEUE2, true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {// 打印消息的主体内容System.out.println("body:" + new String(body));}});System.out.println("Consumer2 启动成功!");}
}

运行结果

启动生产者

观察消息队列

fanout_queue1 和 fanout_queue2 分别有了1条消息

 Exchange中多了队列的绑定关系

启动两个消费者

 

观察消息队列

四、Routing(路由模式)

4.1 概念

路由模式是发布订阅模式的变种,在发布订阅基础上,增加路由key

发布订阅模式是无条件的将所有消息分发给所有的消费者,路由模式是Exchange根据RoutingKey的规则,将数据筛选后发给对应的消费者队列

适合场景:需要根据特定规则分发消息的场景

比如系统打印日志, 日志等级分为error, warning, info,debug, 就可以通过这种模式,把不同的日志发
送到不同的队列, 最终输出到不同的⽂件
Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息就会丢失
 
RoutingKey:路由键.生产者将消息发给交换器时,指定的一个字符串,用来告诉交换机应该如何处理这个消息.
 
BindingKey:绑定.RabbitMQ中通过Binding(绑定)将交换器与队列关联起来,在绑定的时候一般会指定一个BindingKey,这样RabbitMQ就知道如何正确地将消息路由到队列了.

队列和交换机的绑定,不能是任意的绑定了,而是要制定了一个BindKey(RoutingKey的一种)消息的发送方在向Exchange发送消息时也需要指定消息的RoutingKey

Exchange也不再把消息交给每一个绑定的key,而是根据消息的RountingKey进行判断,只要队列的BindingKey和发送消息的RoutingKey完全一致,才会接收到消息

创建交换机,定义交换机类型为BuiltinExchangeType.DIRECT

4.2 代码实现

Constants类

public class Constants {public static final String HOST = "8.136.108.248";public static final Integer PORT = 5672;public static final String USER_NAME = "pinkboy";public static final String PASSWORD = "123456";public static final String VIRTUAL_HOST = "/";//路由模式public static final String DIRECT_EXCHANGE = "direct.exchange";public static final String DIRECT_QUEUE1 = "direct.queue1";public static final String DIRECT_QUEUE2 = "direct.queue2";
}

生产者

package rabbitmq.direct;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import rabbitmq.constant.Constants;public class Producer {public static void main(String[] args) throws Exception {//1. 建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT);connectionFactory.setUsername(Constants.USER_NAME);connectionFactory.setPassword(Constants.PASSWORD);connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = connectionFactory.newConnection();//2.  创建信道Channel channel = connection.createChannel();//3.  声明交换机/*** 参数1:交换机名称* 参数2:交换机类型* 参数3:是否持久化* 参数4:是否自动删除* 参数5:其他参数*/channel.exchangeDeclare(Constants.DIRECT_EXCHANGE, "direct", true, false, null);//4.  声明队列/*** 参数1:队列名称* 参数2:是否持久化* 参数3:是否独占队列,该队列只允许在该连接中访问,如果连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建* 参数4:是否自动删除,队列不再使用时是否自动删除此队列,如果将此参数和参数2设置为true就可以实现临时队列(队列不用了就自动删除)* 参数5:其他参数*/channel.queueDeclare(Constants.DIRECT_QUEUE1, true, false, false, null);channel.queueDeclare(Constants.DIRECT_QUEUE2, true, false, false, null);//5、交换机和队列绑定/*** 参数1:队列名称* 参数2:交换机名称* 参数3:路由键,绑定规则*/channel.queueBind(Constants.DIRECT_QUEUE1, Constants.DIRECT_EXCHANGE, "a");channel.queueBind(Constants.DIRECT_QUEUE2, Constants.DIRECT_EXCHANGE, "a");channel.queueBind(Constants.DIRECT_QUEUE2, Constants.DIRECT_EXCHANGE, "b");channel.queueBind(Constants.DIRECT_QUEUE2, Constants.DIRECT_EXCHANGE, "c");//6、发布消息String msg_a = "hello direct my routingKey is a...";channel.basicPublish(Constants.DIRECT_EXCHANGE, "a", null, msg_a.getBytes());String msg_b = "hello direct my routingKey is b...";channel.basicPublish(Constants.DIRECT_EXCHANGE, "b", null, msg_b.getBytes());String msg_c = "hello direct my routingKey is c...";channel.basicPublish(Constants.DIRECT_EXCHANGE, "c", null, msg_c.getBytes());System.out.println("消息发送成功!");//7、释放资源channel.close();connection.close();}
}

消费者

package rabbitmq.direct;import com.rabbitmq.client.*;
import rabbitmq.constant.Constants;import java.io.IOException;public class Consumer1 {public static void main(String[] args) throws Exception {//1、建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT);connectionFactory.setUsername(Constants.USER_NAME);connectionFactory.setPassword(Constants.PASSWORD);connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = connectionFactory.newConnection();//2、创建信道Channel channel = connection.createChannel();//3、声明队列channel.queueDeclare(Constants.DIRECT_QUEUE1, true, false, false, null);//4、消费消息channel.basicConsume(Constants.DIRECT_QUEUE1, true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {// 打印消息的主体内容System.out.println("body:" + new String(body));}});System.out.println("Consumer1 启动成功!");}
}
package rabbitmq.direct;import com.rabbitmq.client.*;
import rabbitmq.constant.Constants;import java.io.IOException;public class Consumer2 {public static void main(String[] args) throws Exception {//1、建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT);connectionFactory.setUsername(Constants.USER_NAME);connectionFactory.setPassword(Constants.PASSWORD);connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = connectionFactory.newConnection();//2、创建信道Channel channel = connection.createChannel();//3、声明队列channel.queueDeclare(Constants.DIRECT_QUEUE2, true, false, false, null);//4、消费消息channel.basicConsume(Constants.DIRECT_QUEUE2, true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {// 打印消息的主体内容System.out.println("body:" + new String(body));}});System.out.println("Consumer1 启动成功!");}
}

运行结果

启动生产者

观察消息队列界面

exchange下队列和Routing Key的绑定关系

启动消费者

观察消息队列界面

五、Topics(通配符模式)

5.1 概念

在路由模式上进行了升级,在routingKey的基础上,增加可通配符的功能,适之更加灵活

Topic和Routing的基本原同,即:生产者将消息发送给交换机,交换机根据RoutingKey将消息转发给RoutingKey匹配的队列,类似于正则表达式的方式来定义RoutingKey的模式. 

适合场景:需要灵活匹配和过滤消息的场景

Topic和Routing模式的区别:

1、topic模式使用的交换机类型为topic(Rounting模式使用的交换机类型为direct)

2、topic类型的交换机在匹配规则上进行了扩展,BingingKey支持通配符匹配(direct类型的将换季路由规则是BingKey和RoutingKey完全匹配)

在topic类型的交换机在匹配规则上有些要求:

1.RoutingKey是一系列由点(.)分隔的单词,比如“stock.usd.nyse”,"nyse.vmw","quick.organge.rabbit"

2. BingdingKey和RountingKey,也是点(.)分隔的字符串

3. BingdingKey中可以存在两种特殊的字符串,用于模糊匹配

  * 表示一个单词

  # 表示0-N个单词

举个例子:

Binding Key 为"d.a.b" 会同时路由到Q1 和Q2
Binding Key 为"d.a.f" 会路由到Q1
Binding Key 为"c.e.f" 会路由到Q2
Binding Key 为"d.b.f" 会被丢弃, 或者返回给⽣产者(需要设置mandatory参数)

5.2 代码实现

Constants类

public class Constants {public static final String HOST = "8.136.108.248";public static final Integer PORT = 5672;public static final String USER_NAME = "pinkboy";public static final String PASSWORD = "123456";public static final String VIRTUAL_HOST = "/";//通配符模式public static final String TOPIC_EXCHANGE = "topic.exchange";public static final String TOPIC_QUEUE1 = "topic.queue1";public static final String TOPIC_QUEUE2 = "topic.queue2";}

生产者

创建交换机类型为BuiltinExchangeType.TOPIC

 //3.声明交换机

channel.exchangeDeclare(Constants.TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC, true, false, null);
 

声明队列

//4.声明队列
channel.queueDeclare(Constants.TOPIC_QUEUE1, true, false, false, null);
channel.queueDeclare(Constants.TOPIC_QUEUE2, true, false, false, null);

 绑定交换机和队列

//5.绑定交换机和队列
channel.queueBind(Constants.TOPIC_QUEUE1, Constants.TOPIC_EXCHANGE, "*.a.*");
channel.queueBind(Constants.TOPIC_QUEUE2, Constants.TOPIC_EXCHANGE, "*.*.b");
channel.queueBind(Constants.TOPIC_QUEUE2, Constants.TOPIC_EXCHANGE, "c.#");
package rabbitmq.topic;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import rabbitmq.constant.Constants;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Producer {public static void main(String[] args) throws IOException, TimeoutException {//1.建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT);connectionFactory.setUsername(Constants.USER_NAME);connectionFactory.setPassword(Constants.PASSWORD);connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = connectionFactory.newConnection();//2.开启信道Channel channel = connection.createChannel();//3.声明交换机channel.exchangeDeclare(Constants.TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC, true, false, null);//4.声明队列channel.queueDeclare(Constants.TOPIC_QUEUE1, true, false, false, null);channel.queueDeclare(Constants.TOPIC_QUEUE2, true, false, false, null);//5.绑定交换机和队列channel.queueBind(Constants.TOPIC_QUEUE1, Constants.TOPIC_EXCHANGE, "*.a.*");channel.queueBind(Constants.TOPIC_QUEUE2, Constants.TOPIC_EXCHANGE, "*.*.b");channel.queueBind(Constants.TOPIC_QUEUE2, Constants.TOPIC_EXCHANGE, "c.#");//6、发布消息String msg_a = "hello topic my routingKey is ae.a.f...";channel.basicPublish(Constants.TOPIC_EXCHANGE, "ae.a.f", null, msg_a.getBytes());String msg_b = "hello topic my routingKey is ef.a.b...";channel.basicPublish(Constants.TOPIC_EXCHANGE, "ef.a.b", null, msg_b.getBytes());String msg_c = "hello topic my routingKey is c...";channel.basicPublish(Constants.TOPIC_EXCHANGE, "c.ef.d", null, msg_c.getBytes());System.out.println("消息发送成功!");}
}

消费者

package rabbitmq.topic;import com.rabbitmq.client.*;
import rabbitmq.constant.Constants;import java.io.IOException;public class Consumer1 {public static void main(String[] args) throws Exception {//1、建立连接// 创建一个ConnectionFactory实例来配置RabbitMQ连接ConnectionFactory connectionFactory = new ConnectionFactory();// 设置RabbitMQ服务器的主机地址connectionFactory.setHost(Constants.HOST);// 设置RabbitMQ服务器的端口号connectionFactory.setPort(Constants.PORT);// 设置登录RabbitMQ服务器的用户名connectionFactory.setUsername(Constants.USER_NAME);// 设置登录RabbitMQ服务器的密码connectionFactory.setPassword(Constants.PASSWORD);// 设置RabbitMQ服务器的虚拟主机connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);// 使用ConnectionFactory创建一个新的连接Connection connection = connectionFactory.newConnection();//2.创建ChannelChannel channel = connection.createChannel();channel.queueDeclare(Constants.TOPIC_QUEUE1, true, false, false, null);channel.basicConsume(Constants.TOPIC_QUEUE1, true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {// 打印消息的主体内容System.out.println("body:" + new String(body));}});System.out.println("Consumer1 启动成功!");}
}
public class Consumer2 {public static void main(String[] args) throws Exception {//1、建立连接// 创建一个ConnectionFactory实例来配置RabbitMQ连接ConnectionFactory connectionFactory = new ConnectionFactory();// 设置RabbitMQ服务器的主机地址connectionFactory.setHost(Constants.HOST);// 设置RabbitMQ服务器的端口号connectionFactory.setPort(Constants.PORT);// 设置登录RabbitMQ服务器的用户名connectionFactory.setUsername(Constants.USER_NAME);// 设置登录RabbitMQ服务器的密码connectionFactory.setPassword(Constants.PASSWORD);// 设置RabbitMQ服务器的虚拟主机connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);// 使用ConnectionFactory创建一个新的连接Connection connection = connectionFactory.newConnection();//2.创建ChannelChannel channel = connection.createChannel();channel.queueDeclare(Constants.TOPIC_QUEUE2, true, false, false, null);channel.basicConsume(Constants.TOPIC_QUEUE2, true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {// 打印消息的主体内容System.out.println("body:" + new String(body));}});System.out.println("Consumer1 启动成功!");}
}

运行结果

生产者

观察消息队列界面

消费者

六、RPC(RPC通信)了解

6.1 概念

RPC(Remote Procedure Call) 即远程调用 它是一种通过网络从远程计算机上请求服务,而不是需要了解底层网络的技术,类似HTTP远程调用

RabbitMQ实现RPC通信的过程,大概率是通过两个队列实现一个可回调的过程

大概流程

1.客户端发消息到一个指定的队列,并在消息属性中设置replyTo字段,这个字段指定一个回调队列,服务端处理后,会把响应结果发送到这个队列

2.服务端接收到请求后,处理请求并发送响应消息到replyTo指定的回调队列

3.客户端在回调队列上等待响应消息,一旦收到响应,客户端会检查消息的correlationId属性,以确保它是所期望的响应

客户端:

1 发送请求(携带replyTo,CorrelationID)

2 接收响应(校验correlationID)

服务端:

1 接受请求,进行响应

2 发送响应(按照客户端指定的replyTo,设置correlationID)

6.2 代码实现

客户端代码编写

1、声明两个队列 RPC_REQUEST_QUEUE和RPC_RESPONSE_QUEUE,声明本次请求的唯一标志correlationID

2、将RPC_RESPONSE_QUEUE和correlationID配置到要发送的消息队列中

3、使用阻塞队列来阻塞当前进程,监听回调队列中的消息,把请求放到阻塞队列中

4、阻塞队列有消息后,主线程被唤醒,打印返回内容

Constants类

public class Constants {public static final String HOST = "8.136.108.248";public static final Integer PORT = 5672;public static final String USER_NAME = "pinkboy";public static final String PASSWORD = "123456";public static final String VIRTUAL_HOST = "/";//rpc 模式public static final String RPC_REQUEST_QUEUE = "rpc.request.queue";public static final String RPC_RESPONSE_QUEUE = "rpc.response.queue";}

客户端代码

package rabbitmq.rpc;import com.rabbitmq.client.*;
import rabbitmq.constant.Constants;import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingDeque;/*** rpc 客户端* 1. 发送请求* 2. 等待响应*/
public class RpcClient {public static void main(String[] args) throws Exception { //1、建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT);connectionFactory.setUsername(Constants.USER_NAME);connectionFactory.setPassword(Constants.PASSWORD);connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = connectionFactory.newConnection();//2、开启信道Channel channel = connection.createChannel();channel.queueDeclare(Constants.RPC_REQUEST_QUEUE, true, false, false, null);channel.queueDeclare(Constants.RPC_RESPONSE_QUEUE, true, false, false, null);//3、发送请求String msg = "hello rpc...";//设置请求唯一标识//设置请求的相关属性// 生成一个唯一的关联ID,用于跟踪请求和响应String correlationID = UUID.randomUUID().toString();// 创建并配置AMQP基本属性,设置消息的关联ID和回复队列AMQP.BasicProperties prop = new AMQP.BasicProperties().builder().correlationId(correlationID).replyTo(Constants.RPC_RESPONSE_QUEUE).build();// 发布消息到指定的请求队列,携带配置的属性和消息体channel.basicPublish("", Constants.RPC_REQUEST_QUEUE, prop, msg.getBytes());//4、接收响应//使用阻塞队列,存储响应信息final ArrayBlockingQueue<String> response = new ArrayBlockingQueue<>(1);channel.basicConsume(Constants.RPC_RESPONSE_QUEUE, true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String respMsg = new String(body);System.out.println("接收到回调消息:" + respMsg);if (correlationID.equals(properties.getCorrelationId())) {//如果correlationID校验一致,则将响应信息保存在response中response.offer(respMsg);}}});/*** 阻塞等待响应*/String take = response.take();System.out.println("[RPC Client 响应结果]:" + take);}
}

编写服务器代码

package rabbitmq.rpc;import com.rabbitmq.client.*;
import rabbitmq.constant.Constants;import java.io.IOException;/*** 1、接受请求* 2、发送响应*/
public class RpcServer {public static void main(String[] args) throws Exception {//1、建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT);connectionFactory.setUsername(Constants.USER_NAME);connectionFactory.setPassword(Constants.PASSWORD);connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = connectionFactory.newConnection();//2、开启信道Channel channel = connection.createChannel();//3、接受请求channel.basicConsume(Constants.RPC_REQUEST_QUEUE, false, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String request = new String(body);System.out.println("接收到请求:" + request);String response = "针对request:" + request + ",响应成功";AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder().correlationId(properties.getCorrelationId()).build();channel.basicPublish("", Constants.RPC_RESPONSE_QUEUE, basicProperties, response.getBytes());channel.basicAck(envelope.getDeliveryTag(), false);}});}
}

运行结果:

启动客户端

观察消息队列界面

启动服务器端

客户端输出消息 

 观察消息队列界面

七、Publish Confirms(发布确认模式)

作为消息中间件,都会面临消息丢失的问题

消息丢失大概分为三种情况

1、生产者问题 因为应用程序故、障网络抖动等原因,生产者没有成功想broker发送消息

2、消息中间件自身问题,生产者成功发送给Broker 但是Broker没有把消息保存好,导致消息丢失

3、消费者问题,Broker发送到消费者,消费者在消费时,因没处理好,导致消费者失败的消息从队列中删除了

针对问题1 可以采用发确认(Publisher Cofirms)机制实现

生产者将信道设置成confirm(确认)模式,一但信道进入confirm模式,所有在该信道上面发布的消息都是会被指派一个唯一的ID(从1开始),一但消息被投递到所有匹配的队列之后,RabbitMq就会发送一个确认给生产者(包括消息的唯一ID)这就使得生产者知道消息已经正确到达目的队列了,如果消息和队列是可持久化的,那么消息确认会在将消息写入到磁盘之后发出,broker回传给生产者的确认消息中

deliveryTag包包含了消息的序号,此外broker也可以设置channel basicAck方法中的multiple参数,表示到这个序号之前的所有消息都已经得到处理

发送方确认机制最大的好处是他是异步的,生产者可以同时发布消息和等待信道返回确认消息

1、当消息最终得到确认之后,生产者可以通过回调方法来处理该确认消息

2、如果RabbitMQ因为自身内部错误导致消息丢失,就会发送一条nack(Basic.Nack)命令,生产者同样可以在回调方法中处理该nack命令

使用发送确认机制,必须要将信道设置成confirm(确认)模式

package rabbitmq.comfirms;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import rabbitmq.constant.Constants;import java.io.IOException;
import java.util.Collections;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.TimeoutException;public class PublisherConfirms {private static final Integer MESSAGE_COUNT = 200;static Connection createConnection() throws Exception {ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT);connectionFactory.setUsername(Constants.USER_NAME);connectionFactory.setPassword(Constants.PASSWORD);connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);return connectionFactory.newConnection();}public static void main(String[] args) throws Exception {// 发布确认publishingMessagesIndividually();// 批量发布确认publishingMessagesInBatchs();// 异步发布确认publishingMessagesAsynchronously();}private static void publishingMessagesAsynchronously() throws Exception {try (Connection connection = createConnection()) {//1 、创建信道Channel channel = connection.createChannel();//2、设置信道为confirm模式channel.confirmSelect();//3、声明队列channel.queueDeclare(Constants.PUBLISH_CONFIRM_QUEUE3, true, false, false, null);//4、监听confirm消息//集合中存储的是未确认的消息IDlong start = System.currentTimeMillis();SortedSet<Long> confirmSeqNo = Collections.synchronizedSortedSet(new TreeSet<>());channel.addConfirmListener(new ConfirmListener() {@Overridepublic void handleAck(long deliveryTag, boolean multiple) throws IOException {if (multiple) {confirmSeqNo.headSet(deliveryTag + 1).clear();} else {confirmSeqNo.remove(deliveryTag);}}@Overridepublic void handleNack(long deliveryTag, boolean multiple) throws IOException {if (multiple) {confirmSeqNo.headSet(deliveryTag + 1).clear();} else {confirmSeqNo.remove(deliveryTag);}//业务需要根据实际场景进行处理,比如重发}});//5.发送消息for (int i = 0; i < MESSAGE_COUNT; i++) {String msg = "hello publish confirms" + i;long seqNO = channel.getNextPublishSeqNo();channel.basicPublish("", Constants.PUBLISH_CONFIRM_QUEUE3, null, msg.getBytes());confirmSeqNo.add(seqNO);}while (!confirmSeqNo.isEmpty()) {Thread.sleep(10);}long end = System.currentTimeMillis();System.out.printf("异步确认策略,消息条数:%d,耗时:%d ms \n", MESSAGE_COUNT, (end - start));}}private static void publishingMessagesInBatchs() throws Exception {try (Connection connection = createConnection()) {//1 、创建信道Channel channel = connection.createChannel();//2、设置信道为confirm模式channel.confirmSelect();//3、声明队列channel.queueDeclare(Constants.PUBLISH_CONFIRM_QUEUE2, true, false, false, null);//4、发送消息long start = System.currentTimeMillis();int batchSize = 100;int outStandingMessageCount = 0;for (int i = 0; i < MESSAGE_COUNT; i++) {String msg = "hello publish confirms" + i;channel.basicPublish("", Constants.PUBLISH_CONFIRM_QUEUE2, null, msg.getBytes());outStandingMessageCount++;if (outStandingMessageCount == batchSize) {//6、等待确认channel.waitForConfirms(5000);outStandingMessageCount = 0;}}if (outStandingMessageCount > 0) {channel.waitForConfirms(5000);}long end = System.currentTimeMillis();System.out.printf("批量确认策略,消息条数:%d,耗时:%d ms \n", MESSAGE_COUNT, (end - start));}}private static void publishingMessagesIndividually() throws Exception {try (Connection connection = createConnection()) {//1 、创建信道Channel channel = connection.createChannel();//2、设置信道为confirm模式channel.confirmSelect();//3、声明队列channel.queueDeclare(Constants.PUBLISH_CONFIRM_QUEUE1, true, false, false, null);//4、发送消息long start = System.currentTimeMillis();for (int i = 0; i < MESSAGE_COUNT; i++) {String msg = "hello publish confirms" + i;channel.basicPublish("", Constants.PUBLISH_CONFIRM_QUEUE1, null, msg.getBytes());//5、等待确认channel.waitForConfirms(5000);}long end = System.currentTimeMillis();System.out.printf("单独确认策略,消息条数:%d,耗时:%d ms \n", MESSAGE_COUNT, (end - start));} catch (IOException e) {throw new RuntimeException(e);} catch (TimeoutException | InterruptedException e) {throw new RuntimeException(e);}}
}

publishing Messages Individually(单独确认)

观察上面代码, 会发现这种策略是每发送⼀条消息后就调用channel.waitForConfirms方法,之后等待服务端的确认, 这实际上是⼀种串行同步等待的方式. 尤其对于持久化的消息来说, 需要等待消息确
认存储在磁盘之后才会返回(调用Linux内核的fsync方法). 但是发布确认机制是⽀持异步的. 可以⼀边发送消息, ⼀边等待消息确认。

Publishing Messages in Batches(批量确认)

 

相比于单独确认策略, 批量确认极大地提升了confirm的效率, 缺点是出现Basic.Nack或者超时时, 我们 不清楚具体哪条消息出了问题. 客⼾端需要将这⼀批次的消息全部重发, 这会带来明显的重复消息数量.
当消息经常丢失时,批量确认的性能应该是不升反降的.

Handling Publisher Confirms Asynchronously(异步确认)

异步confirm方法的编程实现最为复杂. Channel 接⼝提供了⼀个方法addConfirmListener. 这个方法
可以添加ConfirmListener 回调接口.
ConfirmListener 接口中包含两个方法: handleAck(long deliveryTag, boolean multiple) 和 handleNack(long deliveryTag, boolean multiple) , 分别对应处理 RabbitMQ发送给⽣产者的ack和nack.
deliveryTag 表示 发送消息的序号. multiple 表示 是否批量确认.
我们需要为每⼀个Channel 维护⼀个已发送消息的序号集合. 当收到RabbitMQ的confirm 回调时, 从集
合中删除对应的消息. 当Channel开启confirm模式后, channel上发送消息都会附带⼀个从1开始递增的
deliveryTag序号. 我们可以使⽤SortedSet 的有序性来维护这个已发消息的集合.
1. 当收到ack时, 从序列中删除该消息的序号. 如果为批量确认消息, 表示小于等于当前序号
deliveryTag的消息都收到了, 则清除对应集合
2. 当收到nack时, 处理逻辑类似, 不过 需要结合具体的业务情况, 进型消息重发等操作.

相关文章:

4、RabbitMQ的七种工作模式介绍

目录 一、Simple(简单模式) 1.1 概念 1.2 代码实现 消费者 运行结果 二、Work Queue&#xff08;工作队列&#xff09; 2.1 概念 1.2 代码实现 生产者 消费者 运行结果 三、Publish/Subscribe&#xff08;发布/订阅模式&#xff09; 3.1 概念 3.2 代码实现 生产者…...

React Three Fiber 详解:现代 Web3D 的利器

React Three Fiber 详解:现代 Web3D 的利器 随着 Web 技术的发展,3D 场景与交互已经不再是游戏开发者的专利。越来越多的网站、产品页、交互动画,开始大量引入 3D 元素。要在 React 项目中高效使用 WebGL,React Three Fiber(简称 R3F)成为了目前最主流的选择。 今天这篇…...

同步与互斥(同步)

线程同步 条件变量 当⼀个线程互斥地访问某个变量时&#xff0c;它可能发现在其它线程改变状态之前&#xff0c;它什么也做不了。 例如⼀个线程访问队列时&#xff0c;发现队列为空&#xff0c;它只能等待&#xff0c;只到其它线程将⼀个节点添加到队列中。这种情况就需要⽤到条…...

C语言教程(二十一):C 语言预处理器详解

一、预处理器概述 C语言预处理器是一个文本替换工具&#xff0c;它会对源代码进行扫描&#xff0c;处理以 # 开头的预处理指令。这些指令可以控制预处理器的行为&#xff0c;实现宏定义、文件包含、条件编译等功能。预处理器的主要作用是为后续的编译过程准备代码。 二、常见的…...

grafana/loki 设置日志保留时间

loki:limits_config:retention_period: 189h参考官网 Configuring the retention period...

Spring Boot × K8s 监控实战-集成 Prometheus 与 Grafana

在微服务架构中&#xff0c;应用的可观测性至关重要。Kubernetes 已成为容器化部署的标准&#xff0c;但其自身的监控能力有限&#xff0c;需要与其他工具集成才能实现详细的运行数据采集与分析。 本文将通过 Spring Boot Kubernetes Prometheus Grafana 实战&#xff0c;打…...

SpringBoot+Mybatis通过自定义注解实现字段加密存储

&#x1f60a; 作者&#xff1a; 一恍过去 &#x1f496; 主页&#xff1a; https://blog.csdn.net/zhuocailing3390 &#x1f38a; 社区&#xff1a; Java技术栈交流 &#x1f389; 主题&#xff1a; SpringBootMybatis实现字段加密 ⏱️ 创作时间&#xff1a; 2025年04月…...

Vue3调度器错误解析,完美解决Unhandled error during execution of scheduler flush.

目录 Vue3调度器错误解析&#xff0c;完美解决Unhandled error during execution of scheduler flush. 一、问题现象与本质 二、七大高频错误场景与解决方案 1、Setup初始化陷阱 2、模板中的"幽灵属性" 3、异步操作的"定时炸弹" 4、组件嵌套黑洞 5…...

第35周Zookkeeper+Dubbo Zookkeeper

第35周ZooKeeperDubbo ZooKeeper 一、周介绍 本周主要内容包括ZooKeeper、Dubbo以及面试三部分。 1.1 ZooKeeper 节点介绍 ZooKeeper的数据结构核心是每个node节点。节点具有属性、特点和功能&#xff0c;其数据结构为树形结构&#xff0c;类似于多叉树&#xff0c;分隔符是…...

基于tabula对pdf中多个excel进行识别并转换成word中的优化(四)

对上一节进行优化&#xff1a; 1、识别多个excel 2、将表格中的nan替换成空字符串 一、示例中的pdf内容 二、完整代码参考&#xff1a; import tabula import numpy as np from docx import Document from docx.oxml.ns import qn from docx.oxml import OxmlElementdef get_t…...

Electron-vite中ELECTRON_RENDERER_URL环境变量如何被设置的

近期我专注于前端技术栈 Electron 与 Vue3 的学习实践&#xff0c;依照教程网站 快速开始 | electron-vite 的快速入门指引&#xff0c;搭建了一个示例项目。成功完成项目下载&#xff0c;并通过 npm run dev 命令启动项目后&#xff0c;在研读项目 main 目录下的 index.ts 文件…...

Electron Forge【实战】桌面应用 —— 将项目配置保存到本地

最终效果 定义默认配置 src/initData.ts export const DEFAULT_CONFIG: AppConfig {language: "zh",fontSize: 14,providerConfigs: {}, };src/types.ts export interface AppConfig {language: zh | enfontSize: numberproviderConfigs: Record<string, Recor…...

gem5-gpu 安装过程碰到的问题记录 关于使用 Ruby + Garnet

如何使用Garnet? 这并不像一组命令行参数那么简单。要使用gem5-gpu+garnet,您可能需要修改python配置脚本。 问题是配置文件gem5-gpu/configs/gpu_protocol/VI_hammer_fusion.py指定了链接的intBW和extBW。 看来Garnet不支持这一点。然而,似乎所有的链路都是相同的带宽,所…...

全平台开源即时通讯IM框架MobileIMSDK:7端+TCP/UDP/WebSocket协议,鸿蒙NEXT端已发布,5.7K Stars

一、基本介绍 MobileIMSDK是一套全平台原创开源IM通信层框架&#xff1a; 超轻量级、高度提炼&#xff0c;lib包50KB以内&#xff1b;精心封装&#xff0c;一套API同时支持UDP、TCP、WebSocket三种协议&#xff08;可能是全网唯一开源的&#xff09;&#xff1b;客户端支持iOS…...

《阿里Qwen3开源:AI新纪元的破晓之光》

《阿里Qwen3开源:AI新纪元的破晓之光》 惊爆!阿里释放 Qwen3 “大杀器” 在人工智能的星辰大海中,每一次新模型的诞生都如同点亮一颗新星,而阿里巴巴此次发布并开源 Qwen3,无疑是投下了一枚震撼弹,瞬间吸引了全球 AI 领域的目光。这不仅是阿里在 AI 征程上的一座重要里程…...

前端防护利器:disable-devtool 使用指南 - 保护你的Web应用安全

文章目录 前端防护利器:disable-devtool 使用指南 - 保护你的Web应用安全为什么需要禁用开发者工具?什么是 disable-devtool?安装与引入通过npm/yarn安装通过CDN引入ES6模块引入配置选项详解完整使用示例检测模式说明最佳实践在线考试系统防护敏感数据保护注意事项更多资源前…...

万物皆可执行:多功能机器人正在定义新生产力法则

引言 当波士顿动力的Atlas完成高难度体操动作&#xff0c;当特斯拉Optimus在工厂精准分拣零件&#xff0c;当小鹏Iron机器人以拟态双手递上咖啡——这些场景不再只是科幻电影的桥段&#xff0c;而是多功能机器人&#xff08;Polyfunctional Robots&#xff09;带来的真实变革…...

从车道检测项目入门open cv

从车道检测项目入门open cv 前提声明&#xff1a;非常感谢b站up主 嘉然今天吃带变&#xff0c;感谢其视频的帮助。同时希望各位大佬积积极提出宝贵的意见。&#x1f60a;&#x1f60a;&#x1f60a;(❁◡❁)(●’◡’●)╰(▽)╯ github地址&#xff1a;https://github.com/liz…...

Vue3取消网络请求的方法(AbortController)

在 Vue3 中&#xff0c;已经发出的请求是否可以被取消&#xff0c;取决于你使用的 HTTP 客户端库。Vue3 本身不直接处理 HTTP 请求&#xff0c;但通常搭配 Axios 或原生 fetch 使用。以下是两种主流方案的取消方法&#xff1a; 1. 使用 Axios CancelToken Axios 提供了 Cance…...

深度解析Qwen3:性能实测对标Gemini 2.5 Pro?开源大模型新标杆的部署挑战与机遇

大语言模型&#xff08;LLM&#xff09;的浪潮持续席卷技术圈&#xff0c;性能天花板不断被刷新。以 Gemini 2.5 Pro 为代表的闭源模型展现了惊人的能力&#xff0c;但其高昂的成本和有限的可访问性也让许多开发者望而却步。与此同时&#xff0c;开源力量正以前所未有的速度崛起…...

AI遇见端动态神经网络:Cephalon(联邦学习+多模态编码)认知框架构建

前引&#xff1a; 在数字化浪潮席卷全球的今天&#xff0c;数据爆炸与算力需求的指数级增长正推动着云计算向更智能、更高效的方向演进。面对海量终端设备的实时响应需求、复杂AI模型的分布式训练挑战&#xff0c;以及多场景数据的协同处理难题&#xff0c;传统云架构逐渐显露出…...

机器学习之五:基于解释的学习

正如人们有各种各样的学习方法一样&#xff0c;机器学习也有多种学习方法。若按学习时所用的方法进行分类&#xff0c;则机器学习可分为机械式学习、指导式学习、示例学习、类比学习、解释学习等。这是温斯顿在1977年提出的一种分类方法。 有关机器学习的基本概念&#xff0c;…...

高翔视觉slam中常见的OpenCV和Eigen的几种数据类型的内存布局及分配方式详解

vector<Eigen::Vector2d, Eigen::aligned_allocator<Eigen::Vector2d>> 内存布局及分配方式详解 1. 内存对齐的必要性 Eigen 的固定大小类型(如 Eigen::Vector2d、Eigen::Matrix4d 等)需要 16 字节内存对齐,以支持 SIMD 指令(如 SSE/AVX)的并行计算。若未对…...

电子电器架构 --- 人工智能、固态电池和先进自动驾驶功能等新兴技术的影响

我是穿拖鞋的汉子,魔都中坚持长期主义的汽车电子工程师。 老规矩,分享一段喜欢的文字,避免自己成为高知识低文化的工程师: 钝感力的“钝”,不是木讷、迟钝,而是直面困境的韧劲和耐力,是面对外界噪音的通透淡然。 生活中有两种人,一种人格外在意别人的眼光;另一种人无论…...

【C++11】类的新功能

前言 上文我们学习了包装器&#xff1a;function和bind。function可以包装一切可调用对象&#xff0c;并用统一的调用方式调用不同的可调用对象。bind则可以控制函数参数个数【C11】包装器&#xff1a;function与bind-CSDN博客 本文我们来学习C11的类中新增的一些功能 默认的移…...

1.6 点云数据获取方式——单目相机多视图几何

图1-6-1多视图几何重建 单目相机的多视图几何研究具有重要的理论与实际意义。在理...

马井堂-区块链技术:架构创新、产业变革与治理挑战(马井堂)

区块链技术&#xff1a;架构创新、产业变革与治理挑战 摘要 区块链技术作为分布式账本技术的革命性突破&#xff0c;正在重构数字时代的信任机制。本文系统梳理区块链技术的核心技术架构&#xff0c;分析其在金融、供应链、政务等领域的实践应用&#xff0c;探讨共识算法优化、…...

MicroBlaze软核的开发使用

一、MicroBlaze 介绍 MicroBlaze 是由 Xilinx 开发的一种可配置的 32 位 RISC 软处理器内核。它作为 FPGA 设计中的 IP 核&#xff0c;通过 Vivado 工具进行配置和集成。MicroBlaze 提供了高度的灵活性&#xff0c;允许开发人员根据应用需求调整处理器的功能、性能和资源占用。…...

是从原始数据到价值挖掘的完整流程解析,涵盖数据采集、清洗、存储、处理、建模、可视化等核心环节,并附上完整代码示例(含详细注释)及技术选型建议表

以下是从原始数据到价值挖掘的完整流程解析&#xff0c;涵盖数据采集、清洗、存储、处理、建模、可视化等核心环节&#xff0c;并附上完整代码示例&#xff08;含详细注释&#xff09;及技术选型建议表。 一、全流程技术栈概览 阶段核心任务关键技术/工具数据采集获取原始数据…...

【爬虫】案例-获取cbh电影

以cupfox.in为例子&#xff1a; 观察ts文件和m3u8文件&#xff0c;可以知道一个完整的视频是由多个ts文件组合&#xff0c;而m3u8则是记录所有ts文件信息的文本 思路 1.先爬一个ts&#xff0c;测试能否观看 2.爬m3u8文件&#xff0c;通过正则分析出变化的部分 3.完整的把每个…...

分治而不割裂—分治协同式敏捷工作模式

分治而不割裂&#xff1a;解密敏捷协同工作模式如何驱动大企业持续领跑 在数字化浪潮中&#xff0c;亚马逊仅用11天完成Prime Day全球技术架构升级&#xff0c;华为5G基站项目组创造过单周迭代47个功能模块的纪录&#xff0c;这些商业奇迹的背后&#xff0c;都隐藏着一个共性秘…...

【MySQL】聚合查询 和 分组查询

个人主页&#xff1a;♡喜欢做梦 欢迎 &#x1f44d;点赞 ➕关注 ❤️收藏 &#x1f4ac;评论 目录 &#x1f334; 一、聚合查询 &#x1f332;1.概念 &#x1f332;2.聚合查询函数 COUNT&#xff08;&#xff09; SUM&#xff08;&#xff09; AVG&#xff08;&…...

Weka通过10天的内存指标数据计算内存指标动态阈值

​ 在数据处理和监控系统中&#xff0c;动态阈值的计算是一种常见的方法&#xff0c;用以根据数据的实际分布和变化来调整阈值&#xff0c;从而更有效地监控和预警。在Weka中&#xff0c;虽然它主要是用于机器学习和数据挖掘的工具&#xff0c;但你可以通过一些间接的方法来实现…...

iOS签名的包支持推送功能吗?

推送失败的可能原因&#xff1a; 1. 生产包没有上报token &#xff0c;所以无法推送成功&#xff0c;需要检查是否在企业包签名后导致无法完成apns的注册&#xff0c;无法从Apple取到token 2. 问题可能出在证书上&#xff0c;因为iOS推送有一个开发证书和一个生产证书&#xff…...

JavaWeb:后端web基础(TomcatServletHTTP)

一、今日内容 二、Tomcat 介绍与使用 介绍 基本使用 小结 配置 配置 查找进程 三、Servlet 什么是Servlet 快速入门 需求 步骤 1.新建工程-模块&#xff08;Maven&#xff09; 2.修改打包方式-war 3.编写代码 /*** 可以选择继承HttpServlet*/ WebServlet("/hello&q…...

关于浏览器对于HTML实体编码,urlencode,Unicode解析

目录 HTML实体编码 URL编码 Unicode编码 解析层次逻辑 为什么<script></script>不可以编码符号 为什么不能编码JavaScript:协议 为什么RCDATA标签中的都会被解析成文本 为什么HTML编码了<>无法执行 HTML实体编码 通过特殊语法&#xff08;<、>…...

C++智能指针滥用带来的性能与内存问题有哪些

在现代C编程中&#xff0c;智能指针&#xff08;Smart Pointers&#xff09;已经成为开发者工具箱中不可或缺的一部分。它们作为一种对传统裸指针&#xff08;Raw Pointers&#xff09;的替代方案&#xff0c;旨在解决长期困扰C开发者的内存管理难题。C作为一门高性能的系统编程…...

C++算法(17):reverse函数用法详解,头文件<algorithm>与实战示例

在C中&#xff0c;std::reverse 函数用于反转容器或数组中元素的顺序&#xff0c;需包含头文件 <algorithm>。以下是其用法详解&#xff1a; 基本用法 函数原型&#xff1a; template <class BidirIt> void reverse(BidirIt first, BidirIt last); 参数&#xf…...

【滑动窗口】最大连续1的个数|将x减到0的最小操作数

文章目录 1.最大连续1的个数2.将x减到0的最小操作数 1.最大连续1的个数 解法&#xff1a; 1.暴力解法给定一个left指针固定左端点元素&#xff0c;再给定一个right指针从左端点元素开始遍历。 当遇到1时&#xff0c;让一个计数器cnt1&#xff0c;当遇到0时&#xff0c;让统计0…...

MySQL 在 CentOS 7 环境下的安装教程

&#x1f31f; 各位看官好&#xff0c;我是maomi_9526&#xff01; &#x1f30d; 种一棵树最好是十年前&#xff0c;其次是现在&#xff01; &#x1f680; 今天来学习C语言的相关知识。 &#x1f44d; 如果觉得这篇文章有帮助&#xff0c;欢迎您一键三连&#xff0c;分享给更…...

嵌入式复习第一章

1. 嵌入式系统概念、应用与特点 2. 嵌入式系统的硬件&#xff08; CPU 、外设&#xff09; 3. 主要嵌入式软件系统&#xff08;应用及 OS &#xff09; 4. 嵌入式系统的发展趋势 嵌入式系统定义 “以 应用为中心 &#xff0c;以计算机技术为基础&#xff0c;并且软硬件…...

【C#】.net core6.0无法访问到控制器方法,直接404。由于自己的不仔细,出现个低级错误,这让DeepSeek看出来了,是什么错误呢,来瞧瞧

&#x1f339;欢迎来到《小5讲堂》&#x1f339; &#x1f339;这是《C#》系列文章&#xff0c;每篇文章将以博主理解的角度展开讲解。&#x1f339; &#x1f339;温馨提示&#xff1a;博主能力有限&#xff0c;理解水平有限&#xff0c;若有不对之处望指正&#xff01;&#…...

Tailwind CSS 实战:基于 Kooboo 构建企业官网页面(三)

基于前两篇内容&#xff0c;继续完善企业官网页面&#xff1a; Tailwind CSS 实战&#xff1a;基于 Kooboo 构建企业官网页面&#xff08;一&#xff09;-CSDN博客 Tailwind CSS 实战&#xff1a;基于 Kooboo 构建企业官网页面&#xff08;二&#xff09;-CSDN博客 3.5 联系方…...

Opencv中图像深度(Depth)和通道数(Channels)区别

在OpenCV中&#xff0c;图像深度&#xff08;Depth&#xff09;和通道数&#xff08;Channels&#xff09;是两个完全不同的概念&#xff0c;需严格区分。以下是详细解析&#xff1a; 图像深度&#xff08;Depth&#xff09; 定义&#xff1a;指图像中每个像素通道的位数&#…...

【网络原理】从零开始深入理解HTTP的报文格式(一)

本篇博客给大家带来的是网络HTTP协议的知识点, 重点介绍HTTP的报文格式. &#x1f40e;文章专栏: JavaEE初阶 &#x1f680;若有问题 评论区见 ❤ 欢迎大家点赞 评论 收藏 分享 如果你不知道分享给谁,那就分享给薯条. 你们的支持是我不断创作的动力 . 王子,公主请阅&#x1f68…...

Go语言之路————接口、泛型

Go语言之路————接口 前言接口定义实操&#xff0c;接口的定义和实现接口的继承空接口和Any 泛型类型集 结语 前言 我是一名多年Java开发人员&#xff0c;因为工作需要现在要学习go语言&#xff0c;Go语言之路是一个系列&#xff0c;记录着我从0开始接触Go&#xff0c;到后…...

Go语言中的 `time.Tick` 函数详解

time.Tick 是 Go 标准库中用于创建周期性定时器的简便函数。 函数签名 func Tick(d Duration) <-chan Time核心功能 创建一个周期性的定时器通道当 d < 0 时返回 nil返回一个只读的时间通道&#xff0c;定期发送当前时间 与 NewTicker 的关系 time.Tick 是 time.New…...

打印及判断回文数组、打印N阶数组、蛇形矩阵

打印回文数组 1 1 1 1 1 1 2 2 2 1 1 2 3 2 1 1 2 2 2 1 1 1 1 1 1方法1&#xff1a; 对角线对称 左上和右下是对称的。 所以先考虑左上打印&#xff0c; m i n ( i 1 , j 1 ) \text min(i1,j1) min(i1,j1)&#xff0c;打印出来&#xff1a; 1 1 1 1 1 2 2 2 1 2 3 3 1 2 …...

【图像融合】基于非负矩阵分解分解 CNMF的高光谱和多光谱数据融合附MATLAB代码

基于CNMF的高光谱与多光谱数据融合技术详解 一、非负矩阵分解&#xff08;NMF&#xff09;与约束非负矩阵分解&#xff08;CNMF&#xff09;的核心原理 NMF的基本概念 非负矩阵分解&#xff08;NMF&#xff09;是一种通过将非负矩阵分解为两个非负矩阵乘积的降维方法。给定非负…...

HarmonyOS NEXT 诗词元服务项目开发上架全流程实战(一、项目介绍及实现效果)

在当今数字化时代&#xff0c;如何让传统文化与现代科技相结合&#xff0c;成为了一个值得思考的问题。诗词作为中国传统文化的重要组成部分&#xff0c;承载着丰富的历史信息和文化内涵。为了让更多人了解和欣赏诗词的魅力&#xff0c;我们决定开发一款基于HarmonyOS NEXT的诗…...