当前位置: 首页 > news >正文

MCP 传输层代码分析

MCP 传输层代码分析

MCP 整体架构说明

引用官方文档原文:Model Context Protocol (MCP) 构建在一个灵活且可扩展的架构上,使 LLM 应用和集成之间的无缝通信成为可能。具体架构细节可以参考文档(核心架构 - MCP 中文文档)。MCP 采用分层架构,分为协议层与传输层,官方实现的传输层支持两种实现:Stdio 传输通过 HTTP 的 SSE 传输。本文旨在通过官方提供的 Python 库代码,来分析 通过 HTTP 的 SSE 传输 方式的实现。

Python 用到的技术

  • AnyIOanyio 是 Python 的一个异步编程库,它提供了统一的 API 来编写兼容多种异步运行时(如 asynciotrio)的代码。
  • StarletteStarlette 是一个轻量级的 ASGI(异步服务器网关接口) Web 框架,专为 Python 异步编程设计,具有高性能、灵活和极简的特点。

细节说明

在此我会列出以上两个库用到的最主要的几个方法,特别是第一个方法。

  • anyio.create_memory_object_stream:此方法返回一个输入流和一个输出流,可以将其看成是一个水管,返回的参数可以看成是水管的两端。从一个流输入数据,可以从另一个流输出数据。代码中总共用到了三组通道(即输入流和输出流)。传输层与协议层之间是通过流来交流数据的。
  • anyio.create_task_group:用于创建任务组。
  • EventSourceResponse:这是 StarletteFastAPI 中用于实现 Server-Sent Events (SSE) 的响应类。注意代码中用到的参数 content,用于接收输入流,data_sender_callable,用于发送数据的方法(我没有找到相关的文档)。

整体流程

在这里插入图片描述

对于协议层的封装在 lowlevel.server.Server 中,其中 run 方法接收一个输入流和一个输出流。函数签名如下:

async def run(self,read_stream: MemoryObjectReceiveStream[types.JSONRPCMessage | Exception],write_stream: MemoryObjectSendStream[types.JSONRPCMessage],initialization_options: InitializationOptions,# When False, exceptions are returned as messages to the client.# When True, exceptions are raised, which will cause the server to shut down# but also make tracing exceptions much easier during testing and when using# in-process servers.raise_exceptions: bool = False,
):

传输层的主要工作是初始化一个输入流和输出流,用于传输客户端请求命令及服务端的响应数据。

SseServerTransport 类为传输层的实现类,该类对客户端暴露了两个端点:

  • /sse
    • 作用:创建一个 SSE 通道,初始化输入输出流,创建 sessionId,传输服务端响应的数据(从 read_stream 读取数据),启动协议层的服务。
    • 状态:有状态。
  • /message
    • 作用:接收客户端的 POST 请求,将请求传递给 write_stream
    • 状态:无状态。
starlette_app = Starlette(debug=self.settings.debug,routes=[Route("/sse", endpoint=handle_sse),Mount("/messages/", app=sse.handle_post_message),],
)

初始化 SSE 连接的代码

