当前位置: 首页 > news >正文

rocketmq优先级控制 + 并发度控制

背景

最近在做大模型的项目,算法部门提供的文档解析接口, 并发度为1, 业务这边需要在ai问答和上传文档时进行解析和向量化,文档解析只能单线程跑,问答的文档解析需要高优先级处理。

采用 rocketmq 做文档上传和解析的解耦(项目背景在,无法用其它 mq 替换)。

由于rocketmq不支持优先级队列,需要自己实现优先级队列的效果。

基本思路

创建两个 topic 和两个 group, 分别对应高优先级任务(HighPriorityTopic)和低优先级任务(LowPriorityTopic)。
采用 pull 模式,手动拉取消息,如果 HighPriorityTopic 拉取为空,再去 LowPriorityTopic 拉取消息。

结果

在两个消息队列都为空的时候,上传文档,解析任务会延迟大概5min才消费。
原因是 dev环境文档解析大概需要2min, 而为了避免消息重复消费,我把 mq 的 invisibleDuration 改到了 5min。
当消息错过这个时间窗口的时候,只能等到下个时间窗口才能消费。

解决办法

高优先级队列 push 模式, 低优先级队列 pull 模式。采用 AtomicInteger 做状态判断。
测试结果mq 消息消费几乎没延迟。

代码

