详解RabbitMQ工作模式之工作队列模式
目录
工作队列模式
概念
特点
应用场景
工作原理
注意事项
代码案例
引入依赖
常量类
编写生产者代码
编写消费者1代码
编写消费者2代码
先运行生产者,后运行消费者
先运行消费者,后运行生产者
工作队列模式
概念
在工作队列模式中,一个生产者(producer)将任务发布到队列中,多个消费者(consumer)从队列中获取任务并执行。这种模式的主要目标是提高任务的并行处理能力,从而提高系统的吞吐量和效率。
特点
可以有多个消费者,但一条消息只能被一个消费者获取。
消费者在处理完某条消息后,才会收到下一条消息。
RabbitMQ采用轮询(Round-Robin)或公平分发(Fair Dispatch)的方式将消息发送给消费者。
应用场景
1.任务分发:将任务分发给多个工作者(消费者),以便并行处理。这对于需要高吞吐量和任务处理效率的应用程序非常有用。例如,图像处理、视频编码、数据转换等应用可以使用工作队列模式来并行处理大量任务。
2.负载均衡:当有多个消费者时,工作队列模式可以用来实现负载均衡。任务将均匀分布给可用的消费者,以确保每个消费者都有工作可做,而且不会超负荷。
3.后台任务处理:在Web应用程序中,后台任务处理是一个常见的需求。工作队列模式可用于处理与Web请求无关的长时间运行任务,而不会影响用户体验。例如,发送电子邮件、生成报告、备份数据等后台任务可以使用工作队列来处理。
工作原理
1.生产者发送任务:生产者将任务封装为消息,并将其发送到RabbitMQ队列中。
2.RabbitMQ分发任务:RabbitMQ根据配置的分发策略(如轮询或公平分发)将任务分发给消费者。
3.消费者处理任务:消费者从队列中获取任务并执行。在处理完任务后,消费者会向RabbitMQ发送确认消息,表示任务已完成。
4.RabbitMQ确认任务完成:在收到消费者的确认消息后,RabbitMQ会将该任务从队列中移除。
注意事项
1.消息确认:为了确保消息不会丢失,消费者在处理完任务后需要向RabbitMQ发送确认消息。如果消费者在处理任务时失败或崩溃,RabbitMQ会将该任务重新分发给其他消费者。
2.负载均衡:RabbitMQ默认采用轮询方式将消息分发给消费者。如果需要更复杂的负载均衡策略,可以考虑使用其他分发策略或自定义交换机类型。
3.错误处理:在生产者和消费者中都需要添加适当的错误处理逻辑,以处理可能出现的异常情况,如连接失败、消息发送失败等。
代码案例
引入依赖
<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client --> <dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.21.0</version> </dependency>
常量类
public class Constants {public static final String HOST = "47.98.109.138";public static final int PORT = 5672;public static final String USER_NAME = "study";public static final String PASSWORD = "study";public static final String VIRTUAL_HOST = "aaa";//工作队列模式public static final String WORK_QUEUE = "work.queue";
}
编写生产者代码
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import rabbitmq.constant.Constants;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Producer {public static void main(String[] args) throws IOException, TimeoutException {//1. 建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT); //需要提前开放端口号connectionFactory.setUsername(Constants.USER_NAME);//账号connectionFactory.setPassword(Constants.PASSWORD); //密码connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机Connection connection = connectionFactory.newConnection();//2. 开启信道Channel channel = connection.createChannel();//3. 声明队列 使用内置的交换机//如果队列不存在, 则创建, 如果队列存在, 则不创建channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null);//4. 发送消息for (int i = 0; i < 10; i++) {String msg = "hello work queue...."+i;channel.basicPublish("",Constants.WORK_QUEUE, null, msg.getBytes());}System.out.println("消息发送成功~");//6. 资源释放channel.close();connection.close();}
}
编写消费者1代码
import com.rabbitmq.client.*;
import rabbitmq.constant.Constants;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer1 {public static void main(String[] args) throws IOException, TimeoutException {//1. 建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT); //需要提前开放端口号connectionFactory.setUsername(Constants.USER_NAME);//账号connectionFactory.setPassword(Constants.PASSWORD); //密码connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机Connection connection = connectionFactory.newConnection();//2. 开启信道Channel channel = connection.createChannel();//3. 声明队列 使用内置的交换机//如果队列不存在, 则创建, 如果队列存在, 则不创建channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null);//4. 消费消息DefaultConsumer consumer = new DefaultConsumer(channel){//从队列中收到消息, 就会执行的方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到消息:"+ new String(body));}};channel.basicConsume(Constants.WORK_QUEUE, true, consumer);//6. 资源释放
// channel.close();
// connection.close();}
}
编写消费者2代码
import com.rabbitmq.client.*;
import rabbitmq.constant.Constants;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer2 {public static void main(String[] args) throws IOException, TimeoutException {//1. 建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT); //需要提前开放端口号connectionFactory.setUsername(Constants.USER_NAME);//账号connectionFactory.setPassword(Constants.PASSWORD); //密码connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机Connection connection = connectionFactory.newConnection();//2. 开启信道Channel channel = connection.createChannel();//3. 声明队列 使用内置的交换机//如果队列不存在, 则创建, 如果队列存在, 则不创建channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null);//4. 消费消息DefaultConsumer consumer = new DefaultConsumer(channel){//从队列中收到消息, 就会执行的方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//TODOSystem.out.println("接收到消息:"+ new String(body));}};channel.basicConsume(Constants.WORK_QUEUE, true, consumer);// //6. 资源释放
// channel.close();
// connection.close();}
}
先运行生产者,后运行消费者
查看管理界面
我们此时会看到,先启动的消费者会消费掉队列中所有的消息。
先运行消费者,后运行生产者
此时我们能看到,两个消费者都能够消费消息。
相关文章:
详解RabbitMQ工作模式之工作队列模式
目录 工作队列模式 概念 特点 应用场景 工作原理 注意事项 代码案例 引入依赖 常量类 编写生产者代码 编写消费者1代码 编写消费者2代码 先运行生产者,后运行消费者 先运行消费者,后运行生产者 工作队列模式 概念 在工作队列模式中&#x…...
QGIS+mcp的安装和使用
QGISmcp的安装和使用 安装qgis_mcp 下载qgis_mcp: git clone https://github.com/jjsantos01/qgis_mcp.git安装uv uv是一个由Rust语言编写的python包管理工具,旨在提供比传统工具(如 pip)更高效的依赖管理和虚拟环境操作。 p…...
Java基础361问第16问——枚举为什么导致空指针?
我们看一段代码 public enum Color {RED, BLUE, YELLOW;public static Color parse(String color) {return null;} }public static void main() {Color color Color.parse("");// 极具迷惑性,大家日常开发肯定这么写过switch (color) {case RED:break;c…...
在 C# .NET 中驾驭 JSON:使用 Newtonsoft.Json 进行解析与 POST 请求实战
JSON (JavaScript Object Notation) 已经成为现代 Web 应用和服务之间数据交换的通用语言。无论你是开发后端 API、与第三方服务集成,还是处理配置文件,都绕不开 JSON 的解析与生成。在 C# .NET 世界里,处理 JSON 有多种选择,其中…...
CentOS7——Docker部署java服务
1、安装Docker 首先要确保系统已安装 Docker,若未安装,可以参考我的另一篇文章现在CentOS7上安装Docker,文章地址如下: CentOS7系统安装Docker教程-CSDN博客 Docker当中要安装必备的软件,比如Java运行必要的JDK&#…...
Python-Part2-集合、字典与推导式
Python-Part2-集合、字典与推导式 1. set集合 ⽆序,去掉重复数据。 set1 {1,2,3,4,5,5,4,3,2,1}print(type(set1))print(set1)set2.add(66666)set2.remove(55)#不能使用下标访问set,所以修改操作一般为remove操作 add操作2.dict 字典 字典ÿ…...
《AI大模型应知应会100篇》第39篇:多模态大模型应用:文本、图像和音频的协同处理
第39篇:多模态大模型应用:文本、图像和音频的协同处理 摘要 随着人工智能技术的发展,多模态大模型(Multimodal Large Models)已经成为AI领域的热点之一。这些模型能够同时处理文本、图像、音频等多种模态数据…...
kvm学习小结
安装相关包 安装虚拟化相关包 apt install qemu-kvm qemu-system libvirt-clients libvirt-daemon-system vlan bridge-utils 安装界面相关包 apt install xinit gdmd 配置机器允许root登录 检查cpu是否支持虚拟化 egrep -o vmx|svm /proc/cpuinfo 执行命令systemctl s…...
k8s基本概念-YAML
YAML介绍 YAML是“YAML Aint a Markup Language” (YAML不是一种置标语言)的递归缩进写,早先YAML的意思其实是:“Yet Another Markup Language”(另一种置标语言) YAML是一个类似XML、JSON的标记性语言。YAML强调以数据为中心,并不是以标识语言为重点。因而YAML本身的定义…...
wps批注线条怎么取消去掉wps批注后有竖线
wps批注线条怎么取消去掉wps批注后有竖线 问题 图片 解决方案 图片 word批注线条取消的方法: 1.打开Word文档,点击需要删除的批注。 2.然后点击工具栏“审阅”选项。 3.接着点击“接受“ 4.接受对文档所做的所有修订(H)...
深度解析算法之分治(归并)
48.排序数组 题目链接 给你一个整数数组 nums,请你将该数组升序排列。 你必须在 不使用任何内置函数 的情况下解决问题,时间复杂度为 O(nlog(n)),并且空间复杂度尽可能小。 示例 1: 输入: nums [5,2,3,1] 输出&am…...
僵尸进程是什么?
僵尸进程(Zombie Process)是指在 Unix/Linux 系统中,一个子进程已经终止,但其父进程尚未对它进行善后处理(即没有读取其退出状态),导致子进程的进程表项仍然保留在系统中。由于这个进程已经结束…...
城市群出行需求的时空分形
城市群出行需求的时空分形 原文:He, Zhengbing. “Spatial-temporal fractal of urban agglomeration travel demand.” Physica A: Statistical Mechanics and its Applications 549 (2020): 124503. 1. Introduction(引言) 城市区域的重…...
LangChain入门(二)安装开发环境
1.安装conda Conda 是一个开源的软件包管理系统和环境管理系统,用于安装多个版本的软件包及其依赖关系,并在它们之间轻松切换。 Anaconda是一个开源的Python发行版本,其包含了conda、python等软件包,numpy、pandas、scipy等科学…...
如何开展有组织的AI素养教育?
一、AI素养的定义与核心内涵 AI素养是智能时代个体适应与创新能力的综合体现,其内涵随着技术发展动态扩展,包含以下核心维度: 知识体系:理解AI基本原理(如算法、数据、算力)、技术边界及发展趋势ÿ…...
InnoDB对LRU算法的优化
标准 LRU 算法的核心思想是:当缓存空间不足时,淘汰掉最近最少使用的数据块(Page)。它通常用一个链表来实现,链表头部是最近访问的 Page,链表尾部是最久未访问的 Page。 然而,在数据库系统中直接…...
云原生--核心组件-容器篇-7-Docker私有镜像仓库--Harbor
1、Harbor的定义与核心作用 定义: Harbor是由VMware开源的企业级容器镜像仓库系统,后捐赠给 CNCF (Cloud Native Computing Foundation)。它基于Docker Registry扩展了企业级功能,用于存储、分发和管理容器镜像(如Docker、OCI标准…...
TypeScript 实用类型深度解析:Partial、Pick、Record 的妙用
需求背景:在后台系统的用户管理模块中,我们常遇到这样的场景:修改用户资料时只需要传部分字段,展示用户列表时要隐藏敏感信息,快速查找用户需要ID索引等等,这些业务需求都可以通过 TypeScript 的实用类型优…...
【Pandas】pandas DataFrame rmod
Pandas2.2 DataFrame Binary operator functions 方法描述DataFrame.add(other)用于执行 DataFrame 与另一个对象(如 DataFrame、Series 或标量)的逐元素加法操作DataFrame.add(other[, axis, level, fill_value])用于执行 DataFrame 与另一个对象&…...
如何搭建spark yarn 模式的集群集群
以下是搭建Spark YARN模式集群的一般步骤: 准备工作 - 确保集群中各节点安装了Java环境,并配置好 JAVA_HOME 环境变量。 - 各节点间能通过SSH免密登录。 - 安装并配置好Hadoop集群,YARN作为Hadoop的资源管理器,Spark YARN模式需要…...
云原生--核心组件-容器篇-6-Docker核心之-镜像仓库(公共仓库,私有仓库,第三方仓库)
1、Docker仓库的定义与核心作用 定义: Docker仓库(Docker Registry)是用于存储、分发和管理Docker镜像的集中式存储库。它类似于代码仓库,但专门用于容器镜像的版本控制和共享。它允许开发人员和IT团队高效地管理、部署和分享容器…...
mysql8.0版本部署+日志清理+rsync备份策略
mysql安装:https://blog.csdn.net/qq_39399966/article/details/120205461 系统:centos7.9 数据库版本:mysql8.0.28 1.卸载旧的mysql,保证环境纯净 rpm -qa | grep mariadb mariadb-5.... rpm -e --nodeps 软件 rpm -e --nodeps mariadb-5.…...
搭建spark yarn 模式的集群集群
一.引言 在大数据处理领域,Apache Spark 是一个强大的分布式计算框架,而 YARN(Yet Another Resource Negotiator)是 Hadoop 的资源管理系统。将 Spark 运行在 YARN 模式下,可以充分利用 YARN 强大的资源管理和调度能力…...
在uni-app中使用Painter生成小程序海报
在uni-app中使用Painter生成小程序海报 安装Painter 从GitHub下载Painter组件:https://github.com/Kujiale-Mobile/Painter 将painter文件夹复制到uni-app项目的components目录下 配置页面 在需要使用海报的页面的pages.json中配置 {"path": "pag…...
Uni-app网络请求AES加密解密实现
Uni-app 网络请求封装与 AES 加密解密实现 下面我将为你提供一个完整的 Uni-app 网络请求封装方案,包含 POST 请求的统一处理、请求参数和响应数据的 AES 加密解密。 1. 创建加密解密工具类 首先创建一个 crypto.js 文件用于处理 AES 加密解密: // u…...
uniapp实现统一添加后端请求Header方法
uniapp把请求写完了,发现需要给接口请求添加头部,每个接口去添加又很麻烦,uniapp可以统一添加,并且还能给某些接口设置不添加头部。 一般用于添加token登录验证信息。 在 main.js 文件中配置。 代码如下: // 在…...
uniapp打包apk如何实现版本更新
我们做的比较简单,在后端设置版本号,并在uniapp的config.js中定义版本号,每次跟后端的进行对比,不一致的话就更新。 一、下载apk 主要代码(下载安装包,并进行安装,一般得手动同意安装…...
【Java开发日记】OpenFeign 的 9 个坑
目录 坑一:用对Http Client 1.1 feign中http client 1.2 ribbon中的Http Client 坑二:全局超时时间 坑三:单服务设置超时时间 坑四:熔断超时时间 4.1 使用feign超时 4.2 使用ribbon超时 4.3 使用自定义Options 坑五&…...
RocketMQ 存储核心:深入解析 CommitLog 设计原理
一、引言 在分布式消息队列系统中,消息存储的可靠性和高吞吐能力是衡量系统优劣的核心指标。Apache RocketMQ 作为一款高性能、高可用的分布式消息中间件,其独特的 CommitLog 存储机制在消息持久化过程中扮演了关键角色。本文将深入剖析 CommitLog 的设…...
【C++ Qt】快速上手 显⽰类控件(Label、LCDNumber、ProcessBar、CalendarWidget)
每日激励:“不设限和自我肯定的心态:I can do all things。 — Stephen Curry” 绪论: 本文围绕Qt中常用的显示类控件展开,重点讲解了 QLabel(文本/图片显示)、QLCDNumber(数字显示࿰…...
Docker和K8s面试题
1.Docker底层依托于linux怎么实现资源隔离的? 基于Namespace的视图隔离:Docker利用Linux命名空间(Namespace)来实现不同容器之间的隔离。每个容器都运行在自己的一组命名空间中、包括PID(进程)、网络、挂载…...
shell--数组、正则表达式RE
1.数组 1.1定义 什么是数组? 数组也是一种变量,常规变量只能保存一个值,数组可以保存多个值 1.2 分类 普通数组:只能用整数作为数组的索引--0 下标 有序数组(普通数组):(index)索引(为整数,从0开始) 关联数组:可以使用字符串作为数组的索引 1.3 普通数组 引用: ec…...
java 使用 POI 为 word 文档自动生成书签
poi 版本:4.1.0 <properties><java.version>1.8</java.version><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding><…...
redis+lua+固定窗口实现分布式限流
用key的过期时间替代固定窗口的时间戳 -- KEYS[1]: 限流的key -- ARGV[1]: 限流窗口大小(秒) -- ARGV[2]: 限流阈值local key KEYS[1] local window tonumber(ARGV[1]) local limit tonumber(ARGV[2])-- 尝试获取当前计数 local current redis.call…...
什么是SQL92标准,有什么特点和影响?
一、SQL92简介 SQL92标准是1992年由美国国家标准协会(ANSI)和国际标准化组织(ISO)联合制定的数据库语言标准,正式名称为"SQL:1992"或ISO/IEC 9075:1992。他是关系型数据库管理系统(R…...
Flink Checkpoint 与实时任务高可用保障机制实战
在实时数仓体系中,数据一致性和任务稳定性是核心保障。本文围绕 Flink Checkpoint 机制,深入讲解高可用保障的最佳实践和工程实现。 一、业务背景与痛点 在金融风控、营销实时推荐、智能监控等场景中,实时数仓的每一条数据都至关重要。常见的业务痛点包括: 断点恢复困难:…...
WebRtc08:WebRtc信令服务器实现
如何使用socket.io发送消息 发送消息 // 给本次连接发送消息 socket.emit()// 给某个房间内所有人发送消息 io.in(room).emit()// 除了自己以外,给某个房间的所有人发消息 socket.to(room).emit();// 除本连接外,给所有人发消息 socket.broadcast.emit…...
基于 SpringBoot 与 Redis 的缓存预热案例
文章目录 “缓存预热” 是什么?项目环境搭建创建数据访问层预热数据到 Redis 中创建缓存服务类测试缓存预热 “缓存预热” 是什么? 缓存预热是一种优化策略,在系统启动或者流量高峰来临之前,将一些经常访问的数据提前加载到缓存中…...
Python对比两张CAD图并标记差异的解决方案
以下是使用Python对比两张CAD图并标记差异的解决方案,结合图像处理和CAD结构分析: 一、环境准备与库选择 图像处理库:使用OpenCV进行图像差异检测、颜色空间转换和轮廓分析。CAD解析库:若为DXF格式,使用ezdxf解析实体…...
LINUX427 冒险位 粘滞位 chmod 权限
为什么不同用户能查看的文件夹不同 思索 是因为不同文件夹的权限不同吗?感觉不是 权限不就是只有rwx权限吗? o 对对对 和0GU有关 O 组内的其他用户应该 O是其他用户 不是组内用户 文件创建应该设置了r权限 但是root为什么看到的好像不一样 root 这些…...
10 DPSK原始对话记录
10 DPSK原始对话记录 前言 编程之余,在 Vscode 的 Cline 插件界面中和 ai (dpsk v3-0324) 聊起了天,得到了一个有意思的回答。就像ai有自我意识一样。在此记录。 实际对话内容 时间范围:2025-04-27 23:37:22 - 23:44:17 对话模式:PLAN MODE [23:37:22] 用户提问 “你…...
实现一个瀑布流布局
1、纯CSS实现 实现方式:借助column-count属性来创建 4 列的布局,并使用 column-gap 设置列间距。每个 .img-container 使用 break-inside: avoid 来防止图片被分割。 来看一下完整的代码: <!DOCTYPE html> <html lang"en&qu…...
Linux:进程间通信->共享内存
1. 共享内存的概念 System V共享内存,是一个高效的进程间通信IPC机制,允许多个进程共享同一块物理内存区实现快速的数据交换。如下图所示 这两个进程分别通过页表映射到这一块共享内存中 2. 共享内存的函数 shmget 功能: 创建新的共享内存…...
《Crawl4AI 爬虫工具部署配置全攻略》
《Crawl4AI 爬虫工具部署配置全攻略》 摘要 :在数据驱动的智能时代,高效爬虫工具是获取信息的关键。本文将为你详细解析 Crawl4AI 的安装配置全流程,从基础设置到进阶优化,再到生产环境部署,结合实用技巧与常见问题解…...
spring框架学习(下)
这章节讲的主要是spring在生产Bean对象时的过程 Spring实例化对象的基本流程 1、解析bean.xml 2、封装成BeanDifinition类 3、存放到BeanDIfinitionMap里 4、从BeanDIfinitionMap遍历得到bean 5、将bean存放到SingletonObjects 6、调用getBean方法得到bean 以下是简易的…...
进程与线程-----C语言经典题目(8)
一.什么是进程 定义: 进程指的是程序在操作系统内的一次执行过程。它不只是程序代码,还涵盖了程序运行时的各类资源与状态信息。包括创建、调度、消亡。 进程的状态(ps -aux): 就绪状态:进程已经…...
lstm用电量预测+网页可视化大屏
视频教学: 训练结果: 详细代码: import pandas as pd import numpy as np from sklearn.preprocessing import MinMaxScaler from sklearn.model_selection import train_test_split from tensorflow.keras.models import Sequential from tensorflow.keras.layers impo…...
linux blueZ 第六篇:嵌入式与工业级应用案例——在 Raspberry Pi、Yocto 与 Buildroot 上裁剪 BlueZ 并落地实战
本篇面向嵌入式与工业级应用场景,深入讲解如何在各类 Linux 构建系统(Raspberry Pi OS、Yocto、Buildroot)中裁剪、交叉编译与集成 BlueZ,以及在工业网关、资产追踪与蓝牙 Mesh 等典型方案中的落地实例与注意要点,帮助你打造稳定、可维护、低功耗的嵌入式蓝牙产品。 目录 …...
Dev控件RadioGroup 如何设置一排有N个显示或分为几行
1.5个选项 全部横排显示,则Columns 5 2.5个选项 每行分两个,则有三行,则Columns 2...
AOSP Android14 Launcher3——Launcher的状态介绍LauncherState类
Launcher3中有一个跟Launcher状态相关的类,叫LauncherState LauncherState 是 Launcher3 中定义各种用户界面状态的抽象基类。你可以把它想象成一个状态机,定义了 Launcher 可能处于的不同视觉和交互模式,例如主屏幕、所有应用列表、最近任务…...