@asynccontextmanager
async def connect_sse(self, scope: Scope, receive: Receive, send: Send):if scope["type"] != "http":logger.error("connect_sse received non-HTTP request")raise ValueError("connect_sse can only handle HTTP requests")logger.debug("Setting up SSE connection")read_stream: MemoryObjectReceiveStream[types.JSONRPCMessage | Exception]read_stream_writer: MemoryObjectSendStream[types.JSONRPCMessage | Exception]write_stream: MemoryObjectSendStream[types.JSONRPCMessage]write_stream_reader: MemoryObjectReceiveStream[types.JSONRPCMessage]read_stream_writer, read_stream = anyio.create_memory_object_stream(0)write_stream, write_stream_reader = anyio.create_memory_object_stream(0)session_id = uuid4()session_uri = f"{quote(self._endpoint)}?session_id={session_id.hex}"self._read_stream_writers[session_id] = read_stream_writerlogger.debug(f"Created new session with ID: {session_id}")sse_stream_writer, sse_stream_reader = anyio.create_memory_object_stream[dict[str, Any]](0)async def sse_writer():logger.debug("Starting SSE writer")async with sse_stream_writer, write_stream_reader:await sse_stream_writer.send({"event": "endpoint", "data": session_uri})logger.debug(f"Sent endpoint event: {session_uri}")async for message in write_stream_reader:logger.debug(f"Sending message via SSE: {message}")await sse_stream_writer.send({"event": "message","data": message.model_dump_json(by_alias=True, exclude_none=True),})async with anyio.create_task_group() as tg:response = EventSourceResponse(content=sse_stream_reader, data_sender_callable=sse_writer)logger.debug("Starting SSE response task")tg.start_soon(response, scope, receive, send)logger.debug("Yielding read and write streams")yield (read_stream, write_stream)

以上代码中初始化了三个通道,其中 sse_stream_writersse_stream_reader 没有对外暴露。它们用于将数据传给 EventSourceResponse。三个通道之间的数据流向较为复杂,建议在阅读代码时结合流程图来分析,以便更好地理解数据的走向。注意 read_stream_writer 被添加进了一个字典中,这是因为 read_stream_writer 是在 handle_post_message 中传输用户请求的。由于 POST 请求是无状态的,为了找到要发送给哪个 read_stream_writer,必须通过 session_id 去查找。

对于不习惯协程编程范式的人来说,理解这几段代码可能还有些困难。有时间我会继续修改这篇文章,将整个流程说明白。

相关文章:

MCP 传输层代码分析

MCP 传输层代码分析 MCP 整体架构说明 引用官方文档原文:Model Context Protocol (MCP) 构建在一个灵活且可扩展的架构上,使 LLM 应用和集成之间的无缝通信成为可能。具体架构细节可以参考文档(核心架构 - MCP 中文文档)。MCP 采…...

OBS studio 减少音频中的杂音(噪音)

1. 在混音器中关闭除 麦克风 之外的所有的音频输入设备 2.在滤镜中增加“噪声抑制”和“噪声门限”...

java的Stream流处理

Java Stream 流处理详解 Stream 是 Java 8 引入的一个强大的数据处理抽象,它允许你以声明式方式处理数据集合(类似于 SQL 语句),支持并行操作,提高了代码的可读性和处理效率。 一、Stream 的核心概念 1. 什么是 Str…...

Windows使用虚拟环境执行sh脚本

在代码文件夹git bash here echo ‘export PATH“/f/anaconda/Scripts:$PATH”’ >> ~/.bashrc echo ‘source /f/anaconda/etc/profile.d/conda.sh’ >> ~/.bashrc source ~/.bashrc conda路径确认 where conda conda activate mmt bash ./online.sh感谢gpt记录…...

Transformer Decoder-Only 算力FLOPs估计

FLOPs和FLOPS的区别 FLOPs (Floating Point Operations)是指模型或算法执行过程中总的浮点运算次数,单位是“次”FLOPS (Floating Point Operations Per Second)是指硬件设备(如 GPU 或 CPU)每…...

数字电子技术基础(五十七)——边沿触发器

目录 1 边沿触发器 1.1 边沿触发器简介 1.1.1 边沿触发器的电路结构 1.3 边沿触发的D触发器和JK触发器 1.3.1 边沿触发的D型触发器 1.3.2 边沿触发的JK触发器 1 边沿触发器 1.1 边沿触发器简介 对于时钟触发的触发器来说,始终都存在空翻的现象,抗…...

05.three官方示例+编辑器+AI快速学习three.js webgl - animation - skinning - ik

本实例主要讲解内容 这个Three.js示例展示了**反向运动学(Inverse Kinematics, IK)**在3D角色动画中的应用。通过加载一个角色模型,演示了如何使用IK技术实现自然的肢体运动控制,如手部抓取物体的动作。 核心技术包括: CCD反向运动学求解器…...

