当前位置: 首页 > news >正文

RabbitMQ解决消息积压的方法

目录

减少发送mq的消息体内容

增加消费者数量

批量消费消息

临时队列转移

监控和预警机制

分阶段实施

最后还有一个方法就是开启队列的懒加载


这篇文章总结一下自己知道的解决消息积压得方法。

减少发送mq的消息体内容

像我们没有必要知道一个的中间状态,只需知道一个最终状态就可以了。
发送的消息体只用包含:id和状态等关键信息,不用发送一个完整的对象内容。
消费者收到消息之后,通过id调用原服务再将完整的消息对象内容查询出来即可,最后再进行消费处理。

增加消费者数量

采用动态增加消费者的数量

@Configuration
public class RabbitMQConfig {@Beanpublic SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factory.setConnectionFactory(connectionFactory);// 设置并发消费者数量factory.setConcurrentConsumers(5);        // 初始消费者数量factory.setMaxConcurrentConsumers(20);    // 最大消费者数量// 动态调整消费者数量factory.setConsumerTagStrategy(queue -> "consumer-" + UUID.randomUUID());return factory;}
}
@Service
public class ConsumerManagerService {@Autowiredprivate RabbitListenerEndpointRegistry registry;public void adjustConsumerCount(String queueName, int count) {MessageListenerContainer container = registry.getListenerContainer(queueName);if (container instanceof SimpleMessageListenerContainer) {SimpleMessageListenerContainer simpleContainer = (SimpleMessageListenerContainer) container;simpleContainer.setConcurrentConsumers(count);}}
}

批量消费消息

