基于 Redis Stream 实现消息队列功能
好长时间没更新了。。。。。。
背景:举个例子在某个接口执行完成后只需要前半段返回结果,后半段可能是日志记录、下游系统调用等功能的情况下,将耗时的消息进行异步发送就显得很有必要,这时就有很多种选择,单体项目甚至可以选择自定义线程池+DelayQueue 这种操作去进行异步操作,而大多数人会在第一时间想到消息丢列,但是消息引入消息队列这件事对于一个并发量不大、后半段消息允许失败的情况单独引入一个中间件对系统的开发维护难度都会提升一个等级,所以我就想到应用 Redis Stream 这种方式来实现异步任务的执行
废话不多说,直接上代码
@Component
@Slf4j
@RequiredArgsConstructor
public class RedisCountdownTaskProducer implements CountdownStrategy {private final StringRedisTemplate stringRedisTemplate;@Overridepublic void startCountdown(long duration, Runnable onFinish, String userId, String taskId) {log.info("使用redis stream 的延时任务开始执行 userId:{},taskId:{}",userId,taskId);Map<String,String> producerMap = Map.of("userId",userId,"taskId",taskId,"duration",String.valueOf(duration));// 发送延迟消费信息 topic: pickUpTheLightRecordstringRedisTemplate.opsForStream().add("streamKey", producerMap);}@Overridepublic void cancelCountdown(String userId, String taskId) {}
}
以我的应用场景为例,大家可以忽略这个继承的 CountdownStrategy 的接口,我这是用策略模式来实现多种方式的动态切换
最核心的代码就是一行 stringRedisTemplate.opsForStream().add(“pickUpTheLightRecord”, producerMap);
应用 redis 提供的 stream 功能,直接发送你的 topic 和你的 key(这个 key 可以是你的某个实体,某个信息,或者说某种标识,以便后续取出的时候可以知道自己要进行什么操作)
@Slf4j
@Configuration
@RequiredArgsConstructor
public class RedisStreamConfiguration {private final RedisConnectionFactory redisConnectionFactory;private final RedisCountdownTaskConsumer redisCountdownTaskConsumer;@Beanpublic ExecutorService asyncStreamConsumer() {AtomicInteger index = new AtomicInteger();int processors = Runtime.getRuntime().availableProcessors();return new ThreadPoolExecutor(processors,processors + processors >> 1,60,TimeUnit.SECONDS,new LinkedBlockingQueue<>(),runnable -> {Thread thread = new Thread(runnable);thread.setName("stream_consumer_countdown_task_" + index.incrementAndGet());thread.setDaemon(true);return thread;});}@Bean(initMethod = "start", destroyMethod = "stop")public StreamMessageListenerContainer<String, MapRecord<String, String, String>> streamMessageListenerContainer(ExecutorService asyncStreamConsumer) {// 创建消费者组String consumerGroup = "your_comsumer_group";String streamKey = "streamKey"; // 与生产者topic一致try {redisConnectionFactory.getConnection().xGroupCreate(streamKey.getBytes(), consumerGroup, ReadOffset.from("0-0"), true);} catch (Exception e) {// 捕获异常,检查是否是因为消费者组已存在导致的错误if (e.getMessage().contains("BUSYGROUP")) {// 如果消费者组已存在,则复用现有的消费者组log.warn("消费者组已存在,复用现有的消费者组: {}", consumerGroup);} else {// 如果是其他错误,则记录日志log.warn("消费者组创建失败: {}", e.getMessage());}}StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> options =StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder()// 一次最多获取多少条消息.batchSize(10)// 执行从 Stream 拉取到消息的任务流程.executor(asyncStreamConsumer)// 如果没有拉取到消息,需要阻塞的时间。不能大于 ${spring.data.redis.timeout},否则会超时.pollTimeout(Duration.ofSeconds(3)).build();StreamMessageListenerContainer<String, MapRecord<String, String, String>> streamMessageListenerContainer =StreamMessageListenerContainer.create(redisConnectionFactory, options);streamMessageListenerContainer.receiveAutoAck(Consumer.from(consumerGroup,"countdownTaskConsumer"),StreamOffset.create(streamKey, ReadOffset.lastConsumed()),redisCountdownTaskConsumer);return streamMessageListenerContainer;}
}
以上是对消费者的配置,配置过后我们通过 Bean 的形式直接注入 Spring 容器,方便在应用启动时它可以自动创建,应用结束时可以自动销毁,避免资源浪费
@Slf4j
@Component
@RequiredArgsConstructor
public class RedisCountdownTaskConsumer implements StreamListener<String, MapRecord<String, String, String>> {private final MessageQueueIdempotentHandler messageQueueIdempotentHandler;private final TaskCommonPomodoroTechniqueMapper taskCommonPomodoroTechniqueMapper;private final StringRedisTemplate stringRedisTemplate;@Overridepublic void onMessage(MapRecord<String, String, String> message) {log.info("receive message:{},倒计时任务开始执行 userId:{},taskId:{}",message);// 执行倒计时任务String stream = message.getStream();RecordId id = message.getId();String consumerGroup = "pickUpTheLightRecord_consumer_group";//消息幂等处理if(!messageQueueIdempotentHandler.isMessageProcessed(id.toString())){//判断当前消息是否执行完成if(messageQueueIdempotentHandler.isAccomplish(id.toString())){log.info("消息已处理完成: {}", id);stringRedisTemplate.opsForStream().acknowledge(stream, consumerGroup, id); // 显式ACreturn;}throw new RuntimeException("消息未完成流程,选择消息队列重试");}try {Map<String, String> value = message.getValue();String userId = value.get("userId");String taskId = value.get("taskId");LambdaUpdateWrapper<TaskCommonPomodoroTechnique> wrapper = new LambdaUpdateWrapper<>();wrapper.eq(TaskCommonPomodoroTechnique::getUserId,userId);wrapper.eq(TaskCommonPomodoroTechnique::getId,taskId);TaskCommonPomodoroTechnique taskCommonPomodoroTechnique = taskCommonPomodoroTechniqueMapper.selectOne(wrapper);if(taskCommonPomodoroTechnique == null){log.error("倒计时任务不存在,userId:{},taskId:{}",userId,taskId);return;}taskCommonPomodoroTechnique.setCompletionTimes(taskCommonPomodoroTechnique.getCompletionTimes()+1);taskCommonPomodoroTechniqueMapper.updateById(taskCommonPomodoroTechnique);log.info("倒计时任务执行成功,userId:{},taskId:{}",userId,taskId);stringRedisTemplate.opsForStream().delete(Objects.requireNonNull(stream), id.getValue());}catch (Throwable e){messageQueueIdempotentHandler.deleteMessageProcessed(id.toString());log.error("记录倒计时任务异常",e);throw e;}messageQueueIdempotentHandler.setAccomplish(id.toString());}
}
最后我们来实现具体的消费者,直接继承 StreamListener 这个类,重写 onMessage 方法,在这个方法中定义你要执行的业务逻辑(大家可以忽略幂等的处理,这个不是讲解的重点)
这样就可以使用基于 Redis Stream 的消息丢列啦
总的来说和 RocketMQ 的使用差不多,但是多了一些配置的过程
相关文章:
基于 Redis Stream 实现消息队列功能
好长时间没更新了。。。。。。 背景:举个例子在某个接口执行完成后只需要前半段返回结果,后半段可能是日志记录、下游系统调用等功能的情况下,将耗时的消息进行异步发送就显得很有必要,这时就有很多种选择,单体项目甚至…...
单元测试、系统测试、集成测试、回归测试的步骤、优点、缺点、注意点梳理说明
单元测试、系统测试、集成测试、回归测试的梳理说明 单元测试 步骤: 编写测试用例,覆盖代码的各个分支和边界条件。使用测试框架(如JUnit、NUnit)执行测试。检查测试结果,确保代码按预期运行。修复发现的缺陷并重新测…...
深入理解 HTML 中的<div>和元素:构建网页结构与样式的基石
一、引言 在 HTML 的世界里,<div>和元素虽看似普通,却扮演着极为关键的角色。它们就像网页搭建过程中的万能积木,能够将各种 HTML 元素巧妙地组合起来,无论是构建页面布局,还是对局部内容进行样式调整ÿ…...
网络安全信息收集[web子目录]:dirsearch子目录爆破全攻略以及爆破字典结合
目录 一、dirsearch 工具详细使用攻略 1. 安装 前提条件 安装步骤 可选:直接下载预编译版本 2. 基本用法 命令格式 参数说明 示例 3. 核心功能与高级用法 3.1 多线程加速 3.2 自定义字典 3.3 递归扫描 3.4 过滤响应 3.5 添加请求头 3.6 代理支持 3…...
Mybaties批量操作
1、批量插入 <!--批量操作-插入--><!-- 相当于INSERT INTO t_goods (c1,c2,c3) VALUES (a1,a2,a3),(b1,b2,b3),(d1,d2,d3),...--><insert id"batchInsert" parameterType"java.util.List">INSERT INTO t_goods (title,sub_title,origina…...
27.卷2的答案
CSP-J离我们不远了,加加油啦! 1.堆排序最坏时间复杂度是? 解析:平时多多练习可知,最坏时间复杂度是O(n log n)。 2.哪条能将s中的数值保留一位,并将第二位四舍五入? 解析:经过试…...
【 Manus平替开源项目】
文章目录 Manus平替开源项目1 OpenManus1.1 简介1.2 安装教程1.3 运行 2 OWL2.1 简介2.2 安装教程2.3 运行 3 OpenHands(原OpenDevin)3.1 简介3.2 安装教程和运行 Manus平替开源项目 1 OpenManus 1.1 简介 开发团队: MetaGPT 核心贡献者(5…...
【WEB APIs】DOM-事件基础
目录 1. 事件监听(绑定) 案例—关闭广告 案例-随机点名 2. 事件类型 2.1 鼠标事件 2.2 焦点事件 2.3 文本事件 3. 事件对象 案例—评论回车发布 4. 环境对象 5. 回调函数 6. 综合案例—tab栏切换 1. 事件监听(绑定) …...
66.Harmonyos NEXT 图片预览组件使用指南
温馨提示:本篇博客的详细代码已发布到 git : https://gitcode.com/nutpi/HarmonyosNext 可以下载运行哦! Harmonyos NEXT 图片预览组件使用指南 文章目录 Harmonyos NEXT 图片预览组件使用指南效果预览一、组件使用概述1. 组件功能特点2. 组件依赖关系 二…...
linux系统安装和激活conda
安装 wget https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.shbash ./Miniconda3-latest-Linux-x86_64.sh回车到最后按照输入yes,之后按提示操作。 激活 conda activate如果没有反应或者返回: bash: conda: command not found则…...
Java 集合框架大师课:集合框架的暗黑料理(六)
🔮Java 集合框架大师课:集合框架的暗黑料理(六)——弱引用与幽灵队列 第一章 弱引用:Java世界的塑料兄弟情 💔 四大引用类型生死簿 // 四类引用生死实验 Object strongObj new Object(); …...
LVI-SAM、VINS-Mono、LIO-SAM算法的阅读参考和m2dgr数据集上的复现(留作学习使用)
ROS一键安装参考: ROS的最简单安装——鱼香一键安装_鱼香ros一键安装-CSDN博客 opencv官网下载4.2.0参考:https://opencv.org/releases/page/3/ nvidia驱动安装:ubuntu18.04 安装显卡驱动 - 开始战斗 - 博客园 cuda搭配使用12 cuda安装1:Ub…...
京鲁医疗健康专家委员会聊城专家团成立
3月13日由京鲁医疗健康专家委员会指导,聊城市委人才工作领导小组办公室、聊城市卫生健康委员会、聊城市人才引进服务中心主办的"智链医脉.新启未来"聊城卫生健康产才共融发展交流会在北京人卫酒店召开。会上,京鲁医疗健康专家委员会…...
MySQL的事务机制
事务 事务概念:事务是一个完整的操作单元,不可分割,事务中的操作要么全部成功,要么全部失败。 1. 事务特性 ACID 1.1 原子性(A) 一个事务中所有操作是不能被分割的,要么所有的操作都成功&am…...
30、Vuex 为啥可以进行缓存处理
Vuex 状态管理基础与缓存的关联 Vuex 的核心概念: Vuex 主要由五个部分组成:state、mutations、actions、getters和modules。其中,state是存储数据的地方,类似于一个全局的数据仓库。在这个菜谱 APP 的例子中,缓存的数…...
OpenAI Agents SDK 中文文档 中文教程 (5)
英文文档原文详见 OpenAI Agents SDKhttps://openai.github.io/openai-agents-python/ 本文是OpenAI-agents-sdk-python使用翻译软件翻译后的中文文档/教程。分多个帖子发布,帖子的目录如下: (1) OpenAI 代理 SDK, 介绍及快速入门 (2)Open…...
如何处理PHP中的文件上传错误
如何处理PHP中的文件上传错误 在Web开发中,文件上传是一个常见的功能需求。然而,文件上传过程中可能会遇到各种错误,如文件大小超出限制、文件类型不被允许、上传过程中断等。为了确保用户能够顺利上传文件,并且开发者能够有效地…...
mac安装python没有环境变量怎么办?zsh: command not found: python
在mac电脑上,下载Python安装包进行安装之后,在终端中,输入python提示: zsh: command not found: python 一、原因分析 首先,这个问题不是因为python没有安装成功的原因,是因为python安装的时候,没有为我们添加环境变量导致的,所以我们只需要,在.zshrc配置文件中加上环…...
湿大气校正效应
目的 修正由于大气中的水汽对雷达波传播速度的影响,以提高海面高度测量的准确性,有时候也叫做对流层校正。水汽的时空变化复杂,难以直接通过气象模型准确预测。水汽的折射作用使雷达信号的传播速度减慢,从而导致测量的海面高度虚增…...
算法系列之回溯算法求解数独及所有可能解
有没有对数独感兴趣的朋友呢?数独作为一款经典的逻辑游戏,其目标是在一个9x9的方格中填入数字1至9,确保每一行、每一列以及每一个3x3的子网格中都包含这些数字且不重复。尽管数独的规则看似简单,但编写一个能够自动求解数独的程序…...
动态路径规划——01背包问题讲解和通过滚动数组优化
如果没有动态路径规划基础的兄弟可以出去了,这个题目有两个问题 第一问讲解: 1.定义状态表示 刚开始我做的时候根据我的经验定义了一个状态表示dp[i]表示从1到i个物品中选择的最大价值,但是这个状态表示有一个明显的问题,我怎么知…...
蓝队基本技能 web入侵指南 日志分析 后门查杀 流量分析
前言 为了赶工我是没学过红队的,首先我们要做的是 1、拿到用户给的web的时候 要先知道 web的源码 服务器 中间件 数据库这些信息 2、知道web日志放在哪里 会一些基本的分析 3、webshell查杀的基本技能 4、会分析基本的工具链 会写报告 .NET IIS 配置…...
docker基本应用和相关指令
文章目录 概要镜像管理容器操作网络管理数据卷管理其他常用指令典型场景示例小结 概要 Docker的命令通常分为几个大类,比如镜像管理(images)、容器管理(containers)、网络(network)、数据卷&…...
Django REST Framework中的序列化器类和视图类
序列化器类 一、Serializer序列化类 Serializer是DRF的序列化器基类,提供基本功能,使用Serializer类需要自己定义字段名称和类型。 BookSerializer(Serializer):name serializers.CharField()price serlializers.IntegerField()date serlializers.…...
模拟人生4大型MOD整合包3000+
存档介绍 (懒人萌新必备) 游戏内全面的人物美化、房屋改造、地图美化 美化人物250个(颜值在线,均搭配八套服饰) 全地图房屋改造(住宅、公寓、公用/商业地段等) 游戏内22张地图均已美化替换 存档…...
算法基础 -- Brian Kernighan 算法初识
Brian Kernighan 算法:利用 x & (x - 1) 逐步清除最低位的 1 1. 算法原理 x & (x - 1) 这个操作的作用是每次清除 x 的最低位的 1。由于 二进制的减法 会影响最低的 1,我们可以利用这一特性不断去除 1,直到 x 变为 0,从…...
基于Uniapp开发tab选项卡/标签栏前端组件
在开发一些业务场景时候,可能需要切换标签栏来展示不同的信息列表。 为此开发了一个Uniapp组件(myTab),下面为组件的展示效果: 案例代码: <template><view class"content"><myt…...
双向广搜
从两侧同时展开,那测数据少就从哪侧展,两者展开结果出现一样的,返回答案 127. 单词接龙 - 力扣(LeetCode) class Solution { public:int ladderLength(string beginWord, string endWord, vector<string>& wordList) {unordered_set<stri…...
2024下半年真题 系统架构设计师 论文写作 答案解析
系统架构设计师第二版VIP课程https://edu.csdn.net/course/detail/40283 试题一 论软件维护及其应用 请围绕“论软件维护及其应用”论题,依次从以下三个方面进行论述。 1.概要叙述你参与分析设计的软件项目以及你在其中所承担的主要工作。 2.请介绍软件维护的内…...
【VSCODE 插件 可视化】:SVG 编辑插件 SVG Editor
插件下载 svgeditor 创建文件 Windows/Linux 快捷键 Ctrl Shift P 打开VSCODE 命令面板查找 New File With Svg Editor 编辑文件 保存文件 打开文件以继续编辑 CG 选中多个:shift单击没找到横向分布功能无法用键盘微调位置...
go中实现子模块调用main包中函数的方法
你提到的“import cycle not allowed”错误是 Go 语言中一个常见的问题,表示在包的导入中存在循环依赖。在 Go 中,一个包不能直接或间接导入自己,否则就会报这个错误。 在你提到的第二个例子中,main 包和 submodule 包相互导入&a…...
VUE的脚手架搭建引入类库
VUE的小白脚手架搭建 真的好久好久自己没有发布自己博客了,对于一直在做后端开发的我 ,由于社会卷啊卷只好学习下怎么搭建前端,一起学习成长吧~哈哈哈(最终目的,能够懂并简易开发) 文章目录 VUE的小白脚手架搭建1.下载node.js2.安装vue脚手架3.创建一个项目4.代码规范约束配置(…...
java maven依赖传递以及版本冲突
文章目录 基本背景基本排查冲突工具依赖传递:很多依赖到底使用哪个版本的依赖dependencyManagement 作用exclusions其他问题 基本背景 你使用 java,使用 maven pom.xml 管理你的依赖包 可能常常遇到依赖版本冲突,或者很多依赖包,…...
【3-14 STC-pair超级详细的解说】
1. pair的定义和结构 • 基础概念:考察对std::pair模板类的理解,包括其头文件(<utility>)和基本语法(pair<T1, T2>)。 • 成员访问:测试对first和second成员变量的使用能力。 • 构…...
Python开发合并多个PDF文件
前言 在我们的工作中,可能有以下场景需要用到合并多个PDF: 文档归档:在企业或组织中,常常需要将相关的文档(如合同、报告、发票等)合并为一个PDF文件,以便于归档和管理。 报告生成:在…...
基于SpringBoot + Vue 的房屋租赁系统
基于springboot的房屋租赁管理系统-带万字文档 SpringBootVue房屋租赁管理系统 送文档 本项目有前台和后台两部分、多角色模块、不同角色权限不一样 共分三种角色:用户、管理员、房东 管理员:个人中心、房屋类型管理、房屋信息管理、预约看房管理、合…...
Qt QML实现弹球消砖块小游戏
前言 弹球消砖块游戏想必大家都玩过,很简单的小游戏,通过移动挡板反弹下落的小球,然后撞击砖块将其消除。本文使用QML来简单实现这个小游戏。 效果图: 正文 代码目录结构如下: 首先是小球部分,逻辑比较麻…...
ROS实践(四)机器人SLAM建图(gmapping)
目录 一、SLAM技术 二、常用工具和传感器 三、相关功能包 1. gmapping建图功能包 2. map_server 四、SLAM 建图实验 1. 配置gmapping(launch文件) 2. 启动机器人仿真(含机器人以及传感器) 3. 运行gmapping节点 4. 启动rviz可视化工具 5. 保存地图文件 一、SLAM技…...
使用Arduino、ESP8266和GPS在Google地图上追踪车辆
使用 ESP8266、GPS 和 Google 地图的 Arduino Vehicle Tracker 如今,车辆跟踪系统变得非常重要,尤其是在车辆被盗的情况下。如果您的车辆安装了 GPS 系统,您可以跟踪您的车辆位置,它可以帮助警方追踪被盗车辆。 在这里,我们正在构建更高级版本的车辆跟踪系统,您可以在其…...
python数据分析文件夹篇--pandas,openpyxl,xlwings三种方法批量创建、 复制、删除工作表
前言 之前我们学习了使用pandas灵活地读取数据,包括读取工作簿中一个或几个工作表,读取工作表中一行或多行,一列或多列数据,还学习了如何将数据灵活保存到本地。 今天我们学习一下文件夹中文件的批量处理操作,包括批量…...
MyBatis 的一级、二级缓存
文章目录 1️⃣ 一级缓存(Local Cache)📌 定义🚀 示例代码 2️⃣ 二级缓存(Global Cache)📌 定义🚀 使用方式 3️⃣ 一级缓存 vs. 二级缓存 📊4️⃣ 数据共享问题&#x…...
掌握市场先机:9款销售渠道管理工具深度测评
本文主要介绍了以下9款销售渠道管理工具:1.纷享销客; 2.销帮帮; 3.小满CRM; 4.有赞; 5.Oracle NetSuite; 6.Salesforce Sales Cloud; 7.Cin7; 8.Pipedrive; 9.BigCommerc…...
【计算机网络通信 MQTT和AMQP的原理及应用场景、优缺点】
MQTT(Message Queuing Telemetry Transport)和AMQP(Advanced Message Queuing Protocol)都是常用的消息中间件协议,以下是它们的原理、应用场景、优缺点介绍: 原理 MQTT 基于发布/订阅模式,有…...
如何搭建一个适配微信小程序,h5,app的uni-app项目
在vscode搭建 uni-app 项目(Vue 3 Vite Pinia uView Plus) 一、环境准备 1. 安装 Node.js 确保已安装 Node.js(需≥14版本),可通过以下命令检查版本: node -v2. 安装 VSCode 从 VSCode 官网 下载并…...
3.14学习总结 排序算法
插入排序: 1.直接插入排序 维护一个有序区,把元素一个个插入有序区的适当位置,直到所有元素都有序为止。 for (int i 0;i < n - 1;i) {//升序int end i;int temp k[end 1];while (end > 0) {if (temp < k[end]) {k[end 1] …...
模拟类似 DeepSeek 的对话
以下是一个完整的 JavaScript 数据流式获取实现方案,模拟类似 DeepSeek 的对话式逐段返回效果。包含前端实现、后端模拟和详细注释: <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><titl…...
实用小工具——快速获取数据库时间写法
最近我遇到了一个比较棘手的问题:在工作中,各个项目所使用的数据库类型各不相同。这导致我习惯性地使用Oracle的SQL语句进行编写,但每次完成后都会遇到报错,最终才意识到项目的数据库并非Oracle。为了避免这种情况,我需…...
完善机器人:让 DeepSeek 使用Vue Element UI快速搭建 AI 交互页面
在前两篇文章中,我们已经使用 AI 生成了 Java API,并创建了一个简单的 HTML JavaScript 网页,让用户可以与 AI 机器人聊天。但如果我们想要一个更美观、更专业的交互界面,该怎么办呢?🤔 本篇文章…...
PC端QT实现mqtt客户端发布和订阅
在Windows11-64位系统下使用QT开发桌面应用程序,实现mqtt客户端的发布和订阅功能。 需求: mqtt代理服务器 --mosquitto; mqtt客户端工具 -- mqtt.fx; qtcreator开发工具 -- qtcreator6.8.2版本; 过程:…...
蓝桥云客 挖矿
0挖矿 - 蓝桥云课 问题描述 小蓝正在数轴上挖矿,数轴上一共有 n 个矿洞,第 i 个矿洞的坐标为 ai。小蓝从 0 出发,每次可以向左或向右移动 1 的距离,当路过一个矿洞时,就会进行挖矿作业,获得 1 单位矿石&…...