MYSQL数据库集群高可用和数据监控平台

项目环境 项目拓扑结构 软硬件环境清单 软硬件环境清单 软硬件环境清单 主机名IP硬件软件 master1 192.168.12.130 VIP:192.168.12.200 cpu:1颗2核 内 存:2GB HDD:20GB 网 络:NAT VmWare17 OpenEuler22.03 SP4 MySql8.0.3…...

《异常链机制详解:如何优雅地传递Java中的错误信息?》

大家好呀!👋 作为一名Java开发者,相信你一定见过各种奇奇怪怪的异常报错。但有没有遇到过这样的情况:明明只调用了一个方法,却看到异常信息像俄罗斯套娃一样一层层展开?🤔 这就是我们今天要讲的…...

MySQL 数据库集群部署、性能优化及高可用架构设计

MySQL 数据库集群部署、性能优化及高可用架构设计 集群部署方案 1. 主从复制架构 传统主从复制:配置一个主库(Master)和多个从库(Slave)GTID复制:基于全局事务标识符的复制,简化故障转移半同步复制:确保至少一个从库接收到数据…...

什么是深度神经网络

深度神经网络(DNN)详细介绍 1. 定义与核心原理 深度神经网络(Deep Neural Network, DNN)是一种具有多个隐藏层的人工神经网络模型,其核心在于通过层次化的非线性变换逐步提取输入数据的高层次抽象特征。与浅层神经网络相比,DNN的隐藏层数量通常超过三层,例如VGGNet、R…...

深入解析PyTorch中MultiheadAttention的隐藏参数add_bias_kv与add_zero_attn

关键背景 最近在学习pytorch中的源码尤其是nn.modules下算子的实现,针对activation.py下MultiheadAttention下有两个不常见的参数的使用比较有趣,因为时序领域很少使用这两个参数(add_bias_kv和add_zero_attn),但是其…...

最大化效率和性能:AKS 中节点池的强大功能

什么是节点池 在 Azure Kubernetes 服务 (AKS) 中,相同配置的节点会被分组到节点池中。这些节点池包含运行应用程序的底层虚拟机。创建 AKS 集群时,您需要定义初始节点数及其大小 (SKU)。随着应用程序需求的变化,您可能需要更改节点池的设置…...

用户态到内核态:Linux信号传递的九重门(一)

1. 信号的认识 1.1. 信号的特点 异步通知:信号是异步的,发送信号的进程无需等待接收进程的响应。预定义事件:每个信号对应一个预定义的事件(如终止、中断、段错误等)。 轻量级:信号不携带大量数据&#xf…...

c语言第一个小游戏:贪吃蛇小游戏01

hello啊大家好 今天我们用一个小游戏来增强我们的c语言! 那就是贪吃蛇 为什么要做一个贪吃蛇小游戏呢? 因为这个小游戏所涉及到的知识有c语言的指针、数组、链表、函数等等可以让我们通过这个游戏来巩固c语言,进一步认识c语言。 一.我们先…...

JAVA EE_网络原理_网络层

晨雾散尽,花影清晰。 ​​​​​​​ ​​​​​​​ ​​​​​​​ ​​​​​​​ ​​​​​​​ ----------陳長生. ❀主页:陳長生.-CSDN博客❀ 📕上一篇:数据库Mysql_联…...

前端性能指标及优化策略——从加载、渲染和交互阶段分别解读详解并以Webpack+Vue项目为例进行解读

按照加载阶段、渲染阶段和交互阶段三个维度进行系统性阐述: 在现代 Web 开发中,性能不再是锦上添花,而是决定用户体验与业务成败的关键因素。为了全面监控与优化网页性能,我们可以将性能指标划分为加载阶段、渲染阶段、和交互阶段…...

Flink 系列之十五 - 高级概念 - 窗口