private void consumeMessage(ClientConfiguration clientConfiguration) throws ClientException, InterruptedException {// 创建并初始化消费者FilterExpression filterExpression = new FilterExpression("*", FilterExpressionType.TAG);// 高优先级消费 push 模式clientServiceProvider.newPushConsumerBuilder().setClientConfiguration(clientConfiguration).setConsumerGroup(docParseHighPriorityGroup).setSubscriptionExpressions(Map.of(docParseHighPriorityTopic, filterExpression)).setMessageListener(messageView -> {highPriorityTaskCount.incrementAndGet();try {log.info("[文档解析] 消费高优先级解析任务:topic = {}, messageId = {}", messageView.getTopic(), messageView.getMessageId());this.dealMessage(messageView);return ConsumeResult.SUCCESS;} finally {highPriorityTaskCount.decrementAndGet();}}).build();// 低优先级消费 pull 模式SimpleConsumer lowPriorityConsumer = clientServiceProvider.newSimpleConsumerBuilder().setClientConfiguration(clientConfiguration).setConsumerGroup(docParseGroup).setAwaitDuration(Duration.ofSeconds(10)).setSubscriptionExpressions(Map.of(docParseTaskTopic, filterExpression)).build();int maxMessageNum = 1;Duration invisibleDuration = Duration.ofMinutes(5);while (true) {if (highPriorityTaskCount.get() == 0) { // 高优先级没消息了才开始消费List<MessageView> receive = lowPriorityConsumer.receive(maxMessageNum, invisibleDuration);log.info("[文档解析] 消费低优先级解析任务,收到消息数量:{}", receive.size());this.handleMessages(lowPriorityConsumer, receive);} else {Thread.sleep(1_000);}}}

相关文章:

rocketmq优先级控制 + 并发度控制

背景 最近在做大模型的项目&#xff0c;算法部门提供的文档解析接口&#xff0c; 并发度为1&#xff0c; 业务这边需要在ai问答和上传文档时进行解析和向量化&#xff0c;文档解析只能单线程跑&#xff0c;问答的文档解析需要高优先级处理。 采用 rocketmq 做文档上传和解析的…...

从0开始学linux韦东山教程第四章问题小结(2)

本人从0开始学习linux&#xff0c;使用的是韦东山的教程&#xff0c;在跟着课程学习的情况下的所遇到的问题的总结,理论虽枯燥但是是基础。说实在的越看视频越感觉他讲的有点乱后续将以他的新版PDF手册为中心&#xff0c;视频作为辅助理解的工具。参考手册为嵌入式Linux应用开发…...

洛谷P1226 【模板】快速幂

题目来源 P1226 【模板】快速幂 - 洛谷 题目描述 给你三个整数 a,b,p&#xff0c;求 abmodp。 输入格式 输入只有一行三个整数&#xff0c;分别代表 a,b,p。 输出格式 输出一行一个字符串 a^b mod ps&#xff0c;其中 a,b,p 分别为题目给定的值&#xff0c; s 为运算结果…...

自动点焊机:在多类电池生产中筑牢质量与效率根基

在电池制造产业飞速发展的当下&#xff0c;焊接作为电池组装的关键环节&#xff0c;其质量与效率直接影响着电池的性能与安全性。自动点焊机凭借其高效、精准、稳定的特性&#xff0c;在电动工具电池、扭扭车电池、储能电池包、滑板车电池以及电动车电池等多个电池制造领域大放…...

信息系统项目管理师考前练习1

以下是结合《信息系统项目管理师教程》(第5版)核心考点和当前行业热点的20道选择题押题,涵盖重点知识和新兴趋势,供考前冲刺练习: 项目生命周期模型选择 在敏捷开发项目中,客户需求频繁变更,且团队希望快速交付最小可行产品(MVP),最适合采用的生命周期模型是: A. …...

C++ for QWidget:正则表达式和QRegExp

正则表达式 正则表达式&#xff0c;又称规则表达式&#xff0c;&#xff08;Regular Expression&#xff0c;在代码中常简写为regex、regexp或RE&#xff09;&#xff0c;是计算机科学的一个概念。以下是对正则表达式的详细介绍&#xff1a; 一、定义与作用 正则表达式是一种文…...

day019-特殊符号、正则表达式与三剑客

文章目录 1. 磁盘空间不足-排查流程2. 李导推荐书籍2.1 大话存储2.2 性能之巅 3. 特殊符号3.1 引号系列&#xff08;面试题&#xff09;3.2 重定向符号3.2.1 cat与重定向3.2.2 tr命令&#xff1a;替换字符3.2.3 xargs&#xff1a;参数转换3.2.4 标准全量追加重定向 4. 正则表达…...

学习黑客了解5分钟了解中间人攻击(MITM)

5分钟了解中间人攻击&#xff08;MITM&#xff09;&#x1f575;️‍♂️ 什么是中间人攻击&#xff08;Man-in-the-Middle, MITM&#xff09;&#xff1f; 中间人攻击是一种网络攻击方式&#xff0c;攻击者悄无声息地“夹在”通信两端之间&#xff0c;偷偷读取、篡改、伪造或…...

亚马逊第四个机器人中心将如何降低30%配送成本?

近年来&#xff0c;亚马逊越来越依赖自动化技术来提升仓储效率和配送速度。2024年&#xff0c;亚马逊宣布其全球第四个机器人中心在美国正式投入运营&#xff0c;这一中心将成为改变供应链策略的新变量。据亚马逊官方消息&#xff0c;这一机器人中心有望帮助公司进一步削减运营…...

「AR智慧应急」新时代:当AR眼镜遇上智能监控,打造立体化应急指挥系统

引言&#xff1a;应急管理的未来已来 数字化浪潮正重塑应急管理领域。传统监控系统依赖固定屏幕、被动告警的短板&#xff0c;在复杂突发事件中暴露无遗。而AR眼镜视频监控管理平台应急应急管理平台的三维融合&#xff0c;正开启"上帝视角"指挥时代——通过虚实叠加…...

docker 启动一个python环境的项目

安装镜像 docker pull python:3.8-slim8902端口 启动容器 tail -f /dev/null 持续监听空文件&#xff0c;保持容器活跃 docker run -it \-p 8902:8902 \--name api_mock2 \-v /home/py/test:/app \-w /app \python:3.8-slim \tail -f /dev/null进入容器 docker exec -it api…...

Docker run命令-p参数详解

端口映射基础语法 docker run -p <宿主机端口>:<容器端口> 操作示例 docker run -d --restartalways --namespug -p 5000:80 registry.aliyuncs.com/openspug/spug参数解析 -d&#xff1a;后台运行容器--restartalways&#xff1a;设置容器自动重启--namespug&…...

vue3请求设置responseType: ‘blob‘,导致失败后获取不到返回信息

vue3请求设置responseType: ‘blob’,导致失败后获取不到返回信息 使用FileReader解决 dataCollect().downloadAll(data).then((res: any) > {if (res.type application/json) {const fileReader new FileReader();fileReader.readAsText(new Blob([res], { type: applica…...

在 Windows 系统部署对冲基金分析工具 ai-hedge-fund 的笔记

#工作记录 一、环境准备 在部署对冲基金分析工具ai-hedge-fund前&#xff0c;需提前安装好必备软件&#xff0c;为后续工作搭建好基础环境。 1. 安装 Anaconda Anaconda 集成了 Python 及众多科学计算库&#xff0c;是项目运行的重要基础。从Anaconda 官方网站下载适合 Win…...

基于python的机器学习(八)—— 评估算法(一)

目录 一、机器学习评估的基本概念 1.1 评估的定义与目标 1.2 常见评估指标 1.3 训练集、验证集与测试集的划分 二、分离数据集 2.1 分离训练数据集和评估数据集 2.2 k折交叉验证分离 2.3 弃一交叉验证分离 2.4 重复随机评估和训练数据集分离 三、交叉验证技术 3.…...

广东省省考备考(第十六天5.21)—言语:语句排序题(听课后强化)

错题 解析 对比选项&#xff0c;确定首句。①句介绍目前人类可以利用一些技术手段进入元宇宙&#xff0c;凭借网络重新定义自己&#xff0c;体验一种全新的生活&#xff0c;②句介绍对于多数人来说&#xff0c;首先要弄清楚什么是元宇宙&#xff0c;③句介绍元宇宙是指超越现实…...

什么是实时流数据?核心概念与应用场景解析

在当今数字经济时代&#xff0c;实时流数据正成为企业核心竞争力。金融机构需要实时风控系统在欺诈交易发生的瞬间进行拦截&#xff1b;电商平台需要根据用户实时行为提供个性化推荐&#xff1b;工业物联网需要监控设备状态预防故障。这些场景都要求系统能够“即时感知、即时分…...

计算机视觉与深度学习 | Python实现CEEMDAN-ABC-VMD-DBO-CNN-LSTM时间序列预测(完整源码和数据)

以下是一个结合CEEMDAN、ABC优化VMD、DBO优化CNN-LSTM的完整时间序列预测实现方案。该方案包含完整的数据生成、算法实现和模型构建代码。 完整实现代码 import numpy as np import pandas as pd from PyEMD import CEEMDAN from vmdpy import VMD from sklearn.preprocessing…...

每日Prompt:实物与手绘涂鸦创意广告

提示词 一则简约且富有创意的广告&#xff0c;设置在纯白背景上。 一个真实的 [真实物体] 与手绘黑色墨水涂鸦相结合&#xff0c;线条松散而俏皮。涂鸦描绘了&#xff1a;[涂鸦概念及交互&#xff1a;以巧妙、富有想象力的方式与物体互动]。在顶部或中部加入粗体黑色 [广告文案…...

期刊采编系统安装升级错误

我们以ojs系统为例&#xff1a; PHP Fatal error: Uncaught Error: Call to a member function getId() on null in /esci/data/html/classes/install/Upgrade.inc.php:1019 Stacktrace: #0 /esci/data/html/lib/pkp/classes/install/Installer.inc.php(415): Upgrade->con…...

【linux命令】git命令简单使用

git命令简单使用 1. 将代码下载到到本地2. 查看分支是否正确3. 将工作目录中的变更添加到暂存区&#xff0c;为下一次提交做准备4. 提交更改&#xff0c;添加提交信息5. 将本地的提交推送到远程仓库6.从远端仓库拉取分支代码7.查看修改日志8. 解决冲突 1. 将代码下载到到本地 …...

使用Tkinter写一个发送kafka消息的工具

文章目录 背景工具界面展示功能代码讲解运行环境创建GUI程序搭建前端样式编写功能实现代码 背景 公司是做AR实景产品的&#xff0c;近几年无人机特别的火&#xff0c;一来公司比较关注低空经济这个新型领域&#xff0c;二来很多政企、事业单位都采购了无人机用于日常工作。那么…...

【VS2017】cpp文件字符编码异常导致编译报错

这是一个 wav 转 pcm 的简单demo&#xff0c;但VS2017编译报错 #define _CRT_SECURE_NO_WARNINGS #include <stdio.h> #include <inttypes.h> #pragma pack(push, 1) struct TWavHead {int8_t riff[4]; /*!< (4)资源交换文件标志 RIFF */uint32_t file_si…...

Vue3 中 Route 与 Router 的区别

在 Vue Router 中&#xff0c;Route 和 Router 是两个相关但完全不同的概念&#xff1a; 1、Router (路由实例) 定义&#xff1a;Router 是路由器的实例&#xff0c;负责整个应用的路由管理 功能&#xff1a; 管理路由映射表&#xff08;路由配置&#xff09; 提供编程式导航…...

gcc还会有自己的头文件呢?

1. GCC自己的头文件目录 路径&#xff1a; .../lib/gcc/<target>/<version>/include 作用&#xff1a; 这里存放的是GCC编译器自身实现的一些头文件&#xff0c;比如 stdarg.h、float.h、limits.h、varargs.h 等。这些头文件是C/C标准规定必须有的&#xff0c;但…...

YOLO训练输入尺寸代表什么 --input_width 和 --input_height 参数

参数作用 硬件对齐要求 许多边缘计算芯片&#xff08;如 K230&#xff09;的 NPU 对输入尺寸有 内存对齐要求&#xff08;例如 32 的倍数&#xff09;。脚本会自动将你填写的输入尺寸向上对齐到最近的 32 倍数&#xff1a; input_width int(math.ceil(args.input_width / 32.0…...

缓存穿透、缓存击穿、缓存雪崩解决方案

在分布式系统中,缓存是提升性能的关键组件,但也可能面临 缓存穿透、缓存击穿、缓存雪崩 三大典型问题。以下是三者的核心概念、区别及解决方案: 一、缓存穿透(Cache Penetration) 概念 场景:客户端请求 不存在的数据(如恶意攻击的非法 Key),由于缓存和数...

前端面经-nginx/docker

1.如何查看 Linux 系统负载&#xff1f;如何判断负载是否过高&#xff1f; 使用 top、htop 或 uptime 查看系统负载。 负载值&#xff08;Load Average&#xff09;反映 CPU 繁忙程度&#xff0c;理想情况下应小于 CPU 核心数。例如&#xff0c;4 核 CPU 的负载持续超过 4 表示…...

权限控制相关实现

Spring Boot-Shiro-Vue&#xff1a; 这个项目可以满足基本的权限控制需求&#xff0c;前后端都有&#xff0c;开箱即用...

[论文精读]Ward: Provable RAG Dataset Inference via LLM Watermarks

Ward: Provable RAG Dataset Inference via LLM Watermarks [2410.03537] Ward: Provable RAG Dataset Inference via LLM Watermarks ICLR 2025 Rebuttal&#xff1a;Ward: 可证明的 RAG 数据集推理通过 LLM 水印 | OpenReview --- Ward: Provable RAG Dataset Inference v…...

第23天-Python Flet 开发指南

环境准备 pip install flet 示例1:基础计数器应用 import flet as ftdef main(page: ft.Page):page.title = "计数器"page.vertical_alignment = ft.MainAxisAlignment.CENTERtxt_number = ft.TextField(value="0", text_align=ft.TextAlign.RIGHT, wid…...

LangGraph(五)——自定义状态

目录 1. 向状态添加键2. 更新工具中的状态3. 构建状态图4. 提示聊天机器人5. 添加人工协助6. 手动更新状态参考 1. 向状态添加键 通过向状态添加name和birthday键来更新聊天机器人对实体生日的研究&#xff1a; from typing import Annotated from typing_extensions import T…...

fatload使用方式

‌Fatload是U-Boot中的一个命令&#xff0c;用于从FAT文件系统加载二进制文件到内存中‌。其基本用法如下&#xff1a; fatload <interface> <dev[:part]> <addr> <filename> <bytes>‌interface‌&#xff1a;所使用的接口&#xff0c;如MMC、…...

Pytorch基础操作

面试的时候&#xff0c;PhD看我简历上面写了”熟悉pytorch框架“&#xff0c;然后就猛猛提问了有关于tensor切片的问题…当然是没答上来&#xff0c;因此在这里整理一下pytorch的一些基础编程语法&#xff0c;常看常新 PyTorch基础操作全解 一、张量初始化 PyTorch的核心数据…...

Femap许可证安装与配置指南

在电磁仿真领域&#xff0c;Femap凭借其卓越的性能和广泛的应用场景&#xff0c;已成为许多工程师和科研人员的首选工具。为了确保您能够顺利安装和配置Femap许可证&#xff0c;本文将提供详细的安装和配置指南&#xff0c;帮助您快速完成设置&#xff0c;开启高效的仿真之旅。…...

家用和类似用途电器的安全 第1部分:通用要求 与2005版差异(7)

文未有本标准免费下载链接。 ——增加了“对峰值电压大于15kV的&#xff0c;其放电电能应不超过350mJ”的要求&#xff08;见8.1.4&#xff09; 1. GB/T4706.1-2024&#xff1a; 8.1.4 如果易触及部件为下述情况,则不认为其是带电的。 ——该部件由安全特低电压供电,且: 对…...

基于Browser Use + Playwright 实现AI Agent操作Web UI自动化

Browser Use是什么 Browser Use是一个开源项目官网&#xff1a;Browser Use - Enable AI to control your browser&#xff0c;专为大语言模型&#xff08;LLM&#xff09;设计的只能浏览器工具&#xff0c;能够让AI像人类一样自然的浏览和操作网页&#xff0c;支持多标签页管…...

【题解-洛谷】B4302 [蓝桥杯青少年组省赛 2024] 出现奇数次的数

题目:B4302 [蓝桥杯青少年组省赛 2024] 出现奇数次的数 题目描述 奇数:指不能被 2 2 2 整除的整数。 例如: 3 3...

Redis SETNX:分布式锁与原子性操作的核心

SETNX 是 Redis 中的一个经典命令&#xff0c;全称是 Set if Not eXists&#xff08;当键不存在时设置值&#xff09;。它的核心作用是原子性地完成 “检查并设置” 操作&#xff0c;常用于分布式锁、防止重复提交等需要 “独占性” 的场景。 一、基本语法与返回值 命令格式&…...

常见字符串相似度算法详解

目录 引言 一、Levenshtein距离&#xff08;编辑距离&#xff09; 1.1 算法原理 1.2 Java实现 1.3 springboot中实现 二、Jaro-Winkler相似度 2.1 算法特点 2.2 Java实现 三、余弦相似度&#xff08;向量空间模型&#xff09; 3.1 实现步骤 3.2 Java实现 3.3 简化版…...

红蓝对抗中的网络安全设备操作手册

目录 &#x1f510; 关键要点 设备操作与实战应用 &#x1f4ca; 1. 防火墙 (Firewall) 蓝队&#xff08;防御&#xff09;用法 红队&#xff08;攻击&#xff09;用法 &#x1f50d; 2. 入侵检测/防护系统 (IDS/IPS) 蓝队&#xff08;防御&#xff09;用法 红队&#…...

用python实现汉字转拼音工具

用python实现汉字转拼音工具 主要功能特点&#xff1a; 多种拼音风格选择&#xff08;带声调符号、数字声调、无声调&#xff09;输出模式&#xff1a;可以选择“普通模式”&#xff08;仅拼音&#xff09;或“拼音注音”&#xff08;每个汉字的拼音显示在上方&#xff09;可…...

spring中的Interceptor使用说明

一、Interceptor 的核心概念 Interceptor&#xff08;拦截器&#xff09; 是 Spring MVC 提供的一种机制&#xff0c;用于在请求处理的不同阶段插入自定义逻辑。其核心作用包括&#xff1a; • 预处理&#xff1a;在控制器方法执行前进行权限校验、日志记录等。 • 后处理&am…...

Wi-Fi(无线局域网技术)

Wi-Fi&#xff08;Wireless Fidelity&#xff0c;无线保真&#xff09;是通过无线电波传输数据的技术&#xff0c;它使设备能够通过无线连接方式访问网络、共享文件或连接互联网。Wi-Fi已经成为现代家庭、办公室以及公共场所中常见的无线通信方式&#xff0c;支持的设备包括手机…...

MySQL Host 被封锁解决方案(全版本适用 + Java 后端优化)

引言 MySQL 中 “Host is blocked because of many connection errors” 是生产环境常见问题&#xff0c;若处理不当会导致服务中断。本文结合 MySQL 官方文档&#xff08;5.5/8.0&#xff09;、Java 后端最佳实践及企业级经验&#xff0c;提供从 “快速解封” 到 “根源优化”…...

分类预测 | Matlab实现PSO-RF粒子群算法优化随机森林多特征分类预测

分类预测 | Matlab实现PSO-RF粒子群算法优化随机森林多特征分类预测 目录 分类预测 | Matlab实现PSO-RF粒子群算法优化随机森林多特征分类预测分类效果**功能概述****算法流程** 分类效果 功能概述 数据预处理 读取Excel数据集&#xff0c;划分训练集&#xff08;前260行&#…...

【苍穹外卖】Day01—Mac前端环境搭建

目录 一、安装Nginx &#xff08;一&#xff09;安装Homebrew &#xff08;二&#xff09;Homebrew安装Nginx 1. 执行安装命令&#xff1a; 2. 验证安装&#xff1a; &#xff08;三&#xff09;启动与停止Nginx 二、配置Nginx 1. 替换nginx.conf 2. 替换html文件夹 三…...

anaconda创建环境出错HTTPS

报错信息 warnings.warn( /home/ti-3/anaconda3/lib/python3.12/site-packages/urllib3/connectionpool.py:1099: InsecureRequestWarning: Unverified HTTPS request is being made to host ‘repo.anaconda.com’. Adding certificate verification is strongly advised. Se…...

Nginx 强制 HTTPS:提升网站安全性的关键一步

在当今互联网时代&#xff0c;网站的安全性至关重要。使用 HTTPS 协议可以有效保护用户数据&#xff0c;防止信息泄露和中间人攻击。本文将详细介绍如何在 Nginx 中设置强制 HTTPS&#xff0c;确保所有 HTTP 请求都被自动重定向到 HTTPS。 一、背景与重要性 HTTPS&#xff08…...

青藏高原边界数据总集

关键数据集分类&#xff1a;地表参数数据集空间分辨率&#xff1a;m共享方式&#xff1a;开放获取数据大小&#xff1a;265.87 KB数据时间范围:2016元数据更新时间:2022-04-18 数据集摘要 此边界数据总集包含五种类型的边界&#xff1a; &#xff11;、TPBoundary_2500m&#…...