nats jetstream server code 分析
对象和缩写
jetstream导入两个对象:stream and consumer,在stream 之上构造jetstreamapi。在nats代码中,以下是一些常见的缩写
1.mset is stream
2.jsX is something of jetstream
3.o is consumer
代码分析
对于producer ,发送的消息将通过以下堆栈(包括kv 模式)进行处理
核心函数是:
func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte, lseq uint64, ts int64) error {
消息保存是通过以下代码:
// Store actual msg.if lseq == 0 && ts == 0 {seq, ts, err = store.StoreMsg(subject, hdr, msg)} else {// Make sure to take into account any message assignments that we had to skip (clfs).seq = lseq + 1 - clfs// Check for preAcks and the need to skip vs store.if mset.hasAllPreAcks(seq, subject) {mset.clearAllPreAcks(seq)store.SkipMsg()} else {err = store.StoreRawMsg(subject, hdr, msg, seq, ts)}}
每个consumer 都有一个相对的影子主题,如果你有兴趣找到所有主题,你可以在func(sSublist)Insert(subsubscription)中添加一些跟踪代码。
集群模式下, leader node是设置在consumer level。
伴随consumer 和 stream的shadow 主题也很重要:在nats中,一切对象都保存为subject,就像
- $JS.API
- $JSC.CI - consumer
- $SYS.REQ.USER.INFO
- $SRV.>
- _INBOX
对于customer和内部的订阅,subscribe,是通过以下addServiceImportSub 的代码实现
// Internal account clients are for service imports and need the '\r\n'.start := time.Now()if client.kind == ACCOUNT {sub.icb(sub, c, acc, string(subject), string(reply), msg)} else {sub.icb(sub, c, acc, string(subject), string(reply), msg[:msgSize])}if dur := time.Since(start); dur >= readLoopReportThreshold {srv.Warnf("Internal subscription on %q took too long: %v", subject, dur)}
而func (c *client) deliverMsg(prodIsMQTT bool, sub *subscription, acc *Account, subject, reply, mh, msg []byte, gwrply bool) bool {
会call addServiceImportSub
注意nats有不少类似下面的func set模式来实现调用
// processServiceImport is an internal callback when a subscription matches an imported service
// from another account. This includes response mappings as well.
cb := func(sub *subscription, c *client, acc *Account, subject, reply string, msg []byte) {c.processServiceImport(si, acc, msg)}
服务器重新启动时,将重新分配已经存在 consumer给不同server:
func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname string, ca *consumerAssignment, isRecovering bool, action ConsumerAction) (*consumer, error) {
在setLeader()函数中启动相关例程loopAndGatherMsgs、processInboundAcks、processInboundNextMsgReqs,这些例程通过go channel进行协作。
go func() {setGoRoutineLabels(labels)o.loopAndGatherMsgs(qch)}()// Now start up Go routine to process acks.go func() {setGoRoutineLabels(labels)o.processInboundAcks(qch)}()if pullMode {// Now start up Go routine to process inbound next message requests.go func() {setGoRoutineLabels(labels)o.processInboundNextMsgReqs(qch)}()}
在loopAndGatherMsgs函数中,以下代码是用于推送和拉取消费者的核心代码
// If we are in push mode and not active or under flowcontrol let's stop sending.if o.isPushMode() {if !o.active || (o.maxpb > 0 && o.pbytes > o.maxpb) {goto waitForMsgs}} else if o.waiting.isEmpty() {// If we are in pull mode and no one is waiting already break and wait.goto waitForMsgs}// Grab our next msg.pmsg, dc, err = o.getNextMsg()// We can release the lock now under getNextMsg so need to check this condition again here.if o.closed || o.mset == nil {o.mu.Unlock()return}
pull操作符的核心代码:
// Grab our next msg.pmsg, dc, err = o.getNextMsg()// We can release the lock now under getNextMsg so need to check this condition again here.if o.closed || o.mset == nil {o.mu.Unlock()return}...
// Do actual delivery.o.deliverMsg(dsubj, ackReply, pmsg, dc, rp)
当客户端是推送消费者时,将通过updateDeliveryInterest设置兴趣和队列
func (o *consumer) updateDeliveryInterest(localInterest bool) bool {interest := o.hasDeliveryInterest(localInterest)o.mu.Lock()defer o.mu.Unlock()mset := o.msetif mset == nil || o.isPullMode() {return false}if interest && !o.active {o.signalNewMessages()}// Update active status, if not active clear any queue group we captured.if o.active = interest; !o.active {o.qgroup = _EMPTY_} else {o.checkQueueInterest()}
推送模式也通过func(o*consumer)loopAndGatherMsgs(qch-chan-struct{})向客户端推送消息,当流接收到新消息时,通过通道触发跟踪功能
// This will update and signal all consumers that match.
func (mset *stream) signalConsumers(subj string, seq uint64) {mset.clsMu.RLock()if mset.csl == nil {mset.clsMu.RUnlock()return}r := mset.csl.Match(subj)mset.clsMu.RUnlock()if len(r.psubs) == 0 {return}// Encode the sequence here.var eseq [8]bytevar le = binary.LittleEndianle.PutUint64(eseq[:], seq)msg := eseq[:]for _, sub := range r.psubs {sub.icb(sub, nil, nil, subj, _EMPTY_, msg)}
}
sub.icb 是一个 processStreamSignal function ,通过以下代码设置 :
// Creates a sublist for consumer.
// All subjects share the same callback.
func (o *consumer) signalSubs() []*subscription {o.mu.Lock()defer o.mu.Unlock()if o.sigSubs != nil {return o.sigSubs}subs := []*subscription{}if o.subjf == nil {var sub = subscription{subject: []byte(fwcs), icb: o.processStreamSignal}subs = append(subs, &sub)o.sigSubs = subso.srv.Noticef("signalSubs subject:%s, o. %s, DeliverSubject %s", string(sub.subject), o.name, o.cfg.DeliverSubject)return subs}for _, filter := range o.subjf {var sub = subscription{subject: []byte(filter.subject), icb: o.processStreamSignal}subs = append(subs, &sub)o.srv.Noticef("signalSubs subject:%s, o. %s , DeliverSubject %s", string(sub.subject), o.name)}o.sigSubs = subsreturn subs
}
另一个重要功能:signalNewMessages(如下所示)将在新创建的消费者中调用,等待新消息到达的消息
// Will signal us that new messages are available. Will break out of waiting.
func (o *consumer) signalNewMessages() {// Kick our new message channelselect {case o.mch <- struct{}{}:default:}
}
如果想得到更明细的内容,可以进一步阅读
function func (o *consumer) may trigger signalNewMessages()
和
func (o *consumer) processStreamSignal(_ *subscription, _ *client, _ *Account, subject, _ string, seqb []byte) also may trigger signalNewMessages()
其它:
Option selectLimits 和Tier storage 不能共存
-
terms NRG: Drop append entries when upper layer is overloaded,
-
存储关键信息
const (
// JetStreamStoreDir is the prefix we use.
JetStreamStoreDir = “jetstream”
// JetStreamMaxStoreDefault is the default disk storage limit. 1TB
JetStreamMaxStoreDefault = 1024 * 1024 * 1024 * 1024
// JetStreamMaxMemDefault is only used when we can’t determine system memory. 256MB
JetStreamMaxMemDefault = 1024 * 1024 * 256
// snapshot staging for restores.
snapStagingDir = “.snap-staging”
)
相关文章:
nats jetstream server code 分析
对象和缩写 jetstream导入两个对象:stream and consumer,在stream 之上构造jetstreamapi。在nats代码中,以下是一些常见的缩写 1.mset is stream 2.jsX is something of jetstream 3.o is consumer 代码分析 对于producer ,发送…...
德鲁伊连接池
德鲁伊连接池(Druid Connection Pool)是一个开源的Java数据库连接池项目,用于提高数据库连接的性能和可靠性。德鲁伊连接池通过复用数据库连接、定时验证连接的可用性、自动回收空闲连接等机制,有效减少了数据库连接的创建和销毁开…...
Python从入门到精通1:FastAPI
引言 在现代 Web 开发中,API 是前后端分离架构的核心。FastAPI 凭借其高性能、简洁的语法和自动文档生成功能,成为 Python 开发者的首选框架。本文将从零开始,详细讲解 FastAPI 的核心概念、安装配置、路由设计、请求处理以及实际应用案例&a…...
C语言经典案例-菜鸟经典案例
1.输入某年某月某日,判断这一天是这一年的第几天? //输入某年某月某日,判断这一天是这一年的第几天? #include <stdio.h>int isLeapYear(int year) {// 闰年的判断规则:能被4整除且(不能被100整除或…...
SpringBoot过滤器(Filter)的使用:Filter接口、FilterRegistrationBean类配置、@WebFilter注释
1、过滤器(Filter)的介绍 Spring Boot 的过滤器用于对数据进行过滤处理。通过 Spring Boot 的过滤器,程序开发人员不仅可以对用户通过 URL 地址发送的请求进行过滤处理(例如:过滤一些错误的请求或者请求中的敏感词等),而且可以对服务器返回的数据进行过滤处理(例如:压…...
采用内存局部性分配有什么好处?
内存分配时的局部性分配(Locality of Allocation)是指将相关的内存对象分配在相邻或相近的内存区域中。这种分配策略在现代计算机系统中具有显著的好处,主要体现在以下几个方面: 1. 提高缓存命中率 现代计算机系统依赖于多级缓存…...
一周热点-OpenAI 推出了 GPT-4.5,这可能是其最后一个非推理模型
在人工智能领域,大型语言模型一直是研究的热点。OpenAI 的 GPT 系列模型在自然语言处理方面取得了显著成就。GPT-4.5 是 OpenAI 在这一领域的又一力作,它在多个方面进行了升级和优化。 1 新模型的出现 GPT-4.5 目前作为研究预览版发布。与 OpenAI 最近的 o1 和 o3 模型不同,…...
分布式ETCD面试题及参考答案
目录 ETCD 适用的六大场景及其实现原理 ETCD 与 Redis 在分布式锁实现上的差异 解释 ETCD 的 Watch 机制及其应用场景 ETCD 如何实现服务发现?与 ZooKeeper 有何不同? ETCD 实现服务发现的方式 与 ZooKeeper 的不同 ETCD 的键值存储模型支持哪些操作? 为什么 ETCD 适…...
MySQL进阶-关联查询优化
采用左外连接 下面开始 EXPLAIN 分析 EXPLAIN SELECT SQL_NO_CACHE * FROM type LEFT JOIN book ON type.card book.card; 结论:type 有All ,代表着全表扫描,效率较差 添加索引优化 ALTER TABLE book ADD INDEX Y ( card); #【被驱动表】࿰…...
ESP32驱动OV3660摄像头实现EdgeImpulse图像识别(摄像头支持红外夜视、边缘AI计算)
目录 1、传感器特性 2、硬件原理图 3、驱动程序 ESP32-S3 AI智能摄像头模块是一款专为智能家居和物联网应用打造的高性能边缘AI开发模组。它集成了摄像头、麦克风、音频功放、环境光传感器和夜视补光灯,无需依赖云端即可实现本地化AI推理。 凭借TensorFlow Lite、YOLO和O…...
SpringSecurity认证授权完整流程
SpringSecurity认证流程:loadUserByUsername()方法内部实现。 实现步骤: 构建一个自定义的service接口,实现SpringSecurity的UserDetailService接口。建一个service实现类,实现此loadUserByUsername方法。…...
java_了解反射机制
目录 1. 定义 2. 用途 3. 反射基本信息 4. 反射相关的类 4.1 class类(反射机制的起源) 4.1.1 Class类中的相关方法(方法的具体使用在后面的示例中) 4.2 反射的示例 4.2.1 获得Class对象的三种方式 4.2.2 反射的使用 Fiel…...
【赵渝强老师】管理MongoDB的运行
MongoDB提供了mongod命令用于启动MongoDB服务器端;而停止MongoDB服务器却可以通过几种不同的方式完成。下面分别进行介绍。 一、【实战】启动MongoDB服务器 通过执行下面的语句可以查看启动MongoDB服务器的帮助信息: mongod --help# 输出的信息如下&a…...
【学习思维模型】
学习思维模型 一、理解类模型二、记忆类模型三、解决问题类模型四、结构化学习模型五、效率与习惯类模型六、高阶思维模型七、实践建议八、新增学习思维模型**1. 波利亚问题解决四步法****2. 主动回忆(Active Recall)****3. 鱼骨图(因果图/Ishikawa Diagram)****4. MECE原则…...
阿里发布新开源视频生成模型Wan-Video,支持文生图和图生图,最低6G就能跑,ComFyUI可用!
Wan-Video 模型介绍:包括 Wan-Video-1.3B-T2V 和 Wan-Video-14B-T2V 两个版本,分别支持文本到视频(T2V)和图像到视频(I2V)生成。14B 版本需要更高的 VRAM 配置。 Wan2.1 是一套全面开放的视频基础模型&…...
安孚科技携手政府产业基金、高能时代发力固态电池,开辟南孚电池发展新赛道
安孚科技出手,发力固态电池。 3月7日晚间,安孚科技(603031.SH)发布公告称,公司控股子公司南孚电池拟与南平市绿色产业投资基金有限公司(下称“南平绿色产业基金”)、高能时代(广东横…...
moodle 开源的在线学习管理系统(LMS)部署
一、Moodle 简介 Moodle(Modular Object-Oriented Dynamic Learning Environment)是一个开源的在线学习管理系统(LMS),广泛应用于教育机构和企业培训。其核心功能包括课程管理、作业提交、在线测试、论坛互动和成绩跟…...
设备树的概念
可以理解为设备树的树干是系统总线,树枝上面是其他的不同的通信协议线。对于不同通信协议的设备挂载在对应的节点即可 在设备树出现以前,所有关于设备的具体信息都要写在驱动里,一旦外围设备变化,驱动代码就要重写。 引入了设…...
【ArcGIS】地理坐标系
文章目录 一、坐标系理论体系深度解析1.1 地球形态的数学表达演进史1.1.1 地球曲率的认知变化1.1.2 参考椭球体参数对比表 1.2 地理坐标系的三维密码1.2.1 经纬度的本质1.2.2 大地基准面(Datum)的奥秘 1.3 投影坐标系:平面世界的诞生1.3.1 投…...
MATLAB控制函数测试要点剖析
一、功能准确性检验 基础功能核验 针对常用控制函数,像用于传递函数建模的 tf 、构建状态空间模型的 ss ,以及开展阶跃响应分析的 step 等,必须确认其能精准执行基础操作。以 tf 函数为例,在输入分子与分母系数后,理…...
如何让一个类作为可调用对象被thread调用?
如何让一个类作为可调用对象,被 std::thread 调用 在 C 中,可以让一个类对象作为可调用对象(Callable Object),然后用 std::thread 进行调用。要实现这一点,主要有三种方法: 重载 operator()&…...
OpenWrt 串口终端常用命令---拓展篇
以下进一步拓展 OpenWrt 串口终端常用命令,新增更多高级操作与场景化工具,助你深入掌握系统管理与调试技巧: 一、系统信息与状态查询(扩展) 硬件详细探测 cat /proc/mtd # 查看 Flash 分区表(MTD 设备) mtd info # 显示 MTD 分…...
线上接口tp99突然升高如何排查?
当线上接口的 TP99 突然升高时,意味着该接口在 99% 的情况下响应时间变长,这可能会严重影响系统的性能和用户体验。可以按照下面的步骤进行排查。这里我们先说明一下如何计算tp99:监控系统计算 TP99(第 99 百分位数的响应时间&…...
如何借助人工智能AI模型开发一个类似OpenAI Operator的智能体实现电脑自动化操作?
这几天关于Manus的新闻铺天盖地,于是研究了一下AI智能体的实现思路,发现Openai 的OpenAI Operator智能体已经实现了很强的功能,但是每月200美金的价格高不可攀,而Manus的邀请码据说炒到了几万块!就想能不能求助人工智能…...
langchain系列(终)- LangGraph 多智能体详解
目录 一、导读 二、概念原理 1、智能体 2、多智能体 3、智能体弊端 4、多智能体优点 5、多智能体架构 6、交接(Handoffs) 7、架构说明 (1)网络 (2)监督者 (3)监督者&…...
springboot旅游管理系统设计与实现(代码+数据库+LW)
摘 要 现代经济快节奏发展以及不断完善升级的信息化技术,让传统数据信息的管理升级为软件存储,归纳,集中处理数据信息的管理方式。本旅游管理系统就是在这样的大环境下诞生,其可以帮助使用者在短时间内处理完毕庞大的数据信息&a…...
【前端跨域】WebSocket如何实现跨域通信?原理、实践与安全指南
在实时通信场景(如在线聊天、实时数据推送)中,WebSocket因其高效的双向通信能力成为首选技术 然而,当客户端与服务器部署在不同源时,跨域问题同样可能阻碍WebSocket的连接 一、WebSocket与跨域的关系 WebSocket的跨…...
Go红队开发—格式导出
文章目录 输出功能CSV输出CSV 转 结构体结构体 转 CSV端口扫描结果使用CSV格式导出 HTML输出Sqlite输出nmap扫描 JSONmap转json结构体转jsonjson写入文件json编解码json转结构体json转mapjson转string练习:nmap扫描结果导出json格式 输出功能 在我们使用安全工具的…...
Sharp 存在任意文件读取漏洞( DVB-2025-8923)
免责声明 本文所描述的漏洞及其复现步骤仅供网络安全研究与教育目的使用。任何人不得将本文提供的信息用于非法目的或未经授权的系统测试。作者不对任何由于使用本文信息而导致的直接或间接损害承担责任。如涉及侵权,请及时与我们联系,我们将尽快处理并删除相关内容。 0x01…...
C++数组,链表,二叉树的内存排列是什么样的,结构体占多大内存如何计算,类占多大内存如何计算,空类的空间是多少,为什么?
C数组是连续存储的,C数组元素依次存放在相邻的内存地址之中,并且内存大小相同。 C链表是离散存储的,C链表是由节点构成的,每个节点之中存在节点的值以及指向下一个节点的指针,每个节点是动态分配的。 C二叉树也是离散…...
【vLLM 教程】使用 TPU 安装
vLLM 是一款专为大语言模型推理加速而设计的框架,实现了 KV 缓存内存几乎零浪费,解决了内存管理瓶颈问题。 更多 vLLM 中文文档及教程可访问 →https://vllm.hyper.ai/ vLLM 使用 PyTorch XLA 支持 Google Cloud TPU。 依赖环境 Google Cloud TPU …...
【RAG】基于向量检索的 RAG (BGE示例)
RAG机器人 结构体 文本向量化: 使用 BGE 模型将文档和查询编码为向量。 (BGE 是专为检索任务优化的开源 Embedding 模型,除了本文API调用,也可以通过Hugging Face 本地部署BGE 开源模型) 向量检索: 从数据库中找到与查询相关的文…...
【RAG】RAG 系统的基本搭建流程(ES关键词检索示例)
RAG 系统的基本搭建流程 搭建过程: 文档加载,并按一定条件切割成片段将切割的文本片段灌入检索引擎封装检索接口构建调用流程:Query -> 检索 -> Prompt -> LLM -> 回复 1. 文档的加载与切割 # !pip install --upgrade openai…...
PSIM积累经验
1、三极管的部署报错。 出错信息: 元件: R 名称: R2 Error: The RLC branch R2 is connected to the gate node of the switch Q1. The gate node should be connected to an On-Off Controller output. Refer to the switch Help p…...
C++之vector类(超详解)
这节我们来学习一下,C中一个重要的工具——STL,这是C中自带的一个标准库,我们可以直接调用这个库中的函数或者容器,可以使效率大大提升。这节我们介绍STL中的vector。 文章目录 前言 一、标准库类型vector 二、vector的使用 2.…...
Go学习笔记
<!-- 注意* --> 初始化工程 go mod init GoDemo 结构体,接口 type i struct{} type i interface{} 条件,选择 循环 键值对 make(map[string]int) 切片,集合 make([]int,10) 函数 通道 Channel make(chan int) ch <- v…...
前端杂的学习笔记
什么是nginx Nginx (engine x) 是一个高性能的HTTP和反向代理web服务器 Nginx是一款轻量级的Web 服务器/反向代理服务器,处理高并发能力是十分强大的,并且支持热部署,启动简单,可以做到7*24不间断运行 正代和反代 学习nginx&a…...
痉挛性斜颈护理:全方位呵护,重燃生活希望
痉挛性斜颈是一种以颈部肌肉不自主收缩导致头部向一侧扭转或倾斜为特征的疾病。对于痉挛性斜颈患者而言,科学有效的护理能够显著提升其生活质量,辅助病情的改善。 生活护理:在生活环境布置上,要充分考虑患者行动的便利性。确保室内…...
MySQL的安装以及数据库的基本配置
MySQL的安装及配置 MySQL的下载 选择想要安装的版本,点击Download下载 Mysql官网下载地址: https://downloads.mysql.com/archives/installer/ MySQL的安装 选择是自定义安装,所以直接选择“Custom”,点击“Next” …...
WangEditor快速实现版
WangEditor快速实现版 效果 案例代码 后端 package com.diy.springboot.controller;import cn.hutool.core.util.IdUtil; import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; import io.swagger.annotations.ApiImplicitParam; import org.sp…...
LeetCode Hot100刷题——反转链表(迭代+递归)
206.反转链表 给你单链表的头节点 head ,请你反转链表,并返回反转后的链表。 示例 1: 输入:head [1,2,3,4,5] 输出:[5,4,3,2,1]示例 2: 输入:head [1,2] 输出:[2,1]示例 3&#…...
10.2 继承与多态
文章目录 继承多态 继承 继承的作用是代码复用。派生类自动获得基类的除私有成员外的一切。基类描述一般特性,派生类提供更丰富的属性和行为。在构造派生类时,其基类构造函数先被调用,然后是派生类构造函数。在析构时顺序刚好相反。 // 基类…...
java项目之基于ssm的智能训练管理平台(源码+文档)
项目简介 智能训练管理平台实现了以下功能: 系统可以提供信息显示和相应服务,其管理员增删改查课程信息和课程信息资料,审核课程信息预订订单,查看订单评价和评分,通过留言功能回复用户提问。 💕…...
29-验证回文串
如果在将所有大写字符转换为小写字符、并移除所有非字母数字字符之后,短语正着读和反着读都一样。则可以认为该短语是一个 回文串 。 字母和数字都属于字母数字字符。 给你一个字符串 s,如果它是 回文串 ,返回 true ;否则…...
(57)[HGAME 2023 week1]easyasm
nss:3477 [HGAME 2023 week1]easyasm 关于这个题吧,我还是和上一个题一样,我观察到了异或0x33 所以我就把result的结果跟0x33异或,然后我就就这样,做出来了...
FY-3D MWRI亮温绘制
1、FY-3D MWRI介绍 风云三号气象卫星(FY-3)是我国自行研制的第二代极轨气象卫星,其有效载荷覆 盖了紫外、可见光、红外、微波等频段,其目标是实现全球全天候、多光谱、三维定量 探测,为中期数值天气预报提供卫星观测数…...
Java集合面试题
引言 Java集合框架是Java编程中不可或缺的一部分,它提供了一系列用于存储和操作对象的接口和类。在Java面试中,集合框架的相关知识往往是必考的内容。本文将汇总一系列关于Java集合的面试题,帮助求职者更好地准备面试。 一、Java集合框架概…...
知识蒸馏综述Knowledge Distillation: A Survey解读
论文链接:Knowledge Distillation: A Survey 摘要:近年来,深度神经网络在工业界和学术界都取得了成功,尤其是在计算机视觉任务方面。深度学习的巨大成功主要归功于它能够扩展以对大规模数据进行编码,并且能够处理数十…...
ES映射知识
映射 映射类似于关系型数据库的Schema(模式)。 映射来定义字段列和存储的类型等基础信息。 {"mappings": {"properties": {"username": {"type": "keyword","ignore_above": 256 // 忽略…...
Spring Boot拦截器(Interceptor)与过滤器(Filter)深度解析:区别、实现与实战指南
Spring Boot拦截器(Interceptor)与过滤器(Filter)深度解析:区别、实现与实战指南 一、核心概念对比 1. 本质区别 维度过滤器(Filter)拦截器(Interceptor)规范层级Serv…...