之前做过数据平台,对于实时数据采集,使用了Flink。现在想想,在数据开发平台中,Flink的身影几乎无处不在,由于之前是边用边学,总体有点混乱,借此空隙,整理一下Flink的内容&#xff0c…...

控制台打印带格式内容

1. 场景 很多软件会在控制台打印带颜色和格式的文字,需要使用转义符实现这个功能。 2. 详细说明 2.1.转义符说明 样式开始:\033[参数1;参数2;参数3m 可以多个参数叠加,若同一类型的参数(如字体颜色)设置了多个&…...

Linux为啥会重新设置中断请求号与中断向量号之间的关系?

Linux内核重新设置中断请求号(IRQ)与中断向量号之间的关系,主要出于以下核心原因和设计考量: ​1. 硬件多样性与抽象需求​ ​硬件中断号(HW Interrupt ID)的差异​ 不同处理器架构的中断控制器&#xff08…...

自然语言处理NLP中的连续词袋(Continuous bag of words,CBOW)方法、优势、作用和程序举例

自然语言处理NLP中的连续词袋(Continuous bag of words,CBOW)方法、优势、作用和程序举例 目录 自然语言处理NLP中的连续词袋(Continuous bag of words,CBOW)方法、优势、作用和程序举例一、连续词袋( Cont…...

计算机网络笔记(二十二)——4.4网际控制报文协议ICMP

4.4.1ICMP报文的种类 ICMP(Internet Control Message Protocol)是IP协议的辅助协议,主要用于传递控制消息、错误报告和诊断信息。其报文分为两大类:查询报文和错误报告报文。 1. 错误报告报文(Error Messages&#x…...

【AI论文】作为评判者的感知代理:评估大型语言模型中的高阶社会认知

摘要:评估大型语言模型(LLM)对人类的理解程度,而不仅仅是文本,仍然是一个开放的挑战。 为了弥合这一差距,我们引入了Sentient Agent作为评判者(SAGE),这是一个自动评估框…...

Kubernetes生产实战(二十七):精准追踪Pod数据存储位置

在生产环境中,快速定位Pod数据的物理存储位置是运维人员的基本功。本文将揭秘Kubernetes存储系统的核心原理,并提供一套经过实战检验的定位方法体系。 一、存储架构全景图 K8S存储架构 Pod --> Volume Mount --> PVC --> PV --> Storage P…...

极新携手火山引擎,共探AI时代生态共建的破局点与增长引擎

在生成式AI与行业大模型的双重驱动下,人工智能正以前所未有的速度重构互联网产业生态。从内容创作、用户交互到商业决策,AI技术渗透至产品研发、运营的全链条,推动效率跃升与创新模式变革。然而,面对AI技术迭代的爆发期&#xff0…...

[SIGPIPE 错误] 一个 Linux socket 程序,没有任何报错打印直接退出程序

1. 问题 在编写一个程序的时候,当然程序很复杂,遇到了一个 Linux socket 程序,没有任何报错打印直接退出程序,但是在程序里面我有很多 error log ,在程序退出的时候完全没有打印。为了说明问题,我编写了一…...

Qt 界面优化(绘图)

目录 1. 绘图基本概念2. 绘制各种形状2.1 绘制线段2.2 绘制矩形2.3 绘制圆形2.4 绘制文本2.5 设置画笔2.6 设置画刷 3. 绘制图片3.1 绘制简单图片3.2 平移图片3.3 缩放图片3.4 旋转图片 4. 其他设置4.1 移动画家位置4.2 保存/加载画家的状态 5. 特殊的绘图设备5.1 QPixmap5.2 Q…...

AQS(AbstractQueuedSynchronizer)解析

文章目录 一、AQS简介二、核心设计思想2.1 核心设计思想回顾2.2 CLH锁队列简介2.3 AQS对CLH队列的改动及其原因 三、核心组件详解3.1 state 状态变量3.2 同步队列 (FIFO双向链表) 四、核心方法深度解析4.1 获取同步状态 (独占模式) - acquire(int arg)4.2 释放同步状态 (独占模…...

Java并发编程常见问题与陷阱解析

引言 随着计算机硬件技术的飞速发展,多核处理器已经变得普遍,Java并发编程的重要性也日益凸显。然而,多线程编程并非易事,其中充满了许多潜在的问题和陷阱。作为一名Java开发工程师,掌握并发编程的常见问题及其解决方案…...

DEEPPOLAR:通过深度学习发明非线性大核极坐标码(1)

原文:《DEEPPOLAR: Inventing Nonlinear Large-Kernel Polar Codes via Deep Learning》 摘要 信道编码设计的进步是由人类的创造力推动的,而且恰如其分地说,这种进步是零星的。极性码是在Arikan极化核的基础上开发的,代表了编码…...

Java多态详解

Java多态详解 什么是多态? 比如我们说:“驾驶一辆车”,有人开的是自行车,有人开的是摩托车,有人开的是汽车。虽然我们都说“开车”,但“怎么开”是由具体的车类型决定的:“开”是统一的动作&a…...

go程序编译成动态库,使用c进行调用

以下是使用 Go 语言打包成 .so 库并使用 C 语言调用的完整步骤: 1. Go 语言打包成 .so 库 (1)编写 Go 代码 创建一个 Go 文件(如 calculator.go),并定义需要导出的函数。导出的函数名必须以大写字母开头…...

iVX:图形化编程与组件化的强强联合

在数字化浪潮中,软件开发范式正经历着从文本到图形的革命性转变。iVX 作为国产可视化编程领域的领军者,以 “图形化逻辑 组件化架构” 的双重创新,重新定义了软件开发的效率边界。其技术突破不仅体现在开发方式的革新,更通过一系…...

华为配置篇-RSTP/MSTP实验

MSTP 一、简介二、常用命令总结三、实验 一、简介 RSTP(快速生成树协议)​ RSTP(Rapid Spanning Tree Protocol)是 STP 的改进版本,基于 ​​IEEE 802.1w 标准​​,核心目标是解决传统 STP 收敛速度慢的问…...

端口号被占用怎么解决

windows环境下端口号被占用怎么解决 win r 快捷键打开cmd输入netstat -ano|findstr 端口号 通过这个命令找到pidtaskkill /pid pid端口号 /t /f 如下图所示 命令解读 netstat 是一个网络统计工具,它可以显示协议统计信息和当前的TCP/IP网络连接。 -a 参数告诉 nets…...

GO语言-导入自定义包

文章目录 1. 项目目录结构2. 创建自定义包3. 初始化模块4. 导入自定义包5. 相对路径导入 在Go语言中导入自定义包需要遵循一定的目录结构和导入规则。以下是详细指南(包含两种方式): 1. 项目目录结构 方法1:适用于Go 1.11 &#…...

ES常识5:主分词器、子字段分词器

文章目录 一、主分词器:最基础的文本处理单元主分词器的作用典型主分词器示例 二、其他类型的分词器:解决主分词器的局限性1. 子字段分词器(Multi-fields)2. 搜索分词器(Search Analyzer)3. 自定义分词器&a…...

NoSQL数据库技术与应用复习总结【看到最后】

第1章 初识NoSQL 1.1 大数据时代对数据存储的挑战 1.高并发读写需求 2.高效率存储与访问需求 3.高扩展性 1.2 认识NoSQL NoSQL--非关系型、分布式、不提供ACID的数据库设计模式 NoSQL特点 1.易扩展 2.高性能 3.灵活的数据模型 4.高可用 NoSQL拥有一个共同的特点&am…...

单片机-STM32部分:12、I2C

飞书文档https://x509p6c8to.feishu.cn/wiki/MsB7wLebki07eUkAZ1ec12W3nsh 一、简介 IIC协议,又称I2C协议,是由PHILP公司在80年代开发的两线式串行总线,用于连接微控制器及其外围设备,IIC属于半双工同步通信方式。 IIC是一种同步…...

【英语笔记(四)】诠释所有16种英语时态,介绍每种时态下的动词变形!!含有所有时态的的动词变形汇总表格

1 时态的单词构成 1.1 现在 1.1.1 一般现在时态 动词原形动词原形s(第三人称单数) 1.1.1.1 表达事实 I eat carrots. 我吃胡萝卜:我是吃胡萝卜这种食物的.(这个是事实陈述) The rabbit eats carrots. 兔子吃胡萝卜…...

【质量管理】什么是过程?

在文章【质量管理】谁是顾客?什么是质量链?-CSDN博客 中我们了解了什么是顾客,顾客不仅仅是企业以外的人,在企业的内部我们也有大大小小的顾客。并且我们了解了什么是质量链,企业内部的各种供给方和客户形成了质量链。…...

效率办公新工具:PDF Reader Pro V5.0功能解析与使用体验

在日常文档处理与数字办公的场景中,PDF 文件依然是主流格式之一。从合同审批、项目文档、财务报表,到技术方案和用户手册,PDF 的编辑、转换、标注、归档需求始终存在。 面对这些需求,越来越多用户希望有一款功能完整、跨平台、智…...

Java对象内存布局和对象头

1、面试题 1)说下JUC,AQS的大致流程 CAS自旋锁,是获取不到锁就一直自旋吗? 2)CAS和synchronized区别在哪里,为什么CAS好,具体优势在哪里? 3)sychro…...

Vue 跨域解决方案及其原理剖析

在现代 Web 开发中,跨域问题是前端开发者经常面临的挑战之一。当使用 Vue.js 构建应用时,跨域请求的处理尤为重要。本文将深入探讨 Vue 解决跨域的多种方法及其背后的原理,帮助开发者更好地理解和应对这一常见问题。 一、跨域问题概述 1. 同…...

TikTok 互动运营干货:AI 助力提升粘性

在 TikTok 运营的众多环节中,与用户的互动是建立紧密联系、提升账号粘性的关键所在。及时且真诚地回复评论和私信,能让用户切实感受到你的关注与尊重,从而极大地增强他们对你的好感与粘性。对于用户提出的问题,要以耐心、专业的态…...

Kids A-Z安卓版:儿童英语启蒙的优质选择

Kids A-Z安卓版 是一款由北美知名分级读物厂商 Learning A-Z 官方推出的英语分级学习应用,也被称为 Raz-Kids app。它专为 K-5 年级的学生设计,提供丰富的英语学习资源和互动学习体验,帮助孩子们在轻松愉快的环境中提升英语能力。通过动画、互…...

接口继承与扩展的使用技巧

在 TypeScript 中,接口继承和扩展是非常强大且灵活的功能,可以帮助我们更高效地管理类型和提高代码的可重用性。接口继承使得一个接口可以从另一个接口继承属性和方法,而接口扩展允许我们通过组合多个接口来构建更复杂的结构。这些特性使得 T…...

【React】Craco 简介

Craco 简介 Craco (Create React App Configuration Override) 是一个用于自定义 Create React App (CRA) 配置的工具,无需 eject(弹出)项目。 为什么需要 Craco Create React App 虽然提供了零配置的 React 开发体验,但其配置…...

HTML5中的Microdata与历史记录管理详解

Microdata 简介 Microdata 是 HTML5 引入的一种标记方式,用于在网页中嵌入机器可读的语义信息。通过使用 Microdata,开发者可以在 HTML 元素中添加特定的属性,以便搜索引擎和其他工具更好地理解网页内容。 Microdata 的核心属性包括 itemsc…...

UNet网络 图像分割模型学习

UNet 由Ronneberger等人于2015年提出,专门针对医学图像分割任务,解决了早期卷积网络在小样本数据下的效率问题和细节丢失难题。 一 核心创新 1.1对称编码器-解码器结构 实现上下文信息与高分辨率细节的双向融合 如图所示:编码器进行了4步&…...