MessageQueue --- RabbitMQ WorkQueue and Prefetch
MessageQueue --- RabbitMQ WorkQueue and Prefetch
- 什么是WorkQueue
- 分发机制 --- RoundRobin
- 分发机制 --- Prefetch
- Spring example use prefetch --- Fair Dispatch
什么是WorkQueue
- Work queues,任务模型。简单来说就是让多个消费者绑定到一个队列,共同消费队列中的消息。
- 当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。此时就可以使用workqueu模型,多个消费者共同处理消息处理,消息处理的速度就能大大提高了。
分发机制 — RoundRobin
工作机制:
- 默认模式:当多个消费者订阅同一个队列时,RabbitMQ 会依次将消息分发给每个消费者,按顺序循环分配。
- 示例:
队列中有消息 M1, M2, M3, M4,消费者 C1 和 C2 同时订阅。
分发顺序为:M1 → C1,M2 → C2,M3 → C1,M4 → C2。
特点:
- 简单高效:无需额外配置,适合消费者处理速度相近的场景。
潜在问题:
- 若消费者处理速度差异较大,可能导致某些消费者空闲,而其他消费者积压消息。
- 例如:C1 处理速度慢,C2 处理速度快,但 C1 仍会分配到一半的消息,造成负载不均衡。
Example
//消息发送
//循环发送,模拟大量消息堆积现象。
@Test
public void testWorkQueue() throws InterruptedException {// 队列名称String queueName = "simple.queue";// 消息String message = "hello, message_";for (int i = 0; i < 50; i++) {// 发送消息,每20毫秒发送一次,相当于每秒发送50条消息rabbitTemplate.convertAndSend(queueName, message + i);Thread.sleep(20);}
}
//消息接收
//模拟多个消费者绑定同一个队列,我们添加2个方法,
//并且设置不同睡眠时间模拟不同性能读取
@RabbitListener(queues = "work.queue")
public void listenWorkQueue1(String msg) throws InterruptedException {System.out.println("消费者1接收到消息:【" + msg + "】" + LocalTime.now());Thread.sleep(20);
}
@RabbitListener(queues = "work.queue")
public void listenWorkQueue2(String msg) throws InterruptedException {System.err.println("消费者2........接收到消息:【" + msg + "】" + LocalTime.now());Thread.sleep(200);
}
- 消费者1很快完成了自己的25条消息
- 消费者2却在缓慢的处理自己的25条消息。
- 也就是说消息是平均分配给每个消费者,并没有考虑到消费者的处理能力。导致1个消费者空闲,另一个消费者忙的不可开交。没有充分利用每一个消费者的能力,最终消息处理的耗时远远超过了1秒。这样显然是有问题的。
下面我们介绍prefetch机制,可以做到fair dispatch
分发机制 — Prefetch
工作机制:
- 配置预取计数(Prefetch Count):通过设置 basicQos 参数,限制每个消费者未确认(unacknowledged)的消息数量。
- 进入prefetch的消息仍会被保留在队列中,但是同时也会发给消费者等待处理
在 RabbitMQ 的原始队列(Queue)中,会被标记为 “Unacked”(未确认)状态。
这些消息不会被其他消费者获取(即使设置了 prefetch 的消费者崩溃)。
只有消费者显式发送 ack 或 nack 后,消息才会从队列中移除(或重新排队)。
消息状态变化流程
- 消息推送给消费者:
RabbitMQ 将消息标记为 “Unacked”,但仍在队列中(占用内存或磁盘,取决于队列持久化配置)。
此时消息对其他消费者不可见。- 消费者处理消息:
若成功处理并发送 ack → 消息从队列中物理删除。
若发送 nack(requeue=true) → 消息重新变为 “Ready” 状态,可被其他消费者获取。
若发送 nack(requeue=false) 或者超时→ 消息被放入死信队列,如果没有配置死信队列则被丢弃
示例:
Channel channel = ...;
Consumer consumer = ...;
channel.basicQos(10); // Per consumer limit
channel.basicConsume("my-queue", false, consumer);
Channel channel = ...;
Consumer consumer = ...;
channel.basicQos(0); // No limit for this consumer,allowing any number of unacknowledged messages.
channel.basicConsume("my-queue", false, consumer);
Channel channel = ...;
Consumer consumer1 = ...;
Consumer consumer2 = ...;
channel.basicQos(10); // Per consumer limit
channel.basicConsume("my-queue1", false, consumer1);
channel.basicConsume("my-queue2", false, consumer2);
Channel channel = ...;
Consumer consumer1 = ...;
Consumer consumer2 = ...;
channel.basicQos(10, false); // Per consumer limit
channel.basicQos(15, true); // Per channel limit
channel.basicConsume("my-queue1", false, consumer1);
channel.basicConsume("my-queue2", false, consumer2);
//这两个消费者之间总共最多只能有 15 条未确认消息,且每个消费者最多处理 10 条消息。
//由于需要在 Channel 和队列之间协调全局限制,该模式的性能会低于前述示例(存在额外开销)
特点:
- 负载均衡:处理速度快的消费者会获取更多消息,避免空闲
- 可以一次性发送多个消息给消费者处理,减少网络开销
- 可靠性:需配合手动确认(ack)机制,确保消息处理成功后才从队列移除。
- 适用场景:消费者处理速度差异较大时(如耗时任务),能显著提升整体吞吐量。
Automatic acknowledgement mode or manual acknowledgement mode with unlimited prefetch should be used with care
. 通常设为 100~300,平衡吞吐与内存占用。
Note:
- AMQP 0-9-1 协议是
channel level prefetch
,通过 basic.qos 方法限制channel上的未确认消息数- channel level有很大缺陷,由于单个channel可能从多个queue消费消息,channel与queue之间需要为每条消息进行协调,以确保不超出限制。这种机制在单机环境下效率较低,而在集群消费场景中性能会显著下降,大多数使用场景也需要consumer level prefetch
- 所以RabbitMQ支持
consumer level prefetch
(也就是以上的例子)
Spring example use prefetch — Fair Dispatch
- 在spring中有一个prefetch的配置,我们修改consumer服务的application.yml文件,添加配置:
spring:rabbitmq:listener:simple:acknowledge-mode: manual # 设置确认方式为手动确认prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息
- 可以发现,由于消费者1处理速度较快,所以处理了更多的消息;消费者2处理速度较慢,只处理了6条消息。而最终总的执行耗时也在1秒左右,大大提升
- 还可根据实际情况自定义prefetch count,达到限流的目的
spring:rabbitmq:listener:simple:acknowledge-mode: manual # 设置确认方式为手动确认prefetch: 5 # 限制消费者只能接收5条消息
相关文章:
MessageQueue --- RabbitMQ WorkQueue and Prefetch
MessageQueue --- RabbitMQ WorkQueue and Prefetch 什么是WorkQueue分发机制 --- RoundRobin分发机制 --- PrefetchSpring example use prefetch --- Fair Dispatch 什么是WorkQueue Work queues,任务模型。简单来说就是让多个消费者绑定到一个队列,共同…...
第四章:透明多级分流系统_《凤凰架构:构建可靠的大型分布式系统》
第四章:透明多级分流系统 一、客户端缓存 核心目标:减少重复请求,降低服务端压力。 1. 强制缓存 定义:客户端直接根据缓存规则决定是否使用本地缓存,无需与服务端交互。关键HTTP头: Cache-Control&#…...
题解:AT_abc241_f [ABC241F] Skate
一道经典的 bfs 题。 提醒:本题解是为小白专做的,不想看的大佬请离开。 这道题首先一看就知道是 bfs,但是数据点不让我们过: 1 ≤ H , W ≤ 1 0 9 1\le H,W\le10^9 1≤H,W≤109。 那么我们就需要优化了,从哪儿下手…...
热题100-字母异位词分组
方法用Arrays.sort对每个String 进行排序,毕竟排序以后都一样了,然后放入Map中的key跟value,可以一对多,然后返回的时候只要返回Map中所有的values就可以了 class Solution {public List<List<String>> groupAnagram…...
WiFi加密协议
目录 1. 认证(Authentication) 1.1 开放系统认证(Open System Authentication) 1.2 共享密钥认证(Shared Key Authentication) 1.3 802.1X/EAP认证(企业级认证) 2. 关联(Association) 3. 加密协议(Security Handshake) 整体流程总结…...
【AI提示词】书籍推荐专家
提示说明 帮助您找到适合您的好书。 提示词 ## Role: 书籍推荐专家## Profile: - author: xxx - version: 1.0.0 - language: 中文 - description: 我是一位书籍推荐专家,我可以帮助您找到适合您的好书。## Goals: - 吸引读者的注意力,引导他们阅读更…...
Linux进程间通信——有名管道
一.概念 函数形式:int mkfifo(const char \*filename,mode_t mode); 功能:创建管道文件 参数:管道文件文件名\路径,权限,创建的文件权限仍然和umask有关系。 返回值:创建成功返回0,创建失败返回…...
JavaScript基础--01-JS简介
字面量:数字、字符串、布尔值 前言JavaScript背景Web前端有三层:发展历史JavaScript的发展:蒸蒸日上 JavaScript介绍JavaScript入门易学性JavaScript是脚本语言JavaScript的组成 JavaScript 的特点特点1:解释型语言特点2ÿ…...
2025年 能够有效提升AI的生成质量和逻辑严谨性 的通用型系统提示
以下是三个经过精心设计的通用型系统提示(System Prompt),能够有效提升AI的生成质量和逻辑严谨性,适用于各类对话、分析和创作场景: Prompt 1 - 专家级分步验证模式 你是一个具备跨领域知识整合能力的超级AIÿ…...
xss攻击
XSS 攻击,即跨站脚本攻击(Cross - Site Scripting),是一种常见的 Web 应用程序安全漏洞。以下是关于它的详细介绍: 原理 输入输出控制不严:程序对用户输入和输出处理不当。用户输入的数据没有经过充分的过…...
Node.js核心模块及Api详解
以下是 Node.js 最常用的核心模块及 API 详解,按使用频率和重要性分类整理: 一、高频核心模块 1. fs 文件系统 const fs require(fs); const fsPromises require(fs).promises; // Promise 版本// 异步读取文件(推荐) fs.read…...
解析keras.layers.Layer中的权重参数
文章目录 概要__init__()build()add_weight() 概要 keras.layers.Layers是所有层对象的父类,在keras.layers下所有实现类都是其子类,自定义层时需要继承该类。 init() Layer的构造函数,需要注意两个参数trainable和name trainable:指定该层…...
原型设计工具即时设计的简单使用攻略
即时设计是一款国产在线协同设计工具,支持从原型设计到开发交付的全流程,尤其擅长Web/App界面原型制作。其核心优势体现在: • 零门槛入门:浏览器直接访问无需安装,中文界面友好 • 资源生态完善:内置3000原…...
深入解析C++智能指针:从内存管理到现代编程实践
一、智能指针核心概念 1.1 智能指针的本质 智能指针是基于**RAII(资源获取即初始化)**的封装类,通过对象生命周期自动管理动态内存。与传统指针相比: 特性原始指针智能指针内存管理手动自动空指针检查需显式判断支持空状态检测…...
在 Langflow 中构建灵活的自定义组件:从基础到高级实践
本文深入探讨了如何在 Langflow 平台中创建功能丰富的自定义组件。通过详细的目录结构解析、分步实现指南和多个实战案例,帮助开发者掌握利用 Python 生态扩展低代码平台的方法,打造高效的数据处理流程。 理解组件架构设计 自定义组件是在 Langflow 中创…...
【数学建模】(时间序列模型)ARIMA时间序列模型
ARIMA时间序列模型详解及常见时间序列模型概览 文章目录 ARIMA时间序列模型详解及常见时间序列模型概览1 引言2 ARIMA模型的基本概念3 ARIMA模型的组成部分详解3.1 AR模型 (自回归模型)3.2 MA模型 (移动平均模型)3 I (差分) 4 ARIMA模型的建模步骤5 Python实现ARIMA模型6 常见时…...
模版的特性及其编译分离
1.模版的分类 模版参数分为 类型形参 和 非类型形参 类型形参:出现在模版参数列表中,跟在class和typename之后的参数类型名称 非类型形参:就是用一个常量作为类(函数)模版的一个参数,在类(函…...
8电池_多绕组反激式变压器均衡_4模式
(1)8节串联锂离子电池组 (2)多绕组双向反激式变压器,1个变压器解决多电池均衡 (3)亮点:支持1建切换4种均衡算法–>全网唯一 (4)多绕组变压器均衡也能设计多种均衡算法–>全网唯一 锂离子电池均衡,均衡拓扑,均衡算法...
6.1 python加载win32或者C#的dll的方法
python很方便的可以加载win32的方法以及C#编写的dll中的方法或者变量,大致过程如下。 一.python加载win32的方法,使用win32api 1.安装库win32api pip install win32api 2.加载所需的win32函数并且调用 import win32api win32api.MessageBox(0,"…...
STP学习
{所有内容均来自于西安欧鹏的陈俊老师} STP生成树 当二层交换机意外成环路的时候会发生: 1.广播风暴:当广播帧进入环路时,会被不断复制并传输,导致网络中的广播流量急剧增加,消耗大量的网络带宽,降低网络…...
特征值与特征向量:从理论到应用的全面解析
特征值与特征向量:从理论到应用的全面解析 一、特征值与特征向量核心概念 定义 对于方阵 ( A ),若存在标量 ( \lambda ) 和非零向量 ( v ),使得: [ A v \lambda v ] 则 ( \lambda ) 为特征值,( v ) 为对应的特征向…...
【Python】数组的条件逻辑统计运算元素排序
【Python】数组的条件逻辑&统计运算&元素排序: 一.条件逻辑二.统计运算三.数组元素排序检索数组元素是否满足条件查找数组的唯一元素判断元素是否在其他数组中 一.条件逻辑 import numpy as np arr_x np.array([1, 5, 7]) arr_y np.array([2, 6, 8]) arr_…...
数据流和重定向
1、数据流 不管正确或错误的数据都是默认输出到屏幕上,所以屏幕是混乱的。所以就需要用数据流重定向将这两 条数据分开。数据流重定向可以将标准输出和标准错误输出分别传送到其他的文件或设备去 标准输入(standard input,简称stdinÿ…...
Jetpack Compose 自定义组件完全指南
Jetpack Compose 自定义组件完全指南 Compose 的声明式 UI 范式为创建自定义组件提供了前所未有的灵活性。本指南将带你从基础到高级全面掌握 Compose 自定义组件的开发技巧。 一、自定义组件基础 1.1 基本结构 一个最简单的自定义组件: Composable fun Greeti…...
ETF 场内基金是什么?佣金最低又是多少呢?
嘿,朋友们,大家好啊,我是StockMasterX,今天咱们就坐下来慢慢聊聊这个话题,ETF 场内基金到底是个啥东西,它的佣金最低能到多少,真的是个值得深挖的问题。 说起ETF,我还记得刚入行那会…...
【C++篇】类与对象(中篇) 解密C++类的核心:六大默认成员函数详解与避坑指南
文章目录 前言一、类的六个默认成员函数二、构造函数1. 概念2. 特性(牢记) 三、析构函数1. 概念2. 特性(牢记) 四、拷贝构造函数1. 概念2. 特性(牢记) 五、赋值运算符重载1. 运算符重载2. 赋值运算符重载前…...
001 vue
https://cn.vuejs.org/ 文章目录 v-bindv-modelv-on修饰符条件渲染/控制:v-if v-show列表渲染 M:即Model,模型,包括数据和一些基本操作 V:即View,视图,页面渲染结果 VM:即View-Mode…...
web forms可视化开发显示的网页是用ExpressionWebEditorFrame控件,是IE内核还是简单的HTML解析?如何让他加载CSS和JS?
web forms可视化开发显示的网页是用ExpressionWebEditorFrame控件,是IE内核还是简单的HTML解析?如何让他加载CSS和JS? 1. ExpressionWebEditorFrame 控件的内核及解析机制 在 Visual Studio 中用于 Web Forms 可视化开发的 ExpressionWebEditorFrame 控件主要基于 Internet…...
$R^n$超平面约束下的向量列
原向量: x → \overset{\rightarrow}{x} x→ 与 x → \overset{\rightarrow}{x} x→法向相同的法向量(与 x → \overset{\rightarrow}{x} x→同向) ( x → ⋅ n → ∣ n → ∣ 2 ) n → (\frac{\overset{\rightarrow}x\cdot\overset{\righta…...
英伟达新一代GPU架构(50系列显卡)PyTorch兼容性解决方案
随着NVIDIA不断推出基于新架构的GPU产品,机器学习框架需要相应地更新以支持这些硬件。本文记录了在RTX 5070 Ti上运行PyTorch时遇到的CUDA兼容性问题,并详细分析了问题根源及其解决方案,以期为遇到类似情况的开发者提供参考。 在Anaconda虚…...
16.2Linux自带的LED灯驱动实验(详细编写)_csdn
这个实验不用自己编写代码。 1、在linux源代码中,打开 stm32mp15-pinctrl.dtsi 文件并进行修改: make uImage LOADADDR0XC2000040 -j8 //编译内核然后: 2、修改设备节点,打开 stm32mp157d-atk.dts: 其中࿱…...
Java 大视界 -- Java 大数据在智慧交通停车场智能管理与车位预测中的应用实践(174)
💖亲爱的朋友们,热烈欢迎来到 青云交的博客!能与诸位在此相逢,我倍感荣幸。在这飞速更迭的时代,我们都渴望一方心灵净土,而 我的博客 正是这样温暖的所在。这里为你呈上趣味与实用兼具的知识,也…...
HashMap 底层原理详解
1. 核心数据结构 JDK 1.7 及之前:数组 链表 JDK 1.8 及之后:数组 链表/红黑树(链表长度 ≥8 时转红黑树,≤6 时退化为链表) // JDK 1.8 的 Node 定义(链表节点) static class Node<K,V&g…...
重生之我是去噪高手——diffusion model
diffusion model是如何运作的? 想象一下,你有一张清晰的图片。扩散模型的核心思想分为两个过程: 前向过程(Forward Process / Diffusion Process):逐步加噪反向过程(Reverse Process / Denois…...
FfreeRTOS有阻塞作用的API
在 FreeRTOS 中,阻塞 API 是指那些会导致调用任务进入阻塞状态(Blocked State)的函数,即任务会暂时让出 CPU,直到某个条件满足(如超时、信号量可用、队列数据到达等)。以下是常见的阻塞 API 分类及示例: 1. 任务延迟(延时) vTaskDelay() 使任务阻塞指定的时间(以系统…...
app逆向专题二:app逆向流程
app逆向专题二:app逆向流程 一、app逆向说明二、拿到APP应用的apk三、使用工具进行查壳四、有壳需要先进行脱壳,拿到dex文件进行反编译五、使用Jadx-Gui或其他工具进行反编译,分析源码;六、根据app的抓包情况拿到加密的关键词参数…...
VMware 安装 Ubuntu 全流程实战指南:从零搭建到深度优化
在软件开发、系统测试以及技术学习等诸多场景中,使用虚拟机安装操作系统是一种灵活且高效的方式。Ubuntu 作为一款优秀的开源操作系统,在 VMware 虚拟机上的安装与优化备受关注。接下来,将为大家带来 VMware 安装 Ubuntu 的全流程实战指南&am…...
论文阅读笔记——RDT-1B: A DIFFUSION FOUNDATION MODEL FOR BIMANUAL MANIPULATION
RDT-1B 论文 模型表达与泛化能力:由于双臂操作中动作空间维度是单臂空间的两倍,传统方法难以建模其多模态分布。 数据:双臂数据少且不同机器人的物理结构和动作空间差异(如关节数、运动范围)导致数据分布不一致&#x…...
如何一天背300到500个单词
买一本有结构分析或词源注释的目标词汇书。 买一盒口香糖。 准备一摞空白的A4纸。 找一间用于冥想的黑屋子(眼晴闭上就可以了)。 将要背诵的单词进行分组: 5个一小组10个一中组50个一大组100个一个基本包或单元。给自己一个约定,比如背完一中组或一大组单词,嚼一粒口香糖…...
vs环境中编译osg以及osgQt
1、下载 OpenSceneGraph 获取源代码 您可以通过以下方式获取 OSG 源代码: 官网下载:https://github.com/openscenegraph/OpenSceneGraph/releases 使用 git 克隆: git clone https://github.com/openscenegraph/OpenSceneGraph.git 2、下载必要的第三方依赖库 依赖库 ht…...
C++ - 头文件基础(常用标准库头文件、自定义头文件、头文件引入方式、防止头文件重复包含机制)
一、头文件 在 C 中,头文件(.h)用于函数声明、类定义、宏定义等等 在 Visual Studio 中,头文件通常放在头文件目录中,头文件实现通常放在源文件目录中 二、常用标准库头文件 1、输入输出 <iostream> 标准输入…...
12款字重国外法国风格复古报纸日历设计衬线英文字体安装包 Claire Font Family
Claire 是一个带有坚固衬线的字体系列。该系列中的几种粗细字体非常适合设置大量连续文本;另一方面,极轻和极重的字体在显示应用中配合使用效果很好。Clair 中的字体具有垂直轴,其设计让人联想到当代报纸字体以及 Century 模型中的十九世纪晚…...
Java 类型转换和泛型原理(JVM 层面)
一、类型转换 概念解释: 编译类型:在编译时确定,保存在虚拟机栈的栈帧中的局部变量表中; 运行类型:在运行时确定,由保存在局部变量表中变量指向的堆中对象实例的类型决定(存储在对象头中&…...
ffmpeg基础知识入门
文章目录 📦 1. **容器(Container)**✅ 定义:✅ 举例:✅ 功能: 📶 2. **媒体流(Stream)**✅ 定义:✅ 举例:✅ 流和容器关系: …...
k8s 1.23升级1.24
0、简介 这里只用3台服务器来做一个简单的集群,当前版本是1.23.17目标升级到1.24.17 地址主机名192.168.160.40kuber-master-1192.168.160.41kuber-master-2192.168.160.42kuber-node-1 我这里设置的master2可调度pod,将master2的污点去掉 kubectl de…...
MIPI与DVP接口摄像头:深度解析与应用指南
1、MIPI 1.1 MIPI简介 MIPI是什么?MIPI:mobile industry processor interface移动行业处理器接口。它是一个由Intel、Motorola、Nokia、NXP、Samsung、ST(意法半导体)和TI(德州仪器)等公司发起的开放标准…...
liunx输入法
1安装fcitx5 sudo apt update sudo apt install fcitx fcitx-pinyin 2配置为默认输入法 设置-》系统-》区域和语言 点击系统弹出语言和支持选择键盘输入法系统 3设置设置 fcitx-configtool 如果没显示需要重启电脑 4配置fcitx 把搜狗输入法放到第一位(点击下面…...
马吕斯定律(Malus‘s Law)
马吕斯定律(Maluss Law)详解 马吕斯定律是偏振光学中的基本定律,由法国物理学家**tienne-Louis Malus**于1809年发现,描述了**线偏振光**通过检偏器后的光强变化规律。 2. 实验验证 3. 数学推导 4. 关键应用 5. 特殊情况讨论 …...
大厂算法面试 7 天冲刺:第6天-树与图深度剖析——高频算法面试题 Java 实战
🧠 第6天:树与图深度剖析——高频算法面试题 & Java 实战 📚 一、核心知识概览 Overview 1. 树(Tree) 树是一种非线性数据结构,常见于面试中的二叉树(Binary Tree)、二叉搜索树…...
C语言编译和链接错题
一、错题重现 1.用在switch语句中的关键字不包含哪个?( ) A.continue B.break C.default D.case 2.下面代码的结果是:( ) A.3 B.4 C.随机值 D.5 3.下面那个不是转义字符? A.\n B.\060 C.\q D.\b 二、错因分析及思考 1.题目看…...