spring响应式编程系列:总体流程
目录
示例
程序流程
just
subscribe
new LambdaMonoSubscriber
MonoJust.subscribe
new Operators.ScalarSubscription
onSubscribe
request
onNext
时序图
类图
数据发布者
MonoJust
数据订阅者
LambdaSubscriber
订阅的消息体
ScalarSubscription
想要了解响应式编程的总体流程,只要做到真正吃透一个简单的示例即可。
如下所示:
示例
首先,通过调用Mono.just创建一个单元素的数据发布者(Publisher);
然后,通过调用mono.subscribe订阅数据发布者(Publisher)发布的数据。
如下所示:
// 创建一个包含数据的 Mono |
程序流程
点击Mono.just,如下所示:
just
public static <T> Mono<T> just(T data) { return onAssembly(new MonoJust(data)); } |
在这里,直接new一个MonoJust对象并返回。
点击示例里的mono.subscribe,如下所示:
subscribe
public abstract class Mono<T> implements CorePublisher<T> { ... ... public final Disposable subscribe(@Nullable Consumer<? super T> consumer, @Nullable Consumer<? super Throwable> errorConsumer, @Nullable Runnable completeConsumer, @Nullable Context initialContext) { return (Disposable)this.subscribeWith(new LambdaMonoSubscriber(consumer, errorConsumer, completeConsumer, (Consumer)null, initialContext)); } |
在这里,将示例里subscribe的参数作为LambdaMonoSubscriber的构造参数,然后new一个LambdaMonoSubscriber对象。
LambdaMonoSubscriber对象的初始化参数,如下所示:
new LambdaMonoSubscriber
final class LambdaMonoSubscriber<T> implements InnerConsumer<T>, Disposable { final Consumer<? super T> consumer; final Consumer<? super Throwable> errorConsumer; final Runnable completeConsumer; final Consumer<? super Subscription> subscriptionConsumer; final Context initialContext; volatile Subscription subscription; ... ... LambdaMonoSubscriber(@Nullable Consumer<? super T> consumer, @Nullable Consumer<? super Throwable> errorConsumer, @Nullable Runnable completeConsumer, @Nullable Consumer<? super Subscription> subscriptionConsumer, @Nullable Context initialContext) { this.consumer = consumer; this.errorConsumer = errorConsumer; this.completeConsumer = completeConsumer; this.subscriptionConsumer = subscriptionConsumer; this.initialContext = initialContext == null ? Context.empty() : initialContext; } |
MonoJust.subscribe
final class MonoJust<T> extends Mono<T> implements ScalarCallable<T>, Fuseable, SourceProducer<T> { ... ... public void subscribe(CoreSubscriber<? super T> actual) { actual.onSubscribe(Operators.scalarSubscription(actual, this.value)); } |
在这里,来到了MonoJust对象的subscribe方法,该方法调用了LambdaMonoSubscriber对象的onSubscribe方法;
同时,new一个Operators.ScalarSubscription对象,该对象封装了LambdaMonoSubscriber对象和数据发布者MonoJust发布的数据。
如下所示:
new Operators.ScalarSubscription
public static <T> Subscription scalarSubscription(CoreSubscriber<? super T> subscriber, T value, String stepName) { return new Operators.ScalarSubscription(subscriber, value, stepName); } |
点击actual.onSubscribe,如下所示:
onSubscribe
final class LambdaMonoSubscriber<T> implements InnerConsumer<T>, Disposable { ... ... public final void onSubscribe(Subscription s) { if (Operators.validate(this.subscription, s)) { this.subscription = s; if (this.subscriptionConsumer != null) { try { this.subscriptionConsumer.accept(s); } catch (Throwable var3) { Exceptions.throwIfFatal(var3); s.cancel(); this.onError(var3); } } else { s.request(9223372036854775807L); } } } |
在这里,LambdaMonoSubscriber对象调用了Operators.ScalarSubscription对象的request方法。
如下所示:
request
static final class ScalarSubscription<T> implements SynchronousSubscription<T>, InnerProducer<T> { public void request(long n) { if (Operators.validate(n) && ONCE.compareAndSet(this, 0, 1)) { Subscriber<? super T> a = this.actual; a.onNext(this.value); if (this.once != 2) { a.onComplete(); } } } |
在这里,Operators.ScalarSubscription对象又调用了LambdaMonoSubscriber对象的onNext方法。
LambdaMonoSubscriber对象的onNext方法如下所示:
onNext
final class LambdaMonoSubscriber<T> implements InnerConsumer<T>, Disposable { public final void onNext(T x) { Subscription s = (Subscription)S.getAndSet(this, Operators.cancelledSubscription()); if (s == Operators.cancelledSubscription()) { Operators.onNextDropped(x, this.initialContext); } else { if (this.consumer != null) { try { this.consumer.accept(x); } catch (Throwable var5) { Exceptions.throwIfFatal(var5); s.cancel(); this.doError(var5); } } if (this.completeConsumer != null) { try { this.completeConsumer.run(); } catch (Throwable var4) { Operators.onErrorDropped(var4, this.initialContext); } } } } |
终于,在这里,调用了示例里subscribe()方法的回调函数了。
时序图
【说明】
- Mono和MonoJust是数据发布者,LambdaMonoSubscriber是数据消费者,ScalarSubscription是订阅的消息;
- 类的设计还是比较清晰的,就是方法的调用显示有点绕。
- 数据发布者,提供了just方法来生成数据发布者(Publisher);
- 数据订阅者,提供了onSubscribe和onNext方法来响应订阅事件和读取数据;
- 订阅的消息体,封装了数据订阅者和数据发布发布的数据,并且提供了request方法用来处理数据。
- 使用了观察者设计模式:LambdaMonoSubscriber是观察者模式中的观察者(Observer),它订阅(subscribe)一个发布者(MonoJust),MonoJust是观察者模式中的主题(Subject),它负责通知所有的 Subscriber。
类图
数据发布者
MonoJust
【说明】
- Publisher
定义了接口:void subscribe(Subscriber<? super T> var1)。
- CorePublisher
定义了接口:void subscribe(CoreSubscriber<? super T> subscriber)。
- Mono
是一个抽象类,实现了数据发布者通用的各种功能。
比如:使用了工厂方法设计模式来创建诸如MonoJust、MonoCreate、MonoDefer、MonoError等各种具体的数据发布者。
- MonoJust
是一个特定的数据发布者(Publisher),实现了接口void subscribe(CoreSubscriber<? super T> actual)。
数据订阅者
LambdaSubscriber
【说明】
- Subscriber
定义了如下接口:onSubscribe、onNext、onError、onComplete。
- CoreSubscriber
定义了如下接口:onSubscribe
- LambdaMonoSubscriber
关联了consumer、errorConsumer、completeConsumer、subscriptionConsumer这些对象,以完成订阅相关的各种操作。
订阅的消息体
ScalarSubscription
【说明】
- Subscription
提供了如下接口:void request(long var1)、void cancel();
- ScalarSubscription
封装了数据订阅者和数据发布者发布的数据。
相关文章:
spring响应式编程系列:总体流程
目录 示例 程序流程 just subscribe new LambdaMonoSubscriber MonoJust.subscribe new Operators.ScalarSubscription onSubscribe request onNext 时序图 类图 数据发布者 MonoJust …...
基于PySide6与pyCATIA的圆柱体特征生成工具开发实战——NX建模之圆柱命令的参考与移植
引言 在机械设计领域,特征建模的自动化是提升设计效率的关键。本文基于PySide6与pycatia技术栈,深度解析圆柱特征自动化生成系统的开发实践,涵盖参数化建模、交互式元素选择、异常处理等核心模块,实现比传统手动操作提升3倍效率的…...
kafka jdbc connector适配kadb数据实时同步
测试结论 源端增量获取方式包括:bulk、incrementing、timestamp、incrementingtimestamp(混合),各种方式说明如下: bulk: 一次同步整个表的数据 incrementing: 使用严格的自增列标识增量数据。不支持对旧数据的更新…...
pgsql中使用jsonb的mybatis-plus和jps的配置
在pgsql中使用jsonb类型的数据时,实体对象要对其进行一些相关的配置,而mybatis和jpa中使用各不相同。 在项目中经常会结合 MyBatis-Plus 和 JPA 进行开发,MyBatis_plus对于操作数据更灵活,jpa可以自动建表,两者各取其…...
4.17-4.18学习总结 多线程
并发与并行: 并发和并行是有可能都在发生的。 多线程的实现方式: 第一种:创建子类对象,调用start方法启动线程。 第二种: 第三种: 第一种和第二种不可以获取到多线程结果,但第三章种可以。 多…...
无人机在农业中的应用与挑战!
一、无人机在农业中的作用 1. 提升作业效率与降低成本 无人机在喷洒农药、播种、施肥、吊运等环节显著提升效率。例如,湖北秭归县使用大疆T100无人机吊运脐橙,单次85公斤的运输任务仅需2分钟,而人工需1小时,综合成本降低250元…...
无刷电机槽数相同、转子极数不同的核心区别
一、基础原理差异 无刷电机的核心参数: 槽数(定子槽数,记为 ( Z )):定子铁芯上的绕组槽数量,决定绕组布局。极数(转子磁极数,记为 ( 2p )):转子上的永磁体磁极对数(总极数为 ( 2p ),如 ( p=4 ) 表示 8 极)。核心关系:槽极配合(( Z/2p ))决定电机电磁结构,相同…...
为您的照片提供本地 AI 视觉:使用 Llama Vision 和 ChromaDB 构建 AI 图像标记器
有没有花 20 分钟浏览您的文件夹以找到心中的特定图像或屏幕截图?您并不孤单。 作为工作中的产品经理,我总是淹没在竞争对手产品的屏幕截图、UI 灵感以及白板会议或草图的照片的海洋中。在我的个人生活中,我总是捕捉我在生活中遇到的事物&am…...
OpenAI 34页最佳构建Agent实践
penAI发布O4,也发布34页最佳构建Agent实践,值得阅读。 什么是Agent? 传统软件使用户能够简化和自动化工作流程,而代理能够以高度独立的方式代表用户执行相同的工作流程。 代理是能够独立地代表您完成任务的系统。 工作流程是必…...
第 5 期(进阶版):训练第一个 DDPM 模型(使用 CIFAR-10 数据集)
—— 用 DDPM 模型生成彩色图像,感受扩散魔法在 CIFAR-10 上的威力! 本期目标 将 MNIST 替换为 CIFAR-10; 模型结构适配 RGB 三通道输入; 保持原始扩散与采样流程; 增加图像可视化对比! 数据准备&…...
进程间通信(IPC)----共享内存
进程间通信(IPC)的共享内存机制允许不同进程直接访问同一块物理内存区域,是速度最快的IPC方式(无需数据拷贝)。 一、共享内存核心概念 1. 基本原理 共享内存区域:由内核管理的特殊内存段,可被…...
Xcode16 调整 Provisioning Profiles 目录导致证书查不到
cronet demo 使用的 ninja 打包,查找 Provisioning Profiles 路径是 ~/Library/MobileDevice/Provisioning Profiles,但 Xcode16 把该路径改为了 ~/Library/Developer/Xcode/UserData/Provisioning Profiles,导致在编译 cronet 的demo 时找不…...
Debian服务器环境下env变量丢失怎么办
在 Debian服务器环境下,如果出现了 env 环境变量丢失的问题,比如常见的 PATH、JAVA_HOME、PYTHONPATH 等系统变量或自定义变量不起作用,可能会导致一些命令无法执行、服务无法启动、脚本报错等。 这个问题常见于: 使用 cron、sy…...
搜广推校招面经七十八
字节推荐算法 一、实习项目:多任务模型中的每个任务都是做什么?怎么确定每个loss的权重 这个根据实际情况来吧。如果实习时候用了moe,就可能被问到。 loss权重的话,直接根据任务的重要性吧。。。 二、特征重要性怎么判断的&…...
ctf.show—Web(1-10)详细通关教程
Web1-签到题 1、按F12查看元素,发现有一段被注释的字符串 2、看起来并不像flag,格式类似于Base64编码 扔到Base64在线编码平台:Base64 编码/解码 - 锤子在线工具此工具是一个 Base64 编码或解码在线工具,实现把字符串转成 Base6…...
双层Key缓存
双层 Key 缓存是一种针对 缓存击穿 和 雪崩问题 的优化方案,其核心思想是通过 主备双缓存 的机制,确保在热点数据过期时仍能提供可用服务,同时降低对数据库的瞬时压力。以下是其核心原理、实现细节及适用场景的深度解析: 一、核心…...
android编译使用共享缓存
注意 服务器端与客户端系统的版本号需为Ubuntu20.04ccache版本不能低于4.4执行用户需要为sudo权限服务器端nfs目录权限必须为nobody:nogroup 一、服务端配置: 在服务器192.168.60.142上配置 NFS 共享 1.安装 NFS 服务器: 1 sudo apt-get install nfs…...
如何使用Labelimg查看已经标注好的YOLO数据集标注情况
文章目录 1、 安装并运行Labelimg1.1、安装Labelimg1.2、运行Labelimg 2、查看数据集标注情况2.1、创建类别名称文件classes.txt2.2、使用Labelimg打开查看标注文件2.3、正式标注 3、目标检测系列文章 本文主要介绍一下如何使用LabelImg查看已经标注好的YOLO数据集标注情况&…...
Web3架构下的数据隐私与保护
在这个信息爆炸的时代,Web3的概念如同一股清流,以其去中心化的特性,为数据隐私与保护带来了新的希望。Web3,也被称作下一代互联网,它通过区块链技术实现数据的去中心化存储和处理,旨在提高数据的安全性和隐…...
PCM 参数深度解析:采样率、帧、缓存大小与多通道关系
将下面的 PCM 配置作为例子: config.channels 2; // 立体声(2 个通道) config.rate 48000; // 采样率 48000 Hz,即每秒 48000 帧 config.period_size 1024; // 每个周…...
Kafka消费者端重平衡流程
重平衡的完整流程需要消费者 端和协调者组件共同参与才能完成。我们先从消费者的视角来审视一下重平衡的流程。在消费者端,重平衡分为两个步骤:分别是加入组和等待领导者消费者(Leader Consumer)分配方案。这两个步骤分别对应两类…...
【字节跳动AI论文】海姆达尔:生成验证的测试时间扩展
摘要:人工智能系统只能在能够验证知识本身的范围内创建和维护知识。 最近关于长链推理的研究表明,LLM在解决竞争问题方面具有巨大的潜力,但它们的验证能力仍然很弱,而且没有得到充分的研究。 在本文中,我们提出了Heimd…...
【Datawhale Al春训营】气象预测(AI+航空安全)竞赛笔记
这里写自定义目录标题 欢迎使用Markdown编辑器新的改变功能快捷键合理的创建标题,有助于目录的生成如何改变文本的样式插入链接与图片如何插入一段漂亮的代码片生成一个适合你的列表创建一个表格设定内容居中、居左、居右SmartyPants 创建一个自定义列表如何创建一个…...
大模型应用开发实战:AI Agent与智能体开发技术解析
更多AI大模型应用开发学习内容,尽在聚客AI学院 一、AI Agent的核心概念 AI Agent(智能体)是基于大模型构建的自主任务执行系统,能够根据用户指令拆解目标、调用工具、完成复杂任务(如数据分析、自动化办公)…...
《Learning Langchain》阅读笔记3-基于 Gemini 的 Langchain如何从LLMs中获取特定格式
纯文本输出是有用的,但在某些情况下,我们需要 LLM 生成结构化输出,即以机器可读格式(如 JSON、XML 或 CSV)或甚至以编程语言(如 Python 或 JavaScript)生成的输出。当我们打算将该输出传递给其他…...
Mac mini 安装mysql数据库以及出现的一些问题的解决方案
首先先去官网安装一下mysql数据库,基本上都是傻瓜式安装的流程,我也就不详细说了。 接下来就是最新版的mysql安装的时候,他就会直接让你设置一个新的密码。 打开设置,拉到最下面就会看到一个mysql的图标: 我设置的就是…...
智能体时代的产业范式确立,中国企业以探索者姿态走出自己的路
作者 | 曾响铃 文 | 响铃说 当前,一个新的20年的产业升级期已经开启,系统性的发展路径也正在形成。 前不久,以“共建智能体时代“为主题的超聚变探索者大会2025在河南郑州举办。超聚变变数字技术有限公司(以下简称:…...
电路安全智控系统与主机安全防护系统主要功能是什么
电路安全智控系统被称为电路安全用电控制系统。电路安全智控系统具备一系列强大且实用的功能。电路安全智控系统能够对总电压、总电流、总功率、总电能,以及各分路的电压、电流、功率、电能和功率因素等进行全方位的监控。在大型工厂的电力分配中,通过对…...
MCP Server驱动传统SaaS智能化转型:从工具堆叠到AI Agent生态重构,基于2025年技术演进与产业实践
MCP Server驱动传统SaaS智能化转型:从工具堆叠到AI Agent生态重构 (基于2025年技术演进与产业实践) MCP模型上下文协议 一、技术底座革新:MCP协议重构AI时代的"数字接口" 传统SaaS软件向大模型AI应用转型的核心矛盾…...
【工具变量】地市农业播种面积及粮食产量等21个相关指标(2013-2022年)
粮食产量、粮食播种面积及农作物播种面积等,是衡量农业发展水平和粮食安全的重要指标。随着全球粮食需求的持续增长,准确掌握这些数据对制定农业政策、优化生产结构和提高农业生产效率至关重要。因此,缤本次分享数据包括《中国统计NJ》、《中…...
使用 PySpark 批量清理 Hive 表历史分区
使用 PySpark 批量清理 Hive 表历史分区 在大数据平台中,Hive 表通常采用分区方式存储数据,以提升查询效率和数据管理的灵活性。随着数据的不断积累,历史分区会越来越多,既占用存储空间,也影响元数据管理性能。因此&a…...
A. k-th equality(1700)
Problem - 1835A - Codeforces Daily_CF_Problems/daily_problems/2025/04/0417/solution/cf1835a.md at main Yawn-Sean/Daily_CF_Problems 考虑所有形式为 abc 的等式,其中 a有 A 位数, b 有 B 位数, c 有 C 位数。所有数字都是正整数,求…...
深度学习-torch,全连接神经网路
3. 数据集加载案例 通过一些数据集的加载案例,真正了解数据类及数据加载器。 3.1 加载csv数据集 代码参考如下 import torch from torch.utils.data import Dataset, DataLoader import pandas as pd class MyCsvDataset(Dataset):def __init__(self, fil…...
echarts饼图中心呈现一张图片,并且能动态旋转的效果react组件
实现效果: 父组件: import React from react import styles from ./style.less import GaugeChart from ./GaugeChart;export default function index() {return (<div><div className{styles.bg} ></div><div style{{ width: 500…...
使用Docker搭建开源Email服务器
使用Docker搭建开源Email服务器 1 介绍 开源的Email服务器比较多,例如:poste.io、MailCatcher、Postal、mailcow等。由于poste.io支持docker安装,页面比较美观,使用简单,支持SMTP IMAP POP3等协议,安全…...
css图片设为灰色
使用filter方式将图片设置为灰色 普通图片使用:filter: saturate(0); 纯白图片使用: <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8"><meta name"viewport" content"width…...
2025 年第十五届 MathorCup竞赛赛题浅析-助攻快速选题
本届妈杯竞赛各赛题难度均已经达到了国赛难度,也更好的回应了大家更为关心的,在当前AI环境下,似乎“数学建模变成了AI使用竞赛一样”。但是国委会一直以来都是一个态度:AI现在是无法直接解决任何一个国赛赛题的。对应的如今这句话…...
【android bluetooth 案例分析 03】【PTS 测试 1】【pts基本介绍】
Bluetooth SIG(Special Interest Group)提供的 PTS(Profile Tuning Suite)测试 是蓝牙认证过程中一项极为关键的步骤。它主要用于验证设备是否符合 Bluetooth SIG 制定的各项 蓝牙规范(Bluetooth Specification&#x…...
Java集合框架深度解析:HashMap、HashSet、TreeMap、TreeSet与哈希表原理详解
一、核心数据结构总览 1. 核心类继承体系 graph TDMap接口 --> HashMapMap接口 --> TreeMapSet接口 --> HashSetSet接口 --> TreeSetHashMap --> LinkedHashMapHashSet --> LinkedHashSetTreeMap --> NavigableMapTreeSet --> NavigableSet 2. 核心…...
【深度学习】张量计算:爱因斯坦求和约定|tensor系列03
博主简介:努力学习的22级计算机科学与技术本科生一枚🌸博主主页: Yaoyao2024往期回顾:【深度学习】详解矩阵乘法、点积,内积,外积、哈达玛积极其应用|tensor系列02每日一言🌼: “岱宗夫如何&…...
OpenHarmony-Risc-V上运行openBLAS中的benchmark
OpenHarmony-Risc-V上运行openBLAS中的benchmark 文章目录 OpenHarmony-Risc-V上运行openBLAS中的benchmark前言一、编译openBLAS1.源码下载2.工具链下载3.编译并安装openBLAS 二、编译open BLAS中的benchmark三、上设备运行总结 前言 参考https://zhuanlan.zhihu.com/p/18825…...
CCF CSP 第36次(2024.12)(2_梦境巡查_C++)
CCF CSP 第36次(2024.12)(2_梦境巡查_C) 解题思路:思路一: 代码实现代码实现(思路一): 时间限制: 1.0 秒 空间限制: 512 MiB 原题链接 解题思路…...
windows下安装mcp servers
以sequential-thinking为例 macos下安装就像github readme中那样安装即可: {"mcpServers": {"sequential-thinking": {"command": "npx","args": ["-y","modelcontextprotocol/server-sequenti…...
OpenGauss 数据库介绍
OpenGauss 数据库介绍 OpenGauss 是华为基于 PostgreSQL 开发的企业级开源关系型数据库,现已成为开放原子开源基金会的项目。以下是 OpenGauss 的详细介绍: 一 核心特性 1.1 架构设计亮点 特性说明优势多核并行NUMA感知架构充分利用现代CPU多核性能行…...
Web3区块链网络中数据隐私安全性探讨
在这个信息爆炸的时代,Web3 的概念如同一股清流,以其去中心化、透明性和安全性的特点,为数据隐私保护提供了新的解决方案。本文将探讨 Web3 区块链网络中数据隐私的安全性问题,并探索如何通过技术手段提高数据隐私的保护。 Web3 …...
linux驱动之poll
驱动中 poll 实现 在用户空间实现事件操作的一个主要实现是调用 select/poll/epoll 函数。那么在驱动中怎么来实现 poll 的底层呢? 其实在内核的 struct file_operations 结构体中有一个 poll 成员,其就是底层实现的接口函数。 驱动中 poll 函数实现原…...
【最后203篇系列】028 FastAPI的后台任务处理
说明 今天偶然在别的文章里看到这个功能,突然觉得正好。 CeleryWorker已经搭好了,但是我一直想在用户请求时进行额外的处理会比较影响处理时间,用这个正好可以搭配上。 我设想的一个场景: 1 用户发起请求2 接口中进行关键信息…...
微信小程序中,将搜索组件获取的值传递给父页面(如 index 页面)可以通过 自定义事件 或 页面引用 实现
将搜索组件获取的值传递给父页面(如 index 页面)可以通过 自定义事件 或 页面引用 实现 方法 1:自定义事件(推荐) 步骤 1:搜索组件内触发事件 在搜索组件的 JS 中,当获取到搜索值时,…...
深入理解分布式缓存 以及Redis 实现缓存更新通知方案
一、分布式缓存简介 1. 什么是分布式缓存 分布式缓存:指将应用系统和缓存组件进行分离的缓存机制,这样多个应用系统就可以共享一套缓存数据了,它的特点是共享缓存服务和可集群部署,为缓存系统提供了高可用的运行环境,…...
C#核心笔记——(六)框架基础
我们在编程时所需的许多核心功能并不是由C#语言提供的,而是由.NET Framework中的类型提供的。本节我们将介绍Framework在基础编程任务(例如虚的等值比较、顺序比较以及类型转换)中的作用。我们还会介绍Framework中的基本类型,例如String、DateTime和Enum. 本章中的绝大部分…...