消息队列篇--基础篇(消息队列特点,应用场景、点对点和发布订阅工作模式,RabbmitMQ和Kafka代码示例等)
1、消息队列的介绍
消息(Message)是指在应用之间传送的数据,消息可以非常简单,比如只包含文本字符串,也可以更复杂,可能包含嵌入对象。
消息队列(Message Queue,简称MQ)是一种异步通信机制,允许应用程序之间的数据交换通过“消息”进行。生产者将消息发送到消息队列中,无需关心谁来获取消息。消费者从消息队列中读取消息并处理,也无需关心消息的来源。消息队列的核心思想是解耦生产者和消费者,使得它们可以独立运行,而不需要直接交互。
消息队列的特点:
- 解耦:生产者和消费者不需要同时在线,生产者发送消息后可以立即返回,消费者可以在任何时间消费消息。
- 异步通信:生产者和消费者之间是异步的,生产者不需要等待消费者的响应,提高了系统的响应速度。
- 限流削峰:广泛应用于秒杀或抢购活动中,避免流量过大导致应用系统挂掉的情况。
- 负载均衡:多个消费者可以同时从消息队列中读取消息,分担处理任务,实现负载均衡。
- 高可用性:消息队列通常支持持久化存储,即使系统发生故障,消息也不会丢失。
- 扩展性:消息队列可以通过增加更多的生产者或消费者来扩展系统的处理能力。
2、消息队列的应用场景
(1)、日志收集与分析
- 应用场景:在分布式系统中,各个服务会产生大量的日志数据。使用消息队列可以将日志数据异步传输到集中式日志分析平台(如ELK Stack、Splunk),避免日志写入对业务系统的影响。
- 优点:降低日志写入的延迟,提高系统的吞吐量。
(2)、异步任务处理
- 应用场景:某些任务(如发送邮件、短信、生成报表等)不需要立即执行,可以将其放入消息队列,由后台进程异步处理。
- 优点:减少主业务流程的阻塞,提升用户体验。
同步处理示例图:
使用消息队列异步处理示例图:
(3)、系统解耦
- 应用场景:在微服务架构中,不同服务之间可以通过消息队列进行通信,而不是直接调用对方的接口。这样可以降低服务之间的耦合度,提高系统的可维护性和扩展性。
- 优点:服务之间的依赖关系减弱,便于独立开发和部署。
示例图:
(4)、流量削峰
- 应用场景:在高并发场景下,前端请求可能会超出后端系统的处理能力。使用消息队列可以将请求暂存,逐步处理,避免系统过载。
- 优点:平滑流量,防止系统崩溃。
示例图:
(5)、事件驱动架构
- 应用场景:在事件驱动架构中,系统中的各个组件通过事件(消息)进行通信。例如,用户注册后触发一系列事件(如发送欢迎邮件、创建用户档案、记录日志等)。
- 优点:提高系统的灵活性和响应速度。
示例图:
解释:
第一个生产者推送消息到Topic1中。
消费者1消费Topic1中的消息,同时在作为生产者2将结果消息推送到Topic2中。
第二个消费者在消费Topic2中的消息。
简单说:
事件驱动就是同一个消息通过不同主题的发送和消费,实现各个组件在不同阶段对该任务做出正确的处理。
3、消息队列的两种模式
消息队列的主要功能是通过“消息”来解耦生产者和消费者,使得它们可以异步通信。根据消息传递的方式不同,消息队列通常支持两种主要的模式:点对点(Point-to-Point,P2P)模式和发布/订阅(Publish/Subscribe,Pub/Sub)模式。这两种模式在消息传递的行为、适用场景和系统设计上有着显著的区别。
(1)、点对点(Point-to-Point,P2P)模式
点对点模式是一种消息传递方式,其中每个消息只能被一个消费者消费,并且消息一旦被消费,就会从队列中移除。生产者将消息发送到队列中,消费者从队列中取出消息并处理。每个队列可以有多个消费者,但每个消息只能被其中一个消费者消费。
1、P2P工作原理
- 生产者:生产者将消息发送到一个特定的队列中。
- 队列:队列是一个存储消息的容器,消息按顺序排列,遵循先进先出(FIFO)的原则。
- 消费者:消费者从队列中取出消息并处理。每个消息只能被一个消费者消费,消费后该消息会从队列中移除。
2、P2P特点
- 单次消费:每个消息只能被一个消费者消费,确保消息不会被重复处理。
- 负载均衡:如果队列中有多个消费者,消息会自动分发给不同的消费者,实现负载均衡。每个消费者处理的消息数量取决于其处理速度。
- 消息持久化:消息在被消费之前会一直保存在队列中,即使消费者暂时不可用,消息也不会丢失。
- 可靠性:通常支持消息确认机制,消费者处理完消息后需要向队列发送确认,确保消息不会因为消费者故障而丢失。
3、P2P适用场景
- 任务分配:适用于需要将任务分配给多个工作者的场景。例如,订单处理系统中,多个订单处理服务可以从同一个队列中获取订单进行处理。
- 异步任务处理:适用于需要异步执行的任务,如发送邮件、生成报表等。生产者将任务放入队列,消费者在后台逐步处理。
- 负载均衡:适用于需要将请求分发给多个处理节点的场景,避免某个节点过载。
4、P2P消息队列
- RabbitMQ的队列模式:RabbitMQ支持P2P模式,生产者将消息发送到队列中,消费者从队列中取出消息并处理。每个消息只能被一个消费者消费。
- ActiveMQ的点对点队列:ActiveMQ也支持P2P模式,生产者将消息发送到队列中,消费者从队列中取出消息并处理。
5、RabbitMQ代码示例(P2P)
(1)、注入依赖
<!-- RabbitMQ 依赖 -->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
(2)、生产者示例
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class P2PProducer {private static final String QUEUE_NAME = "p2p-queue"; // 队列名称public static void main(String[] args) throws Exception {// 1、创建连接工厂ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");// 2、建立连接try (Connection connection = factory.newConnection();// 3、声明队列(如果队列不存在则创建)Channel channel = connection.createChannel()) {channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 4、发送多条消息到队列for (int i = 0; i < 10; i++) {String message = "Message " + i;channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));System.out.println(" [x] Sent '" + message + "'");}}}
}
解释:
- ConnectionFactory用于创建连接,并通过Channel发送消息到指定的队列p2p-queue。
- channel.queueDeclare()用于声明队列,确保队列存在。如果队列已经存在,则不会重复创建。
- channel.basicPublish()将消息发送到队列中,""表示不使用交换机(Exchange),直接将消息发送到队列。
(3)、消费者示例
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;public class P2PConsumer {private static final String QUEUE_NAME = "p2p-queue"; // 队列名称public static void main(String[] args) throws Exception {// 1、创建连接工厂ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");// 2、建立连接Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 3、声明队列(确保队列存在)channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 4、声明消息处理回调方法。这里仅是声明回调方法,不会直接运行,需要basicConsume方法消费完成后触发回调方法,才会被执行DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" + message + "'");// message为消息,这里写消费消息的具体逻辑try {Thread.sleep(1000);} catch (InterruptedException e) {Thread.currentThread().interrupt();}System.out.println(" [x] Done");// 手动确认消息已处理channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);};// 5、开始消费消息,basicConsume为处理消费入口,本例方法体内不做任何处理,直接 触发调用deliverCallback 回调方法System.out.println(" [] Waiting for messages. To exit press CTRL+C");channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {});}
}
解释:
- 消费者同样使用ConnectionFactory创建连接,并通过Channel订阅队列p2p-queue。
- channel.queueDeclare()确保队列存在。
- channel.basicConsume()开始消费消息,false参数表示手动确认消息已处理。
- DeliverCallback是一个回调函数,当消息被basicConsume()消费后会触发该回调。消费者处理完消息后,调用channel.basicAck()确认消息已处理,防止消息丢失。
- 如果消费者暂时不可用,消息会继续保存在队列中,直到消费者重新连接并处理。
说明:
在RabbitMQ中,队列是实现P2P模式的关键。每个消息只会被一个消费者消费,即使有多个消费者订阅了同一个队列,RabbitMQ会自动将消息分发给不同的消费者,实现负载均衡。
如果你启动多个P2PConsumer实例,它们会自动分担消息的处理任务,确保每个消息只被一个消费者处理。
6、kafka代码示例
(1)、注入依赖
<!-- Kafka 依赖 -->
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId>
</dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId>
</dependency>
(2)、生产者示例
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;public class P2PProducer {public static void main(String[] args) {// 1、配置生产者Properties props = new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// 2、创建生产者try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {// 3、拼装消息(指定主题,消息的key以及消息的value)for (int i = 0; i < 10; i++) {String key = "key-" + i;String value = "message-" + i;ProducerRecord<String, String> record = new ProducerRecord<>("p2p-topic", key, value);// 4、异步发送消息,并添加回调函数producer.send(record, new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if (exception == null) {System.out.println("Message sent to partition: " + metadata.partition() +", offset: " + metadata.offset());} else {System.err.println("Error sending message: " + exception.getMessage());}}});}// 5、确保所有消息都已发送完毕producer.flush();}}
}
解释:
- 使用KafkaProducer发送消息到p2p-topic主题。
- 每条消息都有一个键(key)和值(value),并使用StringSerializer进行序列化。
- 通过producer.send()方法异步发送消息,并提供回调函数来处理发送结果。
- producer.flush()确保所有消息都已发送完毕。
(3)、消费者示例
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;public class P2PConsumer {public static void main(String[] args) {// 1、配置消费者Properties props = new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");props.put(ConsumerConfig.GROUP_ID_CONFIG, "p2p-group"); // 消费者组IDprops.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // 从最早的消息开始消费// 2、创建消费者try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {// 3、订阅主题consumer.subscribe(Collections.singletonList("p2p-topic"));// 4、持续接收消费消息while (true) {// 拉取消息,等待最多1秒ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));// 5、处理拉取到的消息for (ConsumerRecord<String, String> record : records) {System.out.printf("Received message: key = %s, value = %s, partition = %d, offset = %d%n",record.key(), record.value(), record.partition(), record.offset());}// 6、提交偏移量consumer.commitSync();}}}
}
解释:
- 使用KafkaConsumer订阅p2p-topic主题。
- 消费者组ID为p2p-group,确保同一组内的多个消费者可以自动进行负载均衡。
- consumer.poll()方法用于拉取消息,每次最多等待1秒。
- 消费者处理完消息后,调用consumer.commitSync()提交偏移量,确保消息不会被重复消费。
说明:
在Kafka中,消费者组是实现P2P模式的关键。当多个消费者属于同一个消费者组时,Kafka会自动将消息分配给不同的消费者,确保每个消息只会被一个消费者消费。
如果你启动多个P2PConsumer实例,它们会自动分担消息的处理任务,实现负载均衡。
(2)、发布/订阅(Publish/Subscribe,Pub/Sub)模式
发布/订阅模式是一种消息传递方式,其中每个消息可以被多个消费者消费。生产者将消息发布到一个主题(Topic),所有订阅了该主题的消费者都可以接收到该消息。每个消费者都可以独立地消费消息,互不干扰。
1、工作原理
- 生产者:生产者将消息发布到一个主题(Topic)中,而不是直接发送到队列。
- 主题:主题是一个逻辑上的消息通道,生产者将消息发布到主题中,消费者订阅该主题以接收消息。
- 消费者:消费者订阅主题后,每当有新消息发布到该主题时,所有订阅了该主题的消费者都会收到一份副本。每个消费者可以独立地处理消息,互不干扰。
2、特点
- 多消费者:每个消息可以被多个消费者消费,适合广播消息或通知类场景。
- 松耦合:生产者和消费者之间是完全解耦的,生产者不需要知道有多少消费者订阅了主题,消费者也不需要知道是谁发布了消息。
- 消息复制:每个订阅了主题的消费者都会收到一份消息副本,因此消息会被复制给所有消费者。
- 消息保留:通常支持消息保留策略,即使消费者暂时离线,消息也会保留一段时间,直到消费者重新连接并消费。
- 灵活性:消费者可以根据需要动态订阅或取消订阅主题,灵活应对不同的业务需求。
3、适用场景
- 事件通知:适用于需要将事件广播给多个接收者的场景。例如,用户注册后触发一系列事件(如发送欢迎邮件、创建用户档案、记录日志等),每个事件处理服务都可以订阅相应的主题。
- 日志收集:适用于需要将日志数据广播给多个分析系统的场景。例如,多个日志分析平台可以订阅同一个日志主题,各自处理不同的日志数据。
- 实时推送:适用于需要实时推送消息的场景,如股票行情更新、社交媒体通知等。每个用户可以订阅感兴趣的主题,实时接收相关消息。
4、适用的消息队列
- Kafka的主题模式:Kafka是典型的Pub/Sub模型,生产者将消息发布到主题中,消费者组中的每个消费者都可以订阅该主题并消费消息。Kafka还支持分区,允许多个消费者并行处理同一主题的不同分区。
- Pulsar的主题模式:Pulsar也支持Pub/Sub模式,生产者将消息发布到主题中,消费者可以订阅该主题并消费消息。Pulsar还支持层级存储,允许将冷数据存储到低成本存储介质中。
- RabbitMQ的交换机(Exchange)模式:RabbitMQ支持多种交换机类型,包括Fanout、Direct、Topic等,可以实现Pub/Sub模式。生产者将消息发送到交换机,交换机会根据路由规则将消息转发给多个队列,消费者可以从这些队列中消费消息。
5、RabbitMQ代码示例
(1)、注入依赖
<!-- RabbitMQ 依赖 -->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
(2)、生产者示例
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class PubSubProducer {private static final String EXCHANGE_NAME = "pubsub-exchange";public static void main(String[] args) throws Exception {// 1、创建连接工厂ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");// 2、建立连接try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {// 3、声明交换机(类型为 Fanout,即广播)channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);// 4、发送多条消息到交换机for (int i = 0; i < 10; i++) {String message = "Message " + i;channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));System.out.println(" [x] Sent '" + message + "'");}}}
}
解释:
- 使用ConnectionFactory创建连接,并通过Channel将消息发布到交换机pubsub-exchange。
- channel.exchangeDeclare()用于声明交换机,类型为Fanout。Fanout交换机会将消息广播给所有绑定到该交换机的队列。
- channel.basicPublish()将消息发布到交换机,""表示不指定路由键(Routing Key),因为Fanout交换机会将消息广播给所有绑定的队列。
(3)、消费者示例
import com.rabbitmq.client.*;public class PubSubConsumer {private static final String EXCHANGE_NAME = "pubsub-exchange";public static void main(String[] args) throws Exception {// 1、创建连接工厂ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");// 2、建立连接Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 3、声明交换机(类型为 Fanout)channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);// 4、声明临时队列(随机生成队列名称)String queueName = channel.queueDeclare().getQueue();System.out.println(" [] Queue name: " + queueName);// 5、将队列绑定到交换机channel.queueBind(queueName, EXCHANGE_NAME, "");// 6、声明消息处理回调DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" + message + "'");// 真实消费数据逻辑,message为报文数据try {Thread.sleep(1000);} catch (InterruptedException e) {Thread.currentThread().interrupt();}System.out.println(" [x] Done");// 自动确认消息已处理channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);};// 7、开始消费消息,处理完成会触发回调deliverCallback 方法System.out.println(" [] Waiting for messages. To exit press CTRL+C");channel.basicConsume(queueName, false, deliverCallback, consumerTag -> {});}
}
解释:
- 消费者使用ConnectionFactory创建连接,并通过Channel订阅交换机 pubsub-exchange。
- channel.exchangeDeclare()确保交换机存在。
- channel.queueDeclare()声明一个临时队列,RabbitMQ会为每个消费者生成一个唯一的队列名称。
- channel.queueBind()将临时队列绑定到交换机,确保该队列可以接收来自交换机的消息。
- channel.basicConsume()开始消费消息,false参数表示手动确认消息已处理。
- DeliverCallback是一个回调函数,当有新消息到达时会触发该回调。消费者处理完消息后,调用channel.basicAck()确认消息已处理。
说明:
- 在RabbitMQ中,交换机(Exchange)是实现Pub/Sub模式的关键。生产者将消息发布到交换机,所有绑定到该交换机的队列都会接收到消息副本。
- 每个消费者可以订阅不同的队列,或者多个消费者可以共享同一个队列。通过Fanout交换机,所有绑定到该交换机的队列都会收到相同的消息副本,实现广播效果。
- 如果你启动多个PubSubConsumer实例,每个实例都会创建一个独立的队列,并绑定到交换机。因此,每个消费者都会收到相同的消息副本,互不干扰。
6、kafka代码示例
(1)、注入依赖
<!-- Kafka 依赖 -->
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId>
</dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId>
</dependency>
(2)、生产者示例
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.ExecutionException;public class PubSubProducer {public static void main(String[] args) {// 1、配置生产者Properties props = new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// 2、创建生产者try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {// 3、组装发送消息到主题for (int i = 0; i < 10; i++) {String key = "key-" + i;String value = "message-" + i;ProducerRecord<String, String> record = new ProducerRecord<>("pubsub-topic", key, value);// 4、异步发送消息,并添加回调函数producer.send(record, new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if (exception == null) {System.out.println("Message sent to partition: " + metadata.partition() +", offset: " + metadata.offset());} else {System.err.println("Error sending message: " + exception.getMessage());}}});}// 5、确保所有消息都已发送完毕producer.flush();}}
}
解释:
- 与P2P模式的生产者代码几乎相同,唯一的区别是消息发送到的主题名称不同(pubsub-topic)。
- 生产者将消息发布到pubsub-topic,所有订阅了该主题的消费者都可以接收到这些消息。
(3)、消费者示例
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;public class PubSubConsumer {public static void main(String[] args) {// 1、配置消费者Properties props = new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");props.put(ConsumerConfig.GROUP_ID_CONFIG, "pubsub-group-" + Thread.currentThread().getId()); // 每个消费者使用不同的组 IDprops.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // 从最早的消息开始消费// 2、创建消费者try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {// 3、订阅主题consumer.subscribe(Collections.singletonList("pubsub-topic"));// 4、持续消费消息while (true) {// 拉取消息,等待最多 1 秒ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));// 5、处理拉取到的消息for (ConsumerRecord<String, String> record : records) {System.out.printf("Consumer %d received message: key = %s, value = %s, partition = %d, offset = %d%n",Thread.currentThread().getId(), record.key(), record.value(), record.partition(), record.offset());}// 6、提交偏移量consumer.commitSync();}}}
}
解释:
- 每个消费者使用不同的消费者组ID(pubsub-group-<线程ID>),确保每个消费者都能独立地消费消息。
- consumer.subscribe()订阅pubsub-topic,所有订阅了该主题的消费者都会收到相同的消息副本。
- 消费者处理完消息后,调用consumer.commitSync()提交偏移量,确保消息不会被重复消费。
说明:
- 在Kafka中,每个消费者组是独立的,即使多个消费者订阅了同一个主题,它们也不会共享消息。每个消费者组中的消费者只会消费属于该组的消息。
- 通过为每个消费者分配不同的消费者组ID,我们可以实现Pub/Sub模式,即每个消息会被广播给所有订阅了该主题的消费者。
- 如果你启动多个PubSubConsumer实例,每个实例都会使用不同的消费者组ID,因此它们会各自独立地消费消息。每个消费者都会收到相同的消息副本,实现广播效果。
(3)、两种模式对比总结
-
点对点(P2P)模式:适用于需要确保每个消息只被处理一次的场景,例如任务分配、异步任务处理和负载均衡。它具有较高的可靠性和负载均衡能力,适合处理任务队列和批处理任务。
-
发布/订阅(Pub/Sub)模式:适用于需要将消息广播给多个接收者的场景,例如事件通知、日志收集和实时推送。它具有更高的灵活性和解耦性,适合构建事件驱动架构和实时数据流处理系统。
选择哪种模式取决于你的具体需求:
- 如果你需要确保每个消息只被处理一次,并且希望实现负载均衡,P2P模式是更好的选择。
- 如果你需要将消息广播给多个接收者,并且希望生产者和消费者之间完全解耦,Pub/Sub模式更加合适。
乘风破浪会有时,直挂云帆济沧海!!!
相关文章:
消息队列篇--基础篇(消息队列特点,应用场景、点对点和发布订阅工作模式,RabbmitMQ和Kafka代码示例等)
1、消息队列的介绍 消息(Message)是指在应用之间传送的数据,消息可以非常简单,比如只包含文本字符串,也可以更复杂,可能包含嵌入对象。 消息队列(Message Queue,简称MQ)…...
Jetpack架构组件学习——使用Glance实现桌面小组件
基本使用 1.添加依赖 添加Glance依赖: // For AppWidgets supportimplementation "androidx.glance:glance-appwidget:1.1.0"// For interop APIs with Material 3implementation "androidx.glance:glance-material3:1.1.0"// For interop APIs with Mater…...
go读取excel游戏配置
1.背景 游戏服务器,配置数据一般采用csv/excel来作为载体,这种方式,策划同学配置方便,服务器解析也方便。在jforgame框架里,我们使用以下的excel配置格式。 然后可以非常方便的进行数据检索,例如ÿ…...
Linux系统下速通stm32的clion开发环境配置
陆陆续续搞这个已经很久了。 因为自己新电脑是linux系统无法使用keil,一开始想使用vscode里的eide但感觉不太好用;后面想直接使用cudeide但又不想妥协,想趁着这个机会把linux上的其他单片机开发配置也搞明白;而且非常想搞懂cmake…...
快慢指针及原理证明(swift实现)
目录 链表快慢指针一、快慢指针基本介绍二、快慢指针之找特殊节点1.删除链表的倒数第k个结点题目描述解题思路 2.链表的中间节点题目描述解题思路 三、快慢指针之环形问题1.判断环形链表题目描述解题思路 2.判断环形链表并返回入环节点题目描述解题思路 3.变种——判断快乐数题…...
web前端3--css
注意(本文一切代码一律是在vscode中书写) 1、书写位置 1、行内样式 //<标签名 style"样式声明"> <p style"color: red;">666</p> 2、内嵌样式 1、style标签 里面写css代码 css与html之间分离 2、css属性:值…...
一文大白话讲清楚webpack基本使用——5——babel的配置和使用
文章目录 一文大白话讲清楚webpack基本使用——5——babel的配置和使用1. 建议按文章顺序从头看,一看到底,豁然开朗2. babel-loader的配置和使用2.1 针对ES6的babel-loader2.2 针对typescript的babel-loader2.3 babel配置文件 一文大白话讲清楚webpack基…...
Python自动化运维:一键掌控服务器的高效之道
《Python OpenCV从菜鸟到高手》带你进入图像处理与计算机视觉的大门! 解锁Python编程的无限可能:《奇妙的Python》带你漫游代码世界 在互联网和云计算高速发展的今天,服务器数量的指数增长使得手动运维和管理变得异常繁琐。Python凭借其强大的可读性和丰富的生态系统,成为…...
基于quartz,刷新定时器的cron表达式
文章目录 前言基于quartz,刷新定时器的cron表达式1. 先看一下测试效果2. 实现代码 前言 如果您觉得有用的话,记得给博主点个赞,评论,收藏一键三连啊,写作不易啊^ _ ^。 而且听说点赞的人每天的运气都不会太差&…...
HTML常用属性
HTML标签的常见属性包括许多不同的功能,可以为元素提供附加信息或控制元素的行为。以下是一些常见的属性及其解释: 1. src 描述:src(source)属性指定一个资源的路径,通常用于图像、音频、视频等标签。常见…...
在 Babylon.js 中使用 Gizmo:交互式 3D 操作工具
在 3D 应用程序中,交互式操作对象(如平移、旋转、缩放)是一个常见的需求。Babylon.js 提供了一个强大的工具——Gizmo,用于在 3D 场景中实现这些功能。本文将介绍如何在 Babylon.js 中使用 Gizmo,并展示如何通过代码实…...
蓝桥杯练习日常|递归-进制转换
蓝桥云课760数的计算 一、递归 题目: 我的解题代码: #include <iostream> using namespace std; int sum0; int main() {// 请在此输入您的代码int n;cin>>n;int fun(int n);fun(n); cout<<sum<<\n;return 0; } // void fu…...
LabVIEW滤波器选择与参数设置
在信号处理应用中,滤波器是去除噪声、提取目标信号的重要工具。LabVIEW 提供多种类型的滤波器(如低通、高通、带通、带阻),用户需要根据采样频率、信号特性和应用需求合理选择滤波器类型及参数设置。本文以 采样率 100kHz…...
【c语言日寄】Vs调试——新手向
【作者主页】siy2333 【专栏介绍】⌈c语言日寄⌋:这是一个专注于C语言刷题的专栏,精选题目,搭配详细题解、拓展算法。从基础语法到复杂算法,题目涉及的知识点全面覆盖,助力你系统提升。无论你是初学者,还是…...
C#中的Timers.Timer使用用法及常见报错
System.Timers.Timer 是一个基于服务器的计时器,它可以在应用程序中定期触发事件。这个计时器特别适合用于多线程环境,并且不应该与用户界面(UI)直接交互。在 ASP.NET 中,通常使用 System.Timers.Timer 来处理周期性的任务。 主要使用步骤&am…...
chrome小插件:长图片等分切割
前置条件: 安装有chrome谷歌浏览器的电脑 使用步骤: 1.打开chrome扩展插件 2.点击管理扩展程序 3.加载已解压的扩展程序 4.选择对应文件夹 5.成功后会出现一个扩展小程序 6.点击对应小程序 7.选择图片进行切割,切割完成后会自动保存 代码…...
mysql数据被误删的恢复方案
文章目录 一、使用备份恢复二、使用二进制日志(Binary Log)三、使用InnoDB表空间恢复四、使用第三方工具预防措施 数据误删是一个严重的数据库管理问题,但通过合理的备份策略和使用适当的恢复工具,可以有效地减少数据丢失的风险…...
K8S-Pod资源清单的编写,资源的增删改查,镜像的下载策略
1. Pod资源清单的编写 1.1 Pod运行单个容器的资源清单 ##创建工作目录 mkdir -p /root/manifests/pods && cd /root/manifests/pods vim 01-nginx.yaml ##指定api版本 apiVersion: v1 ##指定资源类型 kind: Pod ##指定元数据 metadata:##指定名称name: myweb ##用户…...
Unity Line Renderer Component入门
Overview Line Renderer 组件是 Unity 中用于绘制连续线段的工具。它通过在三维空间中的两个或两个以上的点的数组,并在每个点之间绘制一条直线。可以绘制从简单的直线到复杂的螺旋线等各种图形。 1. 连续性和独立线条 连续性:Line Renderer 绘制的线条…...
计算机工程:解锁未来科技之门!
计算机工程与应用是一个充满无限可能性的领域。随着科技的迅猛发展,计算机技术已经深深渗透到我们生活的方方面面,从医疗、金融到教育,无一不在彰显着计算机工程的巨大魅力和潜力。 在医疗行业,计算机技术的应用尤为突出。比如&a…...
翻译:How do I reset my FPGA?
文章目录 背景翻译:How do I reset my FPGA?1、Understanding the flip-flop reset behavior2、Reset methodology3、Use appropriate resets to maximize utilization4、Many options5、About the author 背景 在写博客《复位信号的同步与释放(同步复…...
在Unity中使用大模型进行离线语音识别
文章目录 1、Vosk下载下载vosk-untiy-asr下载模型在项目中使用语音转文字音频转文字2、whisper下载下载unity项目下载模型在unity中使用1、Vosk 下载 下载vosk-untiy-asr Github链接:https://github.com/alphacep/vosk-unity-asr 进不去Github的可以用网盘 夸克网盘链接:h…...
SpringBoot+Vue使用Echarts
前言 在vue项目中使用echarts,本次演示是使用vue2 1 前端准备 echarts官网: https://echarts.apache.org/zh/index.html 官网提供了基本的使用说明和大量的图表 1.1 下载echarts 执行命令 npm install echarts 直接这样执行很可能会失败,…...
【QT】-explicit关键字
explicit explicit 是一个 C 关键字,用于修饰构造函数。它的作用是防止构造函数进行隐式转换。 为什么需要 explicit? 在没有 explicit 的情况下,构造函数可以用于隐式类型转换。这意味着,如果你有一个接受某种类型的参数的构造…...
docker: Device or resource busy
(base) [rootbddx-vr-gpu-bcc2 /]#rm -rf /ssd1/docker/overlay2/8d96a51e3fb78e434fcf2b085e952adcc82bfe37485d427e1e017361a277326d/ rm: cannot remove ‘/ssd1/docker/overlay2/8d96a51e3fb78e434fcf2b085e952adcc82bfe37485d427e1e017361a277326d/merged’: Device or re…...
Vue - toRefs() 和 toRef() 的使用
一、toRefs() 在 Vue 3 中,toRefs()可以将响应式对象的属性转换为可响应的 refs。主要用于在解构响应式对象时,保持属性的响应性。 1. 导入 toRefs 函数 import { toRefs } from vue;2. 将响应式对象的属性转换为 ref const state reactive({count: 0,message:…...
(2024,MLLM,Healthcare,综述)多模态学习是否已在医疗保健领域实现通用智能?
Has Multimodal Learning Delivered Universal Intelligence in Healthcare? A Comprehensive Survey 目录 0. 摘要 1. 简介 5. MLLM 5.1 模态编码器与跨模态适配器 5.1.1 图像编码器 (Image Encoder) 5.1.2 语言模型 (Language Model) 5.1.3 跨模态适配器 (Cross-moda…...
css命名规范——BEM
目录 引言 BEM是什么? 块Block 元素Element 修饰语Modifier BEM解决了哪些问题? 在流行框架的组件中使用 BEM 格式 实战 认识设计图 如何使用当前的css规范正确命名? 引言 css样式类命名难、太难了,难于上青天,这个和js变量命名还不一样。看看项目中五花八门的样…...
使用PHP函数 “is_object“ 检查变量是否为对象类型
在PHP中,变量可以保存不同类型的值,包括整数、字符串、数组、布尔值等等。其中,对象是一种特殊的数据类型,用于封装数据和方法。在处理PHP代码中,我们经常需要检查一个变量是否为对象类型,以便进行相应的处…...
Golang:使用DuckDB查询Parquet文件数据
本文介绍DuckDB查询Parquet文件的典型应用场景,掌握DuckDB会让你的产品分析能力更强,相反系统运营成本相对较低。为了示例完整,我也提供了如何使用Python导出MongoDB数据。 Apache Parquet文件格式在存储和传输大型数据集方面变得非常流行。最…...
Moretl FileSync增量文件采集工具
永久免费: <下载> <使用说明> 我们希望Moretl FileSync是一款通用性很好的文件日志采集工具,解决工厂环境下,通过共享目录采集文件,SMB协议存在的安全性,兼容性的问题. 同时,我们发现工厂设备日志一般为增量,为方便MES,QMS等后端系统直接使用数据,我们推出了增量采…...
消息队列篇--原理篇--Pulsar(Namespace,BookKeeper,类似Kafka甚至更好的消息队列)
Apache Pulusar是一个分布式、多租户、高性能的发布/订阅(Pub/Sub)消息系统,最初由Yahoo开发并开源。它结合了Kafka和传统消息队列的优点,提供高吞吐量、低延迟、强一致性和可扩展的消息传递能力,适用于大规模分布式系…...
linux 扩容
tmpfs tmpfs 82M 0 82M 0% /run/user/1002 tmpfs tmpfs 82M 0 82M 0% /run/user/0 [输入命令]# fdisk -lu Disk /dev/vda: 40 GiB, 42949672960 bytes, 83886080 sectors Units: sectors of 1 * 512 512 bytes Sector size (logi…...
数据表中的数据查询
文章目录 一、概述二、简单查询1.列出表中所有字段2.“*”符号表示所有字段3.查询指定字段数据4.DISTINCT查询 三、IN查询四、BETWEEN ADN查询1.符合范围的数据记录查询2.不符合范围的数据记录查询 五、LIKE模糊查询六、对查询结果排序七、简单分组查询1.统计数量2.统计计算平均…...
深入了解 Java split() 方法:分割字符串的利器
Java 提供的 split() 方法是 String 类中一个常用的工具,它可以将一个字符串根据指定的分隔符切割成多个子字符串,并以字符串数组的形式返回。这个方法常用于字符串的处理、数据解析等场景。本文将详细介绍 Java 中 split() 方法的使用方式,并…...
Ubuntu 安装 docker 配置环境及其常用命令
Docker 安装与配置指南 本文介绍如何在 Ubuntu 系统上安装 Docker,解决权限问题,配置 Docker Compose,代理端口转发,容器内部代理问题等并进行相关的优化设置。参考官方文档:Docker 官方安装指南 一、安装 Docker 1…...
Android Studio安装配置
一、注意事项 想做安卓app和开发板通信,踩了大坑,Android 开发不是下载了就能直接开发的,对于新手需要注意的如下: 1、Android Studio版本,根据自己的Android Studio版本对应决定了你所兼容的AGP(Android…...
leetcode 面试经典 150 题:有效的括号
链接有效的括号题序号20题型字符串解法栈难度简单熟练度✅✅✅ 题目 给定一个只包括 ‘(’,‘)’,‘{’,‘}’,‘[’,‘]’ 的字符串 s ,判断字符串是否有效。 有效字符串需满足: 左括号必须…...
C语言 指针_野指针 指针运算
野指针: 概念:野指针就是指针指向的位置是不可知的(随机的、不正确的、没有明确限制的) 指针非法访问: int main() {int* p;//p没有初始化,就意味着没有明确的指向//一个局部变量不初始化,放…...
【HarmonyOS之旅】基于ArkTS开发(二) -> UI开发之常见布局
目录 1 -> 自适应布局 1.1 -> 线性布局 1.1.1 -> 线性布局的排列 1.1.2 -> 自适应拉伸 1.1.3 -> 自适应缩放 1.1.4 -> 定位能力 1.1.5 -> 自适应延伸 1.2 -> 层叠布局 1.2.1 -> 对齐方式 1.2.2 -> Z序控制 1.3 -> 弹性布局 1.3.1…...
java基础学习——jdbc基础知识详细介绍
引言 数据的存储 我们在开发 java 程序时,数据都是存储在内存中的,属于临时存储,当程序停止或重启时,内存中的数据就会丢失,我们为了解决数据的长期存储问题,有以下解决方案: 通过 IO流书记&…...
第十五届蓝桥杯大赛软件赛省赛C/C++ 大学 B 组
第十五届的题目在规定时间内做出了前5道,还有2道找时间再磨一磨。现在把做的一些思路总结如下: 题1:握手问题 问题描述 小蓝组织了一场算法交流会议,总共有 50人参加了本次会议。在会议上,大家进行了握手交流。按照惯例…...
基于JAVA的微信点餐小程序设计与实现(LW+源码+讲解)
专注于大学生项目实战开发,讲解,毕业答疑辅导,欢迎高校老师/同行前辈交流合作✌。 技术范围:SpringBoot、Vue、SSM、HLMT、小程序、Jsp、PHP、Nodejs、Python、爬虫、数据可视化、安卓app、大数据、物联网、机器学习等设计与开发。 主要内容:…...
2K320Hz显示器哪个好?
2K320Hz显示器哪个好?320Hz这种高刷新率的显示器确实很少见,那究竟哪个牌子哪个型号更适合你呢? 1.HKC G27H4Pro - 2K320Hz显示器哪个好 外观设计 - HKC G27H4Pro 2K320Hz显示器 三面微边框超震撼:采用三面微边框设计࿰…...
八股学习 微服务篇
微服务篇 常见面试内容Spring Cloud 常见组件注册中心Ribbon负载均衡策略服务雪崩 常见面试内容 Spring Cloud 常见组件 Spring Cloud有5个常见组件: Eureka/Nacos:注册中心;Ribbon:负载均衡;Feign:远程调用;Hystrix/Sentinel:服…...
C# 中 readonly 与 const 的使用
总目录 前言 在C#编程中,readonly 和 const 是两个用于定义不可变数据的关键字。它们都旨在创建那些一旦赋值后就不能再改变的字段或变量。尽管这两个关键字看起来相似,但它们有着不同的特性和适用场景。本文将深入探讨 readonly 和 const 的区别&#…...
Spring Boot Starter介绍
前言 大概10来年以前,当时springboot刚刚出现并没有流行,当时的Java开发者们开发Web应用主要是使用spring整合springmvc或者struts、iBatis、hibernate等开发框架来进行开发。项目里一般有许多xml文件配置,其中配置了很多项目中需要用到的Be…...
Kafak 单例生产者实现-C#操作
前面写了一篇入门操作的文章,因为工作需要,简单修改了下如何实现单例生产者。 Kafka入门-C#操作_c# kafka-CSDN博客文章浏览阅读1.6k次,点赞20次,收藏9次。2).报错:“kafka.zookeeper.ZooKeeperClientTimeoutException: Timed out waiting for connection while in state…...
软件开发学习路线——roadmap
推荐软件学习路线网站:https://roadmap.sh/get-started 有有关前端后端开发的学习路径,也有AI,移动开发,管理相关的学习路径 会有相应的词条路径,深入学习 右上角可以设置学习任务的完成情况...
移动端VR处理器和传统显卡的不同
骁龙 XR 系列芯片 更多地依赖 AI 技术 来优化渲染过程,而传统的 GPU 渲染 则倾向于在低画质下运行以减少负载。这种设计是为了在有限的硬件资源下(如移动端 XR 设备)实现高性能和低功耗的平衡。以下是具体的分析: 1. AI 驱动的渲染…...