@Service
public class BatchMessageConsumer {@RabbitListener(queues = "myQueue", containerFactory = "batchFactory")public void processBatch(List<Message> messages, Channel channel) {try {// 批量处理消息List<MessageDTO> dtos = messages.stream().map(this::convertToDTO).collect(Collectors.toList());// 批量保存到数据库batchSaveToDatabase(dtos);// 获取最后一条消息的deliveryTaglong lastDeliveryTag = messages.get(messages.size() - 1).getMessageProperties().getDeliveryTag();// 批量确认channel.basicAck(lastDeliveryTag, true);} catch (Exception e) {// 批量拒绝handleBatchError(messages, channel);}}
}// 配置批量消费
@Configuration
public class BatchConsumerConfig {@Beanpublic SimpleRabbitListenerContainerFactory batchFactory(ConnectionFactory connectionFactory) {SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factory.setConnectionFactory(connectionFactory);// 启用批量消费factory.setBatchListener(true);// 批量大小factory.setBatchSize(100);// 批量超时时间factory.setReceiveTimeout(1000L);return factory;}
}

临时队列转移

@Service
public class MessageTransferService {@Autowiredprivate RabbitTemplate rabbitTemplate;public void transferMessages(String sourceQueue, String tempQueue, int batchSize) {while (true) {// 从源队列批量获取消息List<Message> messages = new ArrayList<>();for (int i = 0; i < batchSize; i++) {Message message = rabbitTemplate.receive(sourceQueue);if (message == null) break;messages.add(message);}if (messages.isEmpty()) break;// 转移到临时队列messages.forEach(msg -> rabbitTemplate.send(tempQueue, msg));}}
}// 临时队列的消费者
@Component
public class TempQueueConsumer {@RabbitListener(queues = "#{tempQueue.name}")public void processMessage(Message message) {// 使用更高效的处理方式fastProcessMessage(message);}@Beanpublic Queue tempQueue() {return new Queue("temp-queue-" + UUID.randomUUID(), false, false, true);}
}

监控和预警机制

@Service
@Slf4j
public class QueueMonitorService {@Autowiredprivate RabbitTemplate rabbitTemplate;@Scheduled(fixedRate = 60000) // 每分钟执行一次public void monitorQueueSize() {String queueName = "myQueue";// 获取队列信息Properties properties = rabbitTemplate.execute(channel -> channel.queueDeclarePassive(queueName));// 获取消息数量int messageCount = properties.getMessageCount();// 检查消息堆积if (messageCount > threshold) {// 发送告警sendAlert(queueName, messageCount);// 动态调整消费者adjustConsumers(queueName, messageCount);}}private void adjustConsumers(String queueName, int messageCount) {// 根据消息数量动态调整消费者数量int newConsumerCount = calculateConsumerCount(messageCount);consumerManagerService.adjustConsumerCount(queueName, newConsumerCount);}
}

分阶段实施

@Service
public class MessageHandlingStrategy {public void handleMessageBacklog() {// 1. 首先增加消费者数量adjustConsumerCount();// 2. 如果仍然堆积,启用批量处理if (isStillBacklogged()) {enableBatchProcessing();}// 3. 如果问题持续,使用临时队列if (isEmergency()) {transferToTemporaryQueue();}}
}

最后还有一个方法就是开启队列的懒加载

相关文章:

RabbitMQ解决消息积压的方法

目录 减少发送mq的消息体内容 增加消费者数量 批量消费消息 临时队列转移 监控和预警机制 分阶段实施 最后还有一个方法就是开启队列的懒加载 这篇文章总结一下自己知道的解决消息积压得方法。 减少发送mq的消息体内容 像我们没有必要知道一个的中间状态&#xff0c;只需…...

机器学习是?

机器学习&#xff08;Machine Learning&#xff09;是一门多领域交叉学科&#xff0c;涉及概率论、统计学、逼近论、凸分析、算法复杂度理论等多门学科; 是人工智能&#xff08;AI&#xff09;的一个分支&#xff0c;也是AI的核心领域&#xff0c;它专注于开发算法和模型&#…...

jupyter出现“.ipynb appears to have died. It will restart automatically.”解决方法

原因 解决方法&#xff1a;更新jupyter的版本 1.打开anaconda prompt 2、更新jupyter版本 在anaconda prompt输入以下指令 conda update jupyter如图&#xff1a;...

计算机毕业设计PyHive+Hadoop深圳共享单车预测系统 共享单车数据分析可视化大屏 共享单车爬虫 共享单车数据仓库 机器学习 深度学习

温馨提示&#xff1a;文末有 CSDN 平台官方提供的学长联系方式的名片&#xff01; 温馨提示&#xff1a;文末有 CSDN 平台官方提供的学长联系方式的名片&#xff01; 温馨提示&#xff1a;文末有 CSDN 平台官方提供的学长联系方式的名片&#xff01; 作者简介&#xff1a;Java领…...

c++开源协程库libgo介绍及使用,srs协程,boost协程 Boost::fiber

https://www.cnblogs.com/qwsdcv/p/9115364.html Boost - 从Coroutine2 到Fiber - 开学五年级了 - 博客园 协程就是由程序员控制跑在线程里的“微线程”。它可以由程序员调度&#xff0c;切换协程时代价小(切换根据实现不同&#xff0c;消耗的CPU周期从几十到几百不等)&#x…...

微服务实现高并发 秒杀系统,前后端实现

一、前端实现 前端项目初始化 首先&#xff0c;我们需要创建一个新的 Vue 3 项目。你可以使用 Vue CLI 来快速搭建项目。 安装 Vue CLI&#xff08;如果尚未安装&#xff09; bash npm install -g vue/cli 创建 Vue 项目 bash vue create seckill-frontend cd seckill-f…...

Eureka缓存机制

一、Eureka的CAP特性 Eureka是一个AP系统&#xff0c;它优先保证可用性&#xff08;A&#xff09;和分区容错性&#xff08;P&#xff09;&#xff0c;而不保证强一致性&#xff08;C&#xff09;。这种设计使得Eureka在分布式系统中能够应对各种故障和分区情况&#xff0c;保…...

PHP语言的学习路线

PHP语言的学习路线 PHP&#xff08;Hypertext Preprocessor&#xff09;是一种广泛使用的开源服务器端脚本语言&#xff0c;尤其适用于Web开发。由于其易学易用、功能强大&#xff0c;PHP成为了许多动态网站和Web应用程序开发的首选语言。随着Web3.0和云计算的兴起&#xff0c…...

python学opencv|读取图像(二十八)使用cv2.warpAffine()函数平移图像

【1】引言 前序已经对图像操作进行了广泛的学习&#xff0c;包括读取、放大缩小&#xff0c;改变BGR通道值等&#xff0c;相关链接包括且不限于&#xff1a; python学opencv|读取图像-CSDN博客 python学opencv|读取图像&#xff08;三&#xff09;放大和缩小图像_python(1)使…...

[Linux]Mysql9.0.1服务端脱机安装配置教程(redhat)

前言 本教程适用于在yum源不可用的LInux主机上安装Mysql的场景。 以redhat系主机做操作示例&#xff0c;debian系主机可参照步骤&#xff0c;将对应的rpm -ivh命令换成dpkg -i。 1. 官网下载安装包 https://dev.mysql.com/downloads/mysql/ 1.1 版本分类 MySQL Enterprise…...

个人 ALL IN ONE 方案搭建方案分享(从硬件到软件)及内网穿透方案

这里只做大概方案分享&#xff0c;每个虚拟机的部署细节滤过。 个人 ALL IN ONE 方案搭建方案分享 本指南将详细介绍如何基于现有硬件搭建一体化家庭/个人服务器解决方案&#xff0c;涵盖从软硬件配置、系统安装到功能实现以及性能优化的全过程。实现集 软路由、旁路由、NAS 网…...

TrustRAG:增强RAG系统鲁棒性与可信度的创新框架

在人工智能飞速发展的今天&#xff0c;大语言模型&#xff08;LLMs&#xff09;凭借其强大的语言处理能力在诸多领域大放异彩。检索增强生成&#xff08;RAG&#xff09;系统&#xff08;面向企业RAG&#xff08;Retrieval Augmented Generation&#xff09;系统的多维检索框架…...

使用证件照制作软件的常见问题及解决方案

在数字化时代&#xff0c;证件照的制作变得越来越简单。借助各种证件照制作软件&#xff0c;我们可以轻松在家中制作出符合要求的证件照。然而&#xff0c;用户在使用这些软件时&#xff0c;可能会遇到一些常见问题。为了帮助您顺利制作出满意的证件照&#xff0c;我们整理了一…...

通过gradle发布aar或jar携带sources-jar到maven nexus

找了很久&#xff0c;没有找到满意的。终于找到一个好的办法。 gradle7.x适用。比以前的写法简洁。 发布传统的jar工程 比如okhttp&#xff0c;fastjson等项目&#xff0c;纯java工程。 直接创建新文件publish.gradle: apply plugin: maven-publishProperties properties …...

SAP推出云端ERP解决方案,加速零售行业数字化转型

2025年1月9日&#xff0c;SAP发布了一款专为零售行业设计的云端ERP行业解决方案——S/4HANA Cloud Public Edition&#xff0c;进一步推动企业向云端迁移。这款解决方案旨在集中运营数据&#xff0c;整合财务、采购和商品管理流程&#xff0c;以帮助零售企业优化运营效率。 核…...

RK3568 Android 13 内置搜狗输入法小计

问&#xff1a;为什么写&#xff1f; 答&#xff1a;网上搜出来的都试过了&#xff0c;不行&#xff01;下面直接上代码和注意事项&#xff01; 首先到这个目录&#xff08;/RK3568/Rockchip_Android13_SDK_Release/device/rockchip/rk356x/tl3568_evm/preinstall&#xff09…...

微服务-Nacos(注册中心)

Nacos Nacos可以看作注册中心配置中心&#xff0c;比Eureka更加强大。 注册中心 在父工程中引入SpringCloudAlibaba的版本依赖 <dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-alibaba-dependencies</artifactId&g…...

【数据结构:前缀树Trie】

目录 前言前缀树介绍和应用一、前缀树的定义前缀树的问题和思考前缀树的映射思想前缀树三大性质 二.前缀树节点结构三. 前缀树接口介绍和实现四个接口API1. insert(String word)2. search(String word)3. startsWith(String pre)4. delete(String word) API实现1. 查询操作sear…...

如何让QPS提升20倍

一、什么是QPS QPS&#xff0c;全称Queries Per Second&#xff0c;即每秒查询率&#xff0c;是用于衡量信息检索系统&#xff08;例如搜索引擎或数据库&#xff09;或请求-响应系统&#xff08;如Web服务器&#xff09;每秒能够处理的请求数或查询次数的一个性能指标。以下是…...

时间复杂度简介

定义 时间复杂度是用来衡量算法运行时间随着输入规模增长而增长的量级。简单来说&#xff0c;它描述了算法执行时间与数据规模之间的关系。我们通常用大O符号&#xff08; O O O&#xff09;来表示时间复杂度。例如&#xff0c;对于一个简单的加法运算&#xff0c;它的执行时间…...

记一次sealos部署k8s集群之delete了第一台master如何恢复

记一次sealos部署k8s集群之delete了第一台master如何恢复 一、背景描述 使用sealos部署了一套K8S集群 master信息:172.27.100.1、172.27.100.2、172.27.100.3 node信息:172.27.100.4、172.27.100.5 sealos安装在172.27.100.1节点,根目录下/root/.sealos/文件还在! [root…...

【json】

JSON JSON是一种轻量级的,按照指定的格式去组织和封装数据的数据交互格式。 本质上是一个带有特定格式的字符串(py打印json时认定为str类型) 在各个编程语言中流通的数据格式&#xff0c;负责不同编程语言中的数据传递和交互,类似于计算机普通话 python与json关系及相互转换…...

TypeScript语言的并发编程

TypeScript语言的并发编程 引言 随着现代应用程序的复杂性不断增加&#xff0c;性能和用户体验的重要性显得尤为突出。在这种背景下&#xff0c;并发编程应运而生&#xff0c;成为提升应用程序效率的重要手段。在JavaScript及其超集TypeScript中&#xff0c;尽管语言本身是单…...

左值引用(Lvalue Reference)和右值引用(Rvalue Reference)详解

左值引用&#xff08;Lvalue Reference&#xff09;和右值引用&#xff08;Rvalue Reference&#xff09;详解 文章目录 左值引用&#xff08;Lvalue Reference&#xff09;和右值引用&#xff08;Rvalue Reference&#xff09;详解1. 什么是左值和右值&#xff1f;左值&#x…...

音视频入门基础:RTP专题(1)——RTP官方文档下载

一、引言 实时传输协议&#xff08;Real-time Transport Protocol&#xff0c;简写RTP&#xff09;是一个网络传输协议&#xff0c;由IETF的多媒体传输工作小组1996年在《RFC 1889》中公布的。 RTP作为因特网标准在《RFC 3550》有详细说明。而《RFC 3551》详细描述了使用最小…...

【Flutter】使用ScrollController配合EasyRefresh实现列表预加载:在还未滑动到底部时加载下一页数据

需求/背景 在我们的业务场景中&#xff0c;列表的加载使用easy_refresh组件&#xff1a; https://pub.dev/packages/easy_refresh 大概效果是往上滑动到一定的offset会触发一个上滑加载&#xff0c;可以触发一些网络请求拉取列表后面的数据来展示。 这种模式一般在一页翻完…...

js实现md5加密

要在JavaScript中实现MD5加密并截取特定位置的字符&#xff0c;你可以使用像crypto-js这样的库。首先&#xff0c;你需要确保你的项目中包含了crypto-js库。如果你是在浏览器环境中&#xff0c;可以通过CDN引入&#xff1b;如果是在Node.js环境中&#xff0c;可以通过npm安装。…...

[java基础-集合篇]LinkedList源码粗析

LinkedList 的数据结构 实现List、Deque 接口&#xff0c;基于 双向链表实现的列表。与基于数组的 ArrayList 不同&#xff0c;基于链表的LinkedList 允许在列表的任何位置快速地插入和删除元素。 Java中LinkedList实现了Deque&#xff0c;它提供了 add, offer, remove, poll, …...

【Rust自学】11.1. 编写和运行测试

喜欢的话别忘了点赞、收藏加关注哦&#xff0c;对接下来的教程有兴趣的可以关注专栏。谢谢喵&#xff01;(&#xff65;ω&#xff65;) 11.1.1. 什么是测试 在Rust里一个测试就是一个函数&#xff0c;它被用于验证非测试代码的功能是否和预期一致。 在一个测试的函数体里通…...

移动端屏幕分辨率rem,less

谷歌模拟器&#xff1a;能直接看到移动端效果 屏幕分辨率 右键电脑桌面 &#xff0c;点击显示设置 PC端是逻辑分辨率&#xff0c;移动端代码也是参考逻辑分辨率 网页端宽度和逻辑分辨率尺寸相同 手机屏幕尺寸不同&#xff0c;网页宽度均为 100% 所以就需要添加视口标签&#x…...

rk3568 , buildroot , qt ,使用sqlite, 动态库, 静态库

问题说明&#xff1a; 客户反馈 &#xff0c;buildroot 系统 &#xff0c;使用qt 使用sqlite &#xff0c;有报错&#xff0c;无法使用sqlite. 测试情况说明&#xff1a; 我自己测试&#xff0c;发现&#xff0c; buildroot 自己默认就是 使能了 sqlite 的。 是否解决说明&…...

web-app uniapp监测屏幕大小的变化对数组一行展示数据作相应处理

web-app uniapp监测屏幕大小的变化对数组一行展示数据作相应处理 1.uni.getSystemInfoSync().screenWidth; 获取屏幕宽度 2.uni.onWindowResize&#xff08;&#xff09; 实时监测屏幕宽度变化 3.根据宽度的大小拿到每行要展示的数量itemsPerRow 4.为了确保样式能够根据 items…...

Airflow:TimeSensor感知时间条件

在数据管道工作流中&#xff0c;任务可能需要在特定的时间执行&#xff0c;或者在继续之前等待一定的时间。为了满足这些需求&#xff0c;Apache Airflow提供了TimeSensor&#xff0c;这是一种内置Sensor&#xff0c;可以监控当前时间&#xff0c;并在达到指定时间时触发后续任…...

使用Python和Neo4j驱动程序来实现小规模数据的CSV导入

要将CSV数据导入到Neo4j数据库中&#xff0c;你可以使用Neo4j提供的工具&#xff0c;比如neo4j-admin import命令&#xff08;适用于大规模数据导入&#xff09;&#xff0c;或者使用Python的Neo4j驱动程序通过Cypher查询逐行插入数据&#xff08;适用于小规模数据导入&#xf…...

网络安全测评技术与标准

网络安全测评概况 网络安全测评是网络信息系统和IT技术产品的安全质量保障。本节主要阐述网络安全测评的概念&#xff0c;给出网络安全测评的发展状况。 18.1.1 网络安全测评概念 网络安全测评是指参照一定的标准规范要求&#xff0c;通过一系列的技术和管理方法&#xff0c;获…...

某漫画网站JS逆向反混淆流程分析

文章目录 1. 写在前面1. 接口分析2. 反混淆分析 【&#x1f3e0;作者主页】&#xff1a;吴秋霖 【&#x1f4bc;作者介绍】&#xff1a;擅长爬虫与JS加密逆向分析&#xff01;Python领域优质创作者、CSDN博客专家、阿里云博客专家、华为云享专家。一路走来长期坚守并致力于Pyth…...

如何获取文件的MIME类型

文章目录 1. 概念介绍2. 方法与类型2.1 使用方法2.2 常见类型3. 示例代码4. 内容总结我们在上一章回中介绍了"如何加载本地图片"相关的内容,本章回中将介绍如何获取文件类型.闲话休提,让我们一起Talk Flutter吧。 1. 概念介绍 我们在本章回中提到的文件类型是指MI…...

Three.js 基础概念:构建3D世界的核心要素

文章目录 前言一、场景&#xff08;Scene&#xff09;二、相机&#xff08;Camera&#xff09;三、渲染器&#xff08;Renderer&#xff09;四、物体&#xff08;Object&#xff09;五、材质&#xff08;Material&#xff09;六、几何体&#xff08;Geometry&#xff09;七、光…...

Linux web服务器

Linux 作为 Web 服务器操作系统 安装 Web 服务器软件&#xff08;以 Apache 为例&#xff09; 步骤一&#xff1a;更新系统软件包列表 在 CentOS 系统中&#xff0c;使用命令 yum update -y 这个命令会连接到 CentOS 的软件包仓库&#xff0c;检查所有已安装软件包是否有更…...

Linux 下信号的保存和处理

信号的几个状态 信号抵达: 当接收到的信号被处理时, 此时就成为信号的抵达信号的未决: 从信号的产生到信号抵达这个时间段之间, 称为信号未决信号阻塞: 当进程设置了某个信号为阻塞后, 这个进程就不会在接收到这个信号信号忽略: 将信号设置为忽略后, 接收到这个信号, 对这个信…...

宝塔安装教程,bt怎么安装 linux

Centos安装脚本 yum install -y wget && wget -O install.sh http://download.bt.cn/install/install_6.0.sh && sh install.sh 37a09b35 Ubuntu/Deepin安装脚本 wget -O install.sh http://download.bt.cn/install/install-ubuntu_6.0.sh && sudo b…...

java通过ocr实现识别pdf中的文字

需求&#xff1a;识别pdf文件中的中文 根据github项目mymonstercat 改造,先将pdf文件转为png文件存于临时文件夹&#xff0c;然后通过RapidOcr转为文字,最后删除临时文件夹 1、引入依赖 <dependency><groupId>org.apache.pdfbox</groupId><artifactId&g…...

基于SpringBoot的养老院管理系统

作者&#xff1a;计算机学姐 开发技术&#xff1a;SpringBoot、SSM、Vue、MySQL、JSP、ElementUI、Python、小程序等&#xff0c;“文末源码”。 专栏推荐&#xff1a;前后端分离项目源码、SpringBoot项目源码、Vue项目源码、SSM项目源码、微信小程序源码 精品专栏&#xff1a;…...

阿里云发现后门webshell,怎么处理,怎么解决?

当收到如下阿里云通知邮件时&#xff0c;大部分管理员都会心里一惊吧&#xff01;出现Webshell&#xff0c;大概是网站被入侵了。 尊敬的 xxxaliyun.com&#xff1a; 云盾云安全中心检测到您的服务器&#xff1a;47.108.x.xx&#xff08;xx机&#xff09;出现了紧急安全事件…...

韩顺平老师Linux学习笔记【持续更新...】

1、课程内容 1.1、课程大纲 1.2、Linux使用在哪些地方 Linux运维工程师Linux嵌入式工程师Linux下开发项目&#xff1a;JavaEE、大数据、Python、PHP、C/C、Go 1.3、Linux的应用领域 个人桌面领域服务器领域&#xff08;最强领域&#xff09;嵌入式领域 2、Linux入门 2.1、…...

Cognitive architecture 又是个什么东东?

自Langchain&#xff1a; https://blog.langchain.dev/what-is-a-cognitive-architecture/ https://en.wikipedia.org/wiki/Cognitive_architecture 定义 A cognitive architecture refers to both a theory about the structure of the human mind and to a computational…...

【css】浏览器强制设置元素状态(hover|focus……)

直接上步骤&#xff1a; 打开浏览器控制台 → 找到样式选项 → 找到:hov选项 → 点击:hov选项&#xff0c;会展开【设置元素状态】。 只要选中就会展示出自己写在css里面的该种状态下的样式了。...

leetcode热题100——NO.160相交链表——JAVA

一、题目描述 题目&#xff1a;给你两个单链表的头节点 headA 和 headB &#xff0c;请你找出并返回两个单链表相交的起始节点。如果两个链表不存在相交节点&#xff0c;返回 null 。题目数据 保证 整个链式结构中不存在环。 注意&#xff0c;函数返回结果后&#xff0c;链表必…...

基于Media+Unity的手部位姿三维位姿估计

使用mediapipe Unity 手部位姿三维位姿估计 参考文章 基于Mediapipe的姿势识别并同步到Unity人体模型中 MediapipeUnity3d实现虚拟手_unity mediapipe-CSDN博客 需求 我的需求就是快速、准确的跟踪手部位姿并实现一个三维显示。 主要思路 搭建mdeiapipe系统&#xff0c…...

cJson——序列化格式json和protobuf对比

cJson——序列化格式json和protobuf对比 1. 更小的消息体积2. 更快的序列化与反序列化速度3. 类型安全4. 向后和向前兼容性5. 更低的带宽消耗6. 高效的编码方式7. 易于跨语言支持8. 支持复杂的数据结构9. 更好的支持大型数据交换总结 Protocol Buffers (Protobuf) 和 JSON 都是…...