【Rust基础】使用Rocket构建基于SSE的流式回复
背景
我们正在使用Rust开发基于RAG的知识库系统,其中对于模型的回复使用了常用的SSE,Web框架使用Rocket,Rocket提供了一个简单的方式支持SSE,但没有会话保持、会话恢复等功能,因此我们自己简单实现这两个功能。
使用Rocket推送消息流
以下,是Rocket给出的示例:
#[get("/text/stream")]
fn stream() -> TextStream![String] {TextStream! {let mut interval = time::interval(Duration::from_secs(1));for i in 0..10 {yield format!("n: {}", i);interval.tick().await;}}
}
我们需要改造这个示例,以满足将模型回复的消息推送给前端的需求。
首先,对于既然推流,需要知道将流推送给谁,也就是要推送到哪个会话中,所以我们在发起会话的时候,需要一个会话ID来标识一个唯一的会话。
我们使用sse.js这个库作为SSE的客户端,用于发起SSE连接,该库可以通过发起POST请求来建立连接,可以携带额外的数据和请求头。
使用以下结构来接收一个对话请求:
pub struct ChatMessageReq {/// 会话IDpub session_id: String,/// 消息内容pub content: String,
}
于是我们的接口需要修改为这样:
#[post("/chat", data = "<req>")]
async fn question(req: Json<ChatMessageReq>) -> (ContentType, TextStream<impl Stream<Item = String>>)
{//TODO
}
其中TextStream<impl Stream<Item = String>>
等价于TextStream![String]
。
需要注意的是,如果返回值没有指定ContentType
,那么Rocket默认响应的ContentType是文本类型,而非stream类型,会导致前端无法解析。
接下来我们实现会话管理功能。
我们定义了一个名为SsePool的struct,来存储并管理SSE连接:
struct SsePool {/// 消息流传输通道channel: DashMap<String, Sender<SseEvent>>,
}impl SsePool {/// 初始化连接池pub fn init() -> Self {Self {channel: DashMap::new(),}}/// 移除连接fn remove(&self, id: &String) {if let Some((_, sender)) = self.channel.remove(id) {drop(sender);}}/// 获取连接fn get_sender(&self, id: &String) -> Option<Sender<SseEvent>> {self.channel.get(id).map(|v| v.value().clone())}/// 新建channelpub fn new_channel(&self, id: String) -> (Sender<SseEvent>, Receiver<SseEvent>) {let (sender, receiver) = tokio::sync::mpsc::channel(10_0000);// 获取并移除旧senderlet old_sender = self.channel.remove(&id).map(|(_, s)| s);// 插入新senderself.channel.insert(id, sender.clone());// 处理旧senderif let Some(old_sender) = old_sender {tokio::spawn(async move {// 发送终止信号let _ = old_sender.send(SseEvent::Abort).await;});}(sender, receiver)}/// 发送消息pub async fn send_message(&self, id: &String, message: ChatMessage) {if let Some(sender) = self.get_sender(id) {if let Err(e) = sender.send(SseEvent::ChatMessage(message)).await {log::warn!("消息发送失败,session id: {},失败原因:{}", &id, e);};// drop(sender);}}
}
其中channel使用的是tokio中mpsc的channel。
值得注意的是,new_channel
中,新建连接时,需要向channel发送一条终止事件,确保已有的receiver关闭,返回新的receiver,这一点是用于后续的会话恢复使用。new_channel
会返回receiver和sender,用于消息接收和发送。
当收到模型回复是,调用SsePool::send_message
发送消息到channel,再头通过receiver接收消息,转发到前端。
可以把它初始化到静态变量中,方便全局调用:
static SSE_POOL: LazyLock<SsePool> = LazyLock::new(|| SsePool::init());
于是,我们的接口可以完善为以下内容:
#[post("/chat", data = "<req>")]
async fn question(req: Json<ChatMessageReq>,
) -> (ContentType, TextStream<impl Stream<Item = String>>){// 请求新消息,并返回receiverlet (_, _, mut receiver) = service::new_message(req).await.unwrap();let stream = TextStream! {// 持续接收receiver的消息,然后推送到前端while let Some(item) = receiver.recv().await {match item{//模型回复的消息SseEvent::ChatMessage(message) => {yield SseEvent::ChatMessage(message.clone()).to_message();if SseEvent::is_done(&message) {// 推送消息yield SseEvent::Abort.to_message();break;}},// 关闭通道SseEvent::Abort => {yield SseEvent::Abort.to_message();break;},_ => {}}}yield SseEvent::Abort.to_message();drop(receiver);};(ContentType::EventStream, stream)
}
至此,新会话的接口就完成了。
接下来是会话的恢复。
当前端切换会话或刷新页面时,我们希望能够继续收到未回复完成的消息,所以需要一个用于会话恢复的接口。同样的,接口需要会话ID来区分恢复哪一个会话。
#[post("/resume-stream", data = "<req>")]
async fn resume_stream(req: Json<ResumeStreamReq>,
) -> (ContentType, TextStream<impl Stream<Item = String>>){// 会话IDlet session_id = req.session_id.clone();let stream = TextStream! {// 尝试恢复会话,并返回receiver,如果能够返回receiver说明会话未完成,否则已经完成if let Some(mut receiver) = service::resume_stream(&req.session_id).await.unwrap(){// 持续接收未回复完成的消息while let Some(item) = receiver.recv().await {match item {// 模型回复的消息SseEvent::ChatMessage(message) => {yield SseEvent::ChatMessage(message.clone()).to_message();if SseEvent::is_done(&message) {yield SseEvent::Abort.to_message();break;}}// 关闭通道SseEvent::Abort => {yield SseEvent::Abort.to_message();break;}_ => {}}}drop(receiver);}yield SseEvent::Abort.to_message();};(ContentType::EventStream, stream)
}
在service::resume_stream
中,首先检查对应会话ID的channel是否存在,存在则新建channel返回receiver,不存在则表明已经回复完成。
pub async fn resume_stream(session_id: &String,
) -> AppResult<Option<Receiver<SseEvent>>> {if let None = chat::get_connection(session_id) {return Ok(None);}// 获取会话对应的channel,如果channel存在则标识消息仍在回复中let (_, receiver) = chat::new_connection(session_id);Ok(Some(receiver))
}
至此,便实现了会话恢复,刷新页面后仍能后接收strem消息。
总结
使用Rust写这些业务代码的速度,终归是没有Java快,一些常用的库,没有Java系列封装的简单易用,不过应用占用资源确实比Java小很多。
本次使用的一些库:
- tokio:异步运行环境,以及mpsc的channel,
- dashmap:支持并发的hashmap,但是使用不当容易造成死锁。
相关文章:
【Rust基础】使用Rocket构建基于SSE的流式回复
背景 我们正在使用Rust开发基于RAG的知识库系统,其中对于模型的回复使用了常用的SSE,Web框架使用Rocket,Rocket提供了一个简单的方式支持SSE,但没有会话保持、会话恢复等功能,因此我们自己简单实现这两个功能。 使用R…...
一种改进的CFAR算法用于目标检测(解决多目标掩蔽)
摘要 恒虚警率(CFAR)技术在雷达自动检测过程中起着关键作用。单元平均(CA)CFAR算法在几乎所有的多目标情况下都会受到掩蔽效应的影响。最小单元平均(SOCA)CFAR算法仅当干扰目标位于参考窗口的前后方时才具有…...
什么是人工智能芯片?
行业专家指出,许多智能设备和物联网设备都是由某种形式的人工智能(AI)驱动的——无论是语音助理、面部识别摄像头,还是电脑。这些设备需要采用某种技术为它们进行的数据处理提供支持。有些设备需要在云平台的大型数据中心处理数据,而也有一些…...
0.深入探秘 Rust Web 框架 Axum
在当今的 Web 开发领域,Rust 凭借其出色的性能、内存安全性和并发处理能力,正逐渐崭露头角。而 Axum 作为 Rust 生态系统中一款备受瞩目的 Web 框架,更是为开发者提供了高效、灵活且强大的工具,用于构建现代化的 Web 应用程序。本…...
深度监听 ref 和 reactive 的区别详解
深度监听 ref 和 reactive 的区别详解 一、ref 的深度监听(示例代码)关键点:1. ref 的存储方式:2. 监听 ref 的特性 二、reactive 的深度监听(示例代码)关键点:1. reactive 的深度响应性2. 监听…...
面向对象—有理数类的设计
目录 1.代码呈现 1.1编写toString、equals方法 1.2测试代码 1.3有理数类的代码 2.论述题 3.有理类设计 1.代码呈现 1.1编写toString、equals方法 (1)toString方法 Overridepublic String toString(){if(this.v20){return "Undefined";}return this.v1 "/…...
OpenHarmony Camera开发指导(四):相机会话管理(ArkTS)
概述 相机在使用预览、拍照、录像、获取元数据等功能前,都需要先创建相机会话。 相机会话Session的功能如下: 配置相机的输入流和输出流。 配置输入流即添加设备输入,通俗来讲即选择某一个摄像头进行拍照录像;配置输出流&#x…...
Linux电源管理(三),CPUIdle 和 ARM的PSCI
更多linux系统电源管理相关的内容请看:Linux电源管理、功耗管理 和 发热管理 (CPUFreq、CPUIdle、RPM、thermal、睡眠 和 唤醒)-CSDN博客 1 简介 Linux下的空闲进程cpuidle在内核中是一个子系统。cpuidle子系统所需要做的事情就是在CPU进入idle状态后,…...
【测试工具】JMeter使用小记
JMeter 使用小记 下载与安装 jdk 下载地址:https://www.oracle.com/java/technologies/downloads/#jdk18-windowsJMeter 下载地址:https://jmeter.apache.org/download_jmeter.cgi 教程参考:JMeter下载及安装详细教程-CSDN博客 设置中文界…...
Obsidian的简单使用
一、安装并配置仓耳今楷字体 优化阅读体验,个人实测觉得正文用 仓耳今楷04-W03最合适(前面的数字代表字体,数字越大,越偏向于楷体,而01就很像黑体。后面的数字代表粗细,正常粗细是W03,最粗是W0…...
docker的基础知识
Docker https://www.yuque.com/leifengyang/sutong 下载镜像 检索: docker search下载: docker pull列表: docker images删除 docker rmi启动容器 运行: docker run查看: docker ps停止: docker stop启动: …...
PcVue助力立讯:精密制造的智能化管控实践!
PcVue助力立讯: 精密制造的智能化管控实践! 客户介绍 立讯精密(Luxshare ICT,股票代码:002475)成立于2004年5月24日,专注于为消费电子产品、汽车领域产品以及企业通讯产品提供从核心零部件、…...
深度学习-157-Dify工具之创建知识库
文章目录 1 硅基流动1.1 模型广场1.1.1 对话模型(免费)1.1.2 嵌入模型(免费)1.1.3 重排序模型(免费)1.2 模型调用1.2.1 文本对话1.2.2 文本嵌入2 构建知识库2.1 准备文档2.2 点击创建知识库2.3 设置嵌入参数2.4 召回测试3 创建聊天助手3.1 仅使用大模型3.2 结合知识库的大模型3…...
Oracle--安装Oracle Database23ai Free
前言:本博客仅作记录学习使用,部分图片出自网络,如有侵犯您的权益,请联系删除 官方文档: Get Started with Oracle Database 23ai | Oracle 一、安装的环境要求 本文同步使用Oracle Linux9的虚拟机进行操作 1、Orac…...
【JavaEE初阶】多线程重点知识以及常考的面试题-多线程进阶(三)
本篇博客给大家带来的是集合类在多线程下的使用和死锁的知识点还包括常见的面试题. 🐎文章专栏: JavaEE初阶 🚀若有问题 评论区见 ❤ 欢迎大家点赞 评论 收藏 分享 如果你不知道分享给谁,那就分享给薯条. 你们的支持是我不断创作的动力 . 王子,公主请阅&…...
【verilog】多个 if 控制同一个变量(后面会覆盖前面)非阻塞赋值真的并行吗?
非阻塞赋值 (<) 是“并行”的,但是代码顺序会影响结果?”这正是 Verilog 的硬件描述本质 vs 行为语义之间的微妙之处。 💡1. 非阻塞赋值真的并行吗? 是的!非阻塞赋值 < 从行为上是并行的,也就是说&a…...
C++事件驱动编程从入门到实战:深入理解与高效应用
C事件驱动编程从入门到实战:深入理解与高效应用 在现代软件开发中,事件驱动编程(Event-Driven Programming)作为一种流行的编程范式,被广泛应用于图形用户界面(GUI)、网络通信、游戏开发等众多…...
问题 | MATLAB比Python更有优势的特定领域
以下是关于MATLAB在特定领域相较于Python的优势的详细分析,结合其核心功能、行业应用及技术特性展开论述: 一、科学研究与工程计算 1. 数值计算的高效性 MATLAB的核心设计围绕矩阵运算展开,其底层对线性代数和数值计算进行了深度优化。例如…...
黑马商城项目(三)微服务
一、单体架构 测试高并发软件 二、微服务 三、SpringCloud 四、微服务拆分 黑马商城模块: 服务拆分原则: 拆分服务: 独立project: maven聚合: 拆分案例: 远程调用: package com.hmall.cart.…...
Qt界面卡住变慢的解决方法
本质原因: 当Qt界面出现卡顿或无响应时,通常是因为主线程(GUI线程)被耗时操作阻塞。 完全忘了。。。 Qt Creater解决方法 1. 定位耗时操作 目标:找到阻塞主线程的代码段。 方法: 使用QElapsedTimer测量代码执行时间…...
Flutter的原理及美团的实践(下)
Flutter的原理及性能实践 Flutter和原生性能对比 虽然使用原生实现(左)和Flutter实现(右)的全品类页面在实际使用过程中几乎分辨不出来: 但是我们还需要在性能方面有一个比较明确的数据对比。 我们最关心的两个页面…...
时序预测 | Matlab实现基于VMD-WOA-ELM和VMD-ELM变分模态分解结合鲸鱼算法优化极限学习机时间序列预测
时序预测 | Matlab实现基于VMD-WOA-ELM和VMD-ELM变分模态分解结合鲸鱼算法优化极限学习机时间序列预测 目录 时序预测 | Matlab实现基于VMD-WOA-ELM和VMD-ELM变分模态分解结合鲸鱼算法优化极限学习机时间序列预测预测效果基本介绍程序设计参考资料 预测效果 基本介绍 1.Matlab…...
【云安全】云原生- K8S IngressNightmare CVE-2025-1974(漏洞复现完整教程)
漏洞原理 CVE-2025-1974: The IngressNightmare in Kubernetes | Wiz Blog 分两方面: a、配置注入过程 构造一个恶意的Ingress资源,其中注入ssl_engine指令指向恶意共享库向准入控制器验证端点(AdmissionWebhook)发送Admissio…...
Tomcat与Servlet(2)
上篇文章: Tomcat与Servlethttps://blog.csdn.net/sniper_fandc/article/details/147278469?fromshareblogdetail&sharetypeblogdetail&sharerId147278469&sharereferPC&sharesourcesniper_fandc&sharefromfrom_link 上篇文章介绍了To…...
在高数据速度下确保信号完整性的 10 个关键策略
随着越来越多的传感器连接到系统,需要快速、可靠和安全地传输更多数据,对带宽和设计复杂性的需求也在增加。优先考虑的是确保从 A 发送到 B 的信号不会失真。 确保信号完整性 对于设计依赖于持续准确数据流的数据密集型应用程序的工程师来说,…...
2025华中杯数学建模B题完整分析论文(共42页)(含模型、数据、可运行代码)
2025华中杯大学生数学建模B题完整分析论文 目录 一、问题重述 二、问题分析 三、模型假设 四、 模型建立与求解 4.1问题1 4.1.1问题1解析 4.1.2问题1模型建立 4.1.3问题1样例代码(仅供参考) 4.1.4问题1求解结果(仅供参考&am…...
UE5 自带的视频播放器
文章目录 文件夹准备添加一个文件媒体源方法1方法2 添加一个视频播放器播放视频直接播放使用网格体播放使用UI播放 播放视频的音乐媒体播放器常用的节点设置循环是用绝对路径播放视频,视频无需导入注册播放完成事件 文件夹准备 视频必须被放在Content/Moveis文件下…...
是德科技E5080B网络分析仪深度评测:5G/车载雷达测试实战指南
是德科技E5080B网络分析仪(ENA系列)是一款高性能射频测试仪器,广泛应用于通信、航空航天、半导体等领域,以下是其核心功能详解: 一、核心测试功能 多参数网络分析 S参数测量:支持全双端口S参数测试…...
javaSE————网络编程套接字
网络编程套接字~~~~~ 好久没更新啦,蓝桥杯爆掉了,从今天开始爆更嗷; 1,网络编程基础 为啥要有网络编程呢,我们进行网络通信就是为了获取丰富的网络资源,说实话真的很神奇,想想我们躺在床上&a…...
力扣349 == 两个数组交集的两种解法
目录 解法一:利用 Set 特性高效去重 解法二:双重遍历与 Set 去重 方法对比与总结 关键点总结 题目描述 给定两个整数数组 nums1 和 nums2,要求返回它们的交集。输出结果中的每个元素必须是唯一的,且顺序不限。 示例 输入&…...
笔试专题(十)
文章目录 对称之美(双指针)题解代码 连续子数组最大和(线性dp)题解代码 最长回文子序列(区间dp)题解代码 对称之美(双指针) 题目链接 题解 1. 双指针 2. 用left标记左边的字符串…...
YOLOv12即插即用---RFAConv
1.模块介绍 接受域注意卷积(RFAConv):更聪明地感知空间特征 在传统卷积神经网络中,卷积核参数的共享机制虽有效提升了模型的泛化能力与计算效率,但却忽略了不同空间位置特征在感知范围(即接受域)内的重要性差异。为此,我们提出了一种更具感知能力的模块 —— 接受域注…...
使用datax通过HbaseShell封装writer和reader同步hbase数据到hbase_踩坑_细节总结---大数据之DataX工作笔记008
最近在做大数据相关功能,有个需求,使用datax同步hbase到hbase中,其中还是有很多细节值得记录: 首先来看一下datax的源码中,如果你使用phoenix创建的表,那么 你就需要使用对应的hbase带有sql字样的,reader和writer. 然后如果你使用datax-web来进行测试的,那么,他默认使用的是h…...
Python解决“小D的abc字符变换”问题
小D的“abc”变换问题 问题描述测试样例解题思路代码 问题描述 小D拿到了一个仅由 “abc” 三种字母组成的字符串。她每次操作会对所有字符同时进行以下变换: 将 ‘a’ 变成 ‘bc’ 将 ‘b’ 变成 ‘ca’ 将 ‘c’ 变成 ‘ab’ 小D将重复该操作 k 次。你的任务是输…...
C++学习:六个月从基础到就业——面向对象编程:重载运算符(下)
C学习:六个月从基础到就业——面向对象编程:重载运算符(下) 本文是我C学习之旅系列的第十三篇技术文章,是面向对象编程中运算符重载主题的下篇。本篇文章将继续深入探讨高级运算符重载技术、特殊运算符、常见应用场景和…...
电压模式控制学习
电压模式控制 在开关电源中,大的可分为三大控制模式,分别是电压模式控制,电流模式控制,迟滞模式控制。今天简要介绍下电压模式控制的优缺点。 原理 架构图如下 如图所示,电压模式控制可以分为三部分:误…...
vue3 Ts axios 封装
vue3 Ts axios 封装 axios的封装 import axios, { AxiosError, AxiosInstance, InternalAxiosRequestConfig, AxiosResponse, AxiosRequestConfig, AxiosHeaders } from axios import qs from qs import { config } from ./config import { ElMessage } from element-plus// …...
GPT,Bert类模型对比
以下是对 BERT-base、RoBERTa-base、DeBERTa-base 和 DistilBERT-base 四个模型在参数量、训练数据、GPU 内存占用、性能表现以及优缺点方面的对比: 模型参数量与训练数据 模型参数量训练数据量BERT-base110MBookCorpus(8亿词) 英文维基百科…...
3.Rust + Axum 提取器模式深度剖析
摘要 深入解读 Rust Axum 提取器模式,涵盖内置提取器及自定义实现。 一、引言 在 Rust 的 Web 开发领域,Axum 作为一款轻量级且高效的 Web 框架,为开发者提供了强大的功能。其中,提取器(Extractor)模式…...
Dify vs n8n vs RAGFlow:2025年AI应用与自动化工作流平台的终极对决
我将为大家整理一份关于 Dify、n8n 和 Ragflow 的最新研究分析,涵盖以下六个方面:功能对比、应用场景、架构设计、集成能力、和使用门槛。我会尽可能引用其官方文档、GitHub 仓库以及社区讨论等权威信息来源。 我整理好后会第一时间通知你查看。 1.Dify、n8n 和 RAGFlow 最新…...
ffmpeg无损转格式的命令行
将ffmpeg.exe拖入命令行窗口 c:\users\zhangsan>D:\ffmpeg-2025-03-11\bin\ffmpeg.exe -i happy.mp4 -c:v copy -c:a copy 格式转换后.mkv -c:v copy 仅做拷贝视频,不重新编码 -c:a copy 仅做拷贝音频 ,不重新编码...
Flutter 常用命令
1、创建项目 flutter create <项目名称> 示例: flutter create my_app 1.1 参数说明 --org:设置包名(默认 com.example) flutter create --org com.yourcompany my_app -a/-i:指定语言(Kotlin…...
【零基础】基于DeepSeek-R1与Qwen2.5Max的行业洞察自动化平台
自动生成行业报告,通过调用两个不同的大模型(DeepSeek 和 Qwen),完成从行业趋势分析到结构化报告生成的全过程。 完整代码:https://mp.weixin.qq.com/s/6pHi_aIDBcJKw1U61n1uUg 🧠 1. 整体目的与功能 该脚本实现了一个名为 ReportGenerator 的类,用于: 调用 DeepSe…...
UE5 关卡序列
文章目录 介绍创建一个关卡序列编辑动画添加一个物体编辑动画时间轴显示秒而不是帧时间轴跳转到一个确定的时间时间轴的显示范围更改关键帧的动画插值方式操作多个关键帧 播放动画 介绍 类似于Unity的Animation动画,可以用来录制场景中物体的动画 创建一个关卡序列…...
1.凸包、极点、极边基础概念
目录 1.凸包 2.调色问题 3.极性(Extrem) 4.凸组合(Convex Combination) 5.问题转化(Strategy)编辑 6.In-Triangle test 7.To-Left-test 8.极边(Extream Edges) 1.凸包 凸包就是上面蓝色皮筋围出来的范围 这些钉子可以转换到坐标轴中࿰…...
MahApps.Metro:专为 WPF 应用程序设计的 UI 框架
推荐一个WPF 应用程序设计的 UI 框架,方便我们快速构建美观、流畅的应用程序。 01 项目简介 MahApps.Metro 是一个开源的 UI 框架,它可以让开发者快速构建现代化、美观的 WPF 应用程序。 提供了一套完整的 UI 组件和主题,支持流畅的动画效…...
【LangChain4j快速入门】5分钟用Java玩转GPT-4o-mini,Spring Boot整合实战!| 附源码
【LangChain4j快速入门】5分钟用Java玩转GPT-4o-mini,Spring Boot整合实战! 前言:当Java遇上大模型 在AI浪潮席卷全球的今天,Java开发者如何快速拥抱大语言模型?LangChain4j作为专为Java打造的AI开发框架,…...
乐言科技:云原生加速电商行业赋能,云消息队列助力降本 37%
深耕 AI SaaS,助力数万电商客户数智化转型 上海乐言科技股份有限公司(以下简称“乐言科技”,官网:https://www.leyantech.com/)自 2016 年成立以来,专注于利用自然语言处理和深度学习等核心 AI 技术&#…...
vscode构建简单编译和调试环境
一、设置环境变量 将bin目录路径(如D:\DevTools\mingw64\bin)加入系统环境变量PATH34 二、VS Code插件配置 核心插件安装 C/C(微软官方扩展,提供语法高亮、智能提示)Code Runner࿰…...
STM32控制DRV8825驱动42BYGH34步进电机
最近想玩一下人工智能,然后买了个步进电机想玩一下,刚到了一脸懵逼,发现驱动器20多块,有点超预算,然后整了个驱动板,方便自己画线路板,经过各种搜索,终于转起来了,记录一…...