【MCP Node.js SDK 全栈进阶指南】高级篇(1):MCP多服务器协作架构
随着业务规模的不断扩大和系统复杂度的提升,单一服务器架构往往无法满足高并发、高可用性和弹性扩展的需求。在MCP生态系统中,多服务器协作架构成为构建大规模应用的必然选择。本文将深入探讨MCP TypeScript-SDK在多服务器环境下的部署、协作和管理,以及如何构建高可用、高性能、易扩展的分布式MCP应用。
目录
-
多服务器架构基础
1.1 MCP多服务器架构概述
1.2 分布式系统的挑战与解决方案
1.3 MCP服务器分类与职责划分
1.4 多服务器架构的常见模式 -
服务发现与路由
2.1 服务注册与发现机制
2.2 基于DNS的服务发现
2.3 动态路由与负载均衡
2.4 服务健康检查与自动恢复 -
状态同步与一致性
3.1 分布式状态管理策略
3.2 事件驱动的状态同步
3.3 实现最终一致性
3.4 数据分区与冲突解决 -
负载均衡与容错设计
4.1 负载均衡策略与实现
4.2 故障检测与自动恢复
4.3 优雅降级与熔断机制
4.4 构建高可用MCP集群
1. 多服务器架构基础
在深入探讨MCP多服务器协作之前,我们需要先了解多服务器架构的核心概念、挑战以及MCP SDK提供的解决方案。
1.1 MCP多服务器架构概述
MCP(Model Context Protocol)作为一种为大型语言模型(LLM)交互设计的协议,其服务架构需要适应复杂多变的业务场景和负载模式。多服务器MCP架构是指将MCP服务分布在多个物理或虚拟服务器上,通过协作方式提供统一的服务能力。
// MCP服务器实例类型
interface McpServerInstance {id: string; // 服务器唯一标识host: string; // 主机地址port: number; // 端口号role: 'master' | 'worker' | 'specialized'; // 服务器角色status: 'online' | 'offline' | 'degraded'; // 服务器状态capacity: { // 服务器容量信息maxConcurrentRequests: number;currentLoad: number;};features: string[]; // 支持的特性startTime: Date; // 启动时间
}// 多服务器集群配置
interface McpClusterConfig {serviceName: string; // 服务名称serviceVersion: string;// 服务版本discoveryMethod: 'static' | 'dns' | 'registry'; // 服务发现方式heartbeatInterval: number; // 心跳间隔syncStrategy: 'event-based' | 'polling' | 'hybrid'; // 同步策略loadBalanceStrategy: 'round-robin' | 'least-connections' | 'consistent-hash'; // 负载均衡策略failoverPolicy: { // 故障转移策略retryAttempts: number;retryDelay: number;circuitBreakerThreshold: number;};
}
多服务器MCP架构具有以下几个核心特点:
- 水平扩展性:通过增加服务器数量来线性提升系统整体处理能力
- 高可用性:即使部分服务器故障,系统整体仍可持续提供服务
- 负载分散:将请求负载分散到多个服务器,避免单点压力过大
- 资源隔离:可按业务功能或客户需求隔离资源,提高安全性和稳定性
- 灵活部署:支持混合云、多区域、边缘计算等多样化部署模式
1.2 分布式系统的挑战与解决方案
构建分布式MCP服务面临一系列挑战,MCP TypeScript-SDK针对这些挑战提供了相应的解决方案:
挑战 | 表现 | MCP SDK解决方案 |
---|---|---|
网络不可靠 | 网络延迟、分区、丢包 | 重试机制、异步通信、断线重连 |
一致性问题 | 数据不一致、冲突 | 事件驱动同步、版本控制、冲突解决策略 |
服务协调 | 服务发现、路由 | 服务注册表、服务健康检查、动态路由 |
故障处理 | 节点故障、服务降级 | 故障检测、自动恢复、熔断机制 |
性能瓶颈 | 请求拥塞、资源竞争 | 负载均衡、请求限流、资源隔离 |
// MCP服务器故障处理配置
interface McpFailoverConfig {detection: {method: 'heartbeat' | 'ping' | 'health-endpoint';interval: number; // 检测间隔timeout: number; // 超时时间thresholds: {warning: number; // 警告阈值critical: number; // 临界阈值}};recovery: {strategy: 'restart' | 'replace' | 'redirect';cooldownPeriod: number; // 冷却时间maxAttempts: number; // 最大尝试次数};notification: {channels: string[]; // 通知渠道templates: Record<string, string>; // 通知模板}
}
1.3 MCP服务器分类与职责划分
在多服务器架构中,不同的MCP服务器可以承担不同的角色和职责,实现功能分离和专业化:
1.3.1 按角色分类
-
主服务器(Master):
- 管理集群状态和配置
- 协调跨服务器的资源分配
- 监控集群健康状态
- 通常部署较少数量,配置较高
-
工作服务器(Worker):
- 处理客户端的实际请求
- 执行MCP资源访问和工具调用
- 可大规模部署,构成系统处理能力的主体
- 通常是无状态的,便于横向扩展
-
专用服务器(Specialized):
- 专注于特定功能或业务场景
- 例如:AI推理服务器、数据处理服务器等
- 可根据需求定制化配置
- 适合资源密集型或安全敏感型业务
// MCP服务器角色定义和职责配置
import { McpServer } from '@modelcontextprotocol/sdk';// 主服务器配置
const masterConfig = {name: "mcp-master-server",description: "MCP集群主服务器",version: "1.0.0",cluster: {role: "master",workers: ["worker-1", "worker-2", "worker-3"],electionStrategy: "fixed", // 固定主服务器stateSync: {method: "push",interval: 5000}}
};// 工作服务器配置
const workerConfig = {name: "mcp-worker-server",description: "MCP集群工作服务器",version: "1.0.0",cluster: {role: "worker",masterId: "master-1",maxConcurrentRequests: 1000,resourceCacheSize: 500,reportInterval: 2000}
};// 专用服务器配置
const specializedConfig = {name: "mcp-specialized-server",description: "MCP图像处理专用服务器",version: "1.0.0",cluster: {role: "specialized",serviceType: "image-processing",supportedOperations: ["resize", "filter", "recognize"],resourceRequirements: {gpu: true,minMemory: "16G",cpuCores: 8}}
};
1.3.2 按功能分类
-
API网关服务器:
- 请求路由和负载均衡
- 认证和授权处理
- 请求限流和缓存
- 请求/响应转换
-
资源服务器:
- 管理MCP资源模板
- 处理资源访问请求
- 资源缓存和优化
- 资源版本控制
-
工具服务器:
- 托管和执行MCP工具
- 工具依赖管理
- 工具执行环境隔离
- 工具性能监控
-
状态同步服务器:
- 维护全局状态
- 协调跨服务器状态同步
- 处理分布式事务
- 解决数据冲突
// 按功能划分的MCP服务器实现示例
import { McpServer } from '@modelcontextprotocol/sdk';
import express from 'express';
import { createProxyMiddleware } from 'http-proxy-middleware';// API网关服务器
const apiGatewayApp = express();
const apiGatewayServer = new McpServer({name: "mcp-api-gateway",description: "MCP API网关服务器",version: "1.0.0"
});// 路由配置
const routeConfig = {'/api/resources': { target: 'http://resource-server:3001', pathRewrite: {'^/api': ''} },'/api/tools': { target: 'http://tool-server:3002', pathRewrite: {'^/api': ''} }
};// 设置API代理
Object.entries(routeConfig).forEach(([path, config]) => {apiGatewayApp.use(path, createProxyMiddleware(config));
});// 资源服务器
const resourceServer = new McpServer({name: "mcp-resource-server",description: "MCP资源服务器",version: "1.0.0"
});// 注册资源模板
resourceServer.registerResourceTemplate({name: "users",description: "用户资源",schema: userSchema
});// 工具服务器
const toolServer = new McpServer({name: "mcp-tool-server",description: "MCP工具服务器",version: "1.0.0"
});// 注册工具
toolServer.registerTool({name: "calculator",description: "计算工具",parameters: calculatorSchema,execute: async (params) => {// 计算逻辑return { result: calculateExpression(params.expression) };}
});
1.4 多服务器架构的常见模式
在实际应用中,MCP多服务器架构可以采用多种模式,根据业务需求和资源条件灵活选择:
1.4.1 主从模式(Master-Slave)
最常见的分布式架构模式,一个主服务器协调多个从服务器的工作。
┌────────────┐ ┌────────────┐
│ │ │ │
│ Master │◄────►│ Slave 1 │
│ │ │ │
└────────────┘ └────────────┘▲│▼
┌────────────┐ ┌────────────┐
│ │ │ │
│ Slave 2 │◄────►│ Slave 3 │
│ │ │ │
└────────────┘ └────────────┘
// 主从模式实现示例
import { McpServer, EventEmitter } from '@modelcontextprotocol/sdk';
import { createClient } from 'redis';// 创建Redis客户端用于服务器间通信
const pubClient = createClient({ url: process.env.REDIS_URL });
const subClient = pubClient.duplicate();// 主服务器实现
class MasterMcpServer extends McpServer {private slaves: Map<string, SlaveInfo> = new Map();constructor(config) {super(config);// 订阅从服务器状态更新subClient.subscribe('slave-status', (message) => {const slaveStatus = JSON.parse(message);this.updateSlaveStatus(slaveStatus);});// 定期检查从服务器健康状态setInterval(() => this.checkSlavesHealth(), 10000);}// 注册从服务器registerSlave(slaveId: string, info: SlaveInfo) {this.slaves.set(slaveId, { ...info, lastHeartbeat: Date.now() });this.emit('slave-registered', { slaveId, info });}// 更新从服务器状态updateSlaveStatus(status: SlaveStatus) {const slave = this.slaves.get(status.slaveId);if (slave) {this.slaves.set(status.slaveId, { ...slave, ...status, lastHeartbeat: Date.now() });}}// 检查从服务器健康状态checkSlavesHealth() {const now = Date.now();for (const [slaveId, info] of this.slaves.entries()) {if (now - info.lastHeartbeat > 30000) { // 30秒无心跳this.emit('slave-offline', { slaveId });this.slaves.delete(slaveId);}}}
}// 从服务器实现
class SlaveMcpServer extends McpServer {private masterId: string;constructor(config) {super(config);this.masterId = config.masterId;// 定期向主服务器发送心跳setInterval(() => this.sendHeartbeat(), 5000);// 订阅主服务器指令subClient.subscribe(`slave-command:${this.id}`, (message) => {this.handleMasterCommand(JSON.parse(message));});}// 发送心跳sendHeartbeat() {const status = {slaveId: this.id,load: this.getCurrentLoad(),memory: process.memoryUsage(),status: 'online'};pubClient.publish('slave-status', JSON.stringify(status));}// 处理主服务器指令handleMasterCommand(command) {switch (command.type) {case 'restart':this.restart();break;case 'update-config':this.updateConfig(command.config);break;// 其他指令处理...}}
}
1.4.2 去中心化模式(Peer-to-Peer)
所有服务器地位平等,通过协作提供服务,无单点故障风险。
┌────────────┐ ┌────────────┐
│ │◄────►│ │
│ Node 1 │ │ Node 2 │
│ │◄────►│ │
└────────────┘ └────────────┘▲ ▲ ▲│ └───────┐ ┌───┘│ ▼ ▼
┌────────────┐ ┌────────────┐
│ │◄────►│ │
│ Node 3 │ │ Node 4 │
│ │◄────►│ │
└────────────┘ └────────────┘
// 去中心化模式实现示例
import { McpServer } from '@modelcontextprotocol/sdk';
import * as IPFS from 'ipfs-core';
import { v4 as uuidv4 } from 'uuid';// P2P MCP服务器
class P2PMcpServer extends McpServer {private ipfs;private peerId: string;private peers: Map<string, PeerInfo> = new Map();private eventTopic = 'mcp-p2p-events';constructor(config) {super(config);this.peerId = uuidv4();this.initializeP2P();}// 初始化P2P网络async initializeP2P() {// 创建IPFS节点this.ipfs = await IPFS.create();// 获取本节点IDconst nodeInfo = await this.ipfs.id();console.log(`P2P节点ID: ${nodeInfo.id}`);// 订阅事件主题await this.ipfs.pubsub.subscribe(this.eventTopic, (msg) => {this.handleP2PEvent(msg);});// 定期发布状态setInterval(() => this.broadcastStatus(), 5000);// 发布上线事件this.broadcastEvent({type: 'node-online',peerId: this.peerId,timestamp: Date.now(),info: {resources: this.getResourceNames(),tools: this.getToolNames(),capacity: this.getCapacity()}});}// 处理P2P事件handleP2PEvent(message) {try {const event = JSON.parse(message.data.toString());// 忽略自己发出的消息if (event.peerId === this.peerId) return;switch (event.type) {case 'node-online':this.addPeer(event.peerId, event.info);// 向新节点发送欢迎消息this.sendDirectEvent(event.peerId, {type: 'welcome',peerId: this.peerId,timestamp: Date.now()});break;case 'node-offline':this.removePeer(event.peerId);break;case 'status-update':this.updatePeerStatus(event.peerId, event.status);break;case 'resource-request':this.handleResourceRequest(event);break;case 'tool-execution':this.handleToolExecution(event);break;}} catch (error) {console.error('处理P2P事件错误:', error);}}// 广播事件到所有节点async broadcastEvent(event) {await this.ipfs.pubsub.publish(this.eventTopic,Buffer.from(JSON.stringify(event)));}// 发送事件到特定节点async sendDirectEvent(targetPeerId, event) {const peer = this.peers.get(targetPeerId);if (!peer || !peer.directTopic) return;await this.ipfs.pubsub.publish(peer.directTopic,Buffer.from(JSON.stringify(event)));}// 定期广播状态broadcastStatus() {this.broadcastEvent({type: 'status-update',peerId: this.peerId,timestamp: Date.now(),status: {load: this.getCurrentLoad(),memory: process.memoryUsage(),uptime: process.uptime()}});}// 添加对等节点addPeer(peerId: string, info: any) {this.peers.set(peerId, {...info,directTopic: `mcp-p2p-direct:${peerId}`,lastSeen: Date.now()});// 订阅此节点的直接通信主题this.ipfs.pubsub.subscribe(`mcp-p2p-direct:${this.peerId}`, (msg) => {this.handleDirectMessage(msg);});}// 移除对等节点removePeer(peerId: string) {this.peers.delete(peerId);}// 更新对等节点状态updatePeerStatus(peerId: string, status: any) {const peer = this.peers.get(peerId);if (peer) {this.peers.set(peerId, { ...peer, ...status, lastSeen: Date.now() });}}
}
1.4.3 微服务模式(Microservices)
将MCP功能分解为多个独立服务,每个服务专注于特定功能域。
┌─────────────────────────────────────────────┐
│ API Gateway │
└─────────────┬─────────────┬─────────────────┘│ │ ┌─────────▼───┐ ┌─────▼───────┐ ┌─────────────┐│ Resource │ │ Tool │ │ Identity ││ Service │ │ Service │ │ Service │└─────────────┘ └─────────────┘ └─────────────┘│ │ │┌─────────▼───┐ ┌─────▼───────┐ ┌─────▼───────┐│ Storage │ │ Execution │ │ Security ││ Service │ │ Service │ │ Service │└─────────────┘ └─────────────┘ └─────────────┘
// 微服务模式实现示例
import { McpServer } from '@modelcontextprotocol/sdk';
import express from 'express';
import axios from 'axios';
import { MongoClient } from 'mongodb';
import { v4 as uuidv4 } from 'uuid';
import amqp from 'amqplib';// 服务发现客户端
class ServiceDiscovery {private serviceRegistry: Map<string, ServiceInfo[]> = new Map();private consulUrl: string;constructor(consulUrl: string) {this.consulUrl = consulUrl;this.refreshRegistry();setInterval(() => this.refreshRegistry(), 30000);}// 刷新服务注册表async refreshRegistry() {try {const response = await axios.get(`${this.consulUrl}/v1/catalog/services`);for (const [serviceName, tags] of Object.entries(response.data)) {if (tags.includes('mcp')) {const instances = await this.getServiceInstances(serviceName);this.serviceRegistry.set(serviceName, instances);}}} catch (error) {console.error('刷新服务注册表失败:', error);}}// 获取服务实例列表async getServiceInstances(serviceName: string): Promise<ServiceInfo[]> {try {const response = await axios.get(`${this.consulUrl}/v1/health/service/${serviceName}?passing=true`);return response.data.map(entry => ({id: entry.Service.ID,address: entry.Service.Address,port: entry.Service.Port,tags: entry.Service.Tags,meta: entry.Service.Meta}));} catch (error) {console.error(`获取服务${serviceName}实例失败:`, error);return [];}}// 获取服务实例getService(serviceName: string, tags: string[] = []): ServiceInfo | null {const instances = this.serviceRegistry.get(serviceName) || [];// 筛选满足标签要求的实例const eligibleInstances = instances.filter(instance => tags.every(tag => instance.tags.includes(tag)));if (eligibleInstances.length === 0) return null;// 简单负载均衡:随机选择一个实例const randomIndex = Math.floor(Math.random() * eligibleInstances.length);return eligibleInstances[randomIndex];}
}// MCP资源服务
class ResourceMcpService {private server: McpServer;private app = express();private mongoClient: MongoClient;private serviceId: string;private discovery: ServiceDiscovery;private messageBroker: amqp.Connection;private channel: amqp.Channel;constructor(config) {this.server = new McpServer({name: "mcp-resource-service",description: "MCP资源微服务",version: "1.0.0"});this.serviceId = uuidv4();this.discovery = new ServiceDiscovery(config.consulUrl);// 初始化Express中间件this.initializeExpress();// 连接MongoDBthis.connectMongo(config.mongoUrl);// 连接消息代理this.connectMessageBroker(config.rabbitMqUrl);// 注册服务this.registerService(config.consulUrl);}// 初始化ExpressinitializeExpress() {this.app.use(express.json());// 资源API端点this.app.get('/resources/:name', this.getResource.bind(this));this.app.post('/resources/:name', this.createResource.bind(this));this.app.put('/resources/:name/:id', this.updateResource.bind(this));this.app.delete('/resources/:name/:id', this.deleteResource.bind(this));// 健康检查端点this.app.get('/health', (req, res) => {res.status(200).json({ status: 'healthy', serviceId: this.serviceId });});// 启动服务器const port = process.env.PORT || 3000;this.app.listen(port, () => {console.log(`资源服务运行在端口 ${port}`);});}// 连接MongoDBasync connectMongo(mongoUrl: string) {try {this.mongoClient = new MongoClient(mongoUrl);await this.mongoClient.connect();console.log('已连接到MongoDB');} catch (error) {console.error('MongoDB连接失败:', error);process.exit(1);}}// 连接消息代理async connectMessageBroker(rabbitMqUrl: string) {try {this.messageBroker = await amqp.connect(rabbitMqUrl);this.channel = await this.messageBroker.createChannel();// 声明交换机和队列await this.channel.assertExchange('mcp-events', 'topic', { durable: true });const { queue } = await this.channel.assertQueue(`resource-service-${this.serviceId}`,{ exclusive: true });// 绑定感兴趣的主题await this.channel.bindQueue(queue, 'mcp-events', 'resource.*');// 消费消息this.channel.consume(queue, (msg) => {if (msg) {this.handleMessage(msg);this.channel.ack(msg);}});console.log('已连接到消息代理');} catch (error) {console.error('消息代理连接失败:', error);}}// 处理消息handleMessage(msg: amqp.ConsumeMessage) {try {const content = JSON.parse(msg.content.toString());const routingKey = msg.fields.routingKey;console.log(`收到消息: ${routingKey}`, content);// 根据路由键处理不同类型的消息switch (routingKey) {case 'resource.created':this.handleResourceCreated(content);break;case 'resource.updated':this.handleResourceUpdated(content);break;case 'resource.deleted':this.handleResourceDeleted(content);break;}} catch (error) {console.error('处理消息错误:', error);}}// 发布事件publishEvent(routingKey: string, data: any) {this.channel.publish('mcp-events',routingKey,Buffer.from(JSON.stringify({...data,serviceId: this.serviceId,timestamp: Date.now()})));}// API处理器async getResource(req, res) {try {const { name } = req.params;const filter = req.query.filter ? JSON.parse(req.query.filter) : {};const db = this.mongoClient.db('mcp-resources');const collection = db.collection(name);const resources = await collection.find(filter).toArray();res.json({ success: true, data: resources });} catch (error) {res.status(500).json({success: false,error: error.message});}}async createResource(req, res) {try {const { name } = req.params;const resourceData = req.body;const db = this.mongoClient.db('mcp-resources');const collection = db.collection(name);const result = await collection.insertOne({...resourceData,_id: uuidv4(),createdAt: new Date(),updatedAt: new Date()});// 发布资源创建事件this.publishEvent('resource.created', {resourceName: name,resourceId: result.insertedId,data: resourceData});res.status(201).json({success: true,data: { id: result.insertedId }});} catch (error) {res.status(500).json({success: false,error: error.message});}}// 其他API处理器...// 注册服务到Consulasync registerService(consulUrl: string) {try {await axios.put(`${consulUrl}/v1/agent/service/register`, {ID: this.serviceId,Name: 'mcp-resource-service',Tags: ['mcp', 'resource'],Address: process.env.SERVICE_HOST || 'localhost',Port: parseInt(process.env.PORT || '3000'),Check: {HTTP: `http://${process.env.SERVICE_HOST || 'localhost'}:${process.env.PORT || '3000'}/health`,Interval: '15s',Timeout: '5s'}});console.log(`服务已注册,ID: ${this.serviceId}`);// 注册服务注销钩子process.on('SIGINT', () => this.deregisterService(consulUrl));process.on('SIGTERM', () => this.deregisterService(consulUrl));} catch (error) {console.error('服务注册失败:', error);}}// 注销服务async deregisterService(consulUrl: string) {try {await axios.put(`${consulUrl}/v1/agent/service/deregister/${this.serviceId}`);console.log('服务已注销');process.exit(0);} catch (error) {console.error('服务注销失败:', error);process.exit(1);}}
}
多服务器架构为MCP应用提供了更高的可用性、可伸缩性和灵活性,但同时也带来了系统复杂度的提升。在后续章节中,我们将深入探讨如何应对这些挑战,构建稳健的多服务器MCP系统。
2. 服务发现与路由
在多服务器MCP架构中,服务发现和路由机制是确保系统高效运行的关键组件。它们使客户端能够动态找到可用的服务实例,并将请求路由到最合适的服务器上。
2.1 服务注册与发现机制
服务发现的核心是让服务消费者能够在不需要硬编码服务提供者地址的情况下,动态找到并调用所需的服务。在MCP系统中,服务发现机制通常涉及以下几个关键组件:
2.1.1 服务注册表
服务注册表是服务发现的核心数据存储,记录了所有可用服务实例的关键信息:
// 服务注册表接口
interface ServiceRegistry {// 注册服务实例register(instance: ServiceInstance): Promise<void>;// 注销服务实例deregister(instanceId: string): Promise<void>;// 获取指定服务的所有实例getInstances(serviceName: string): Promise<ServiceInstance[]>;// 查询符合特定条件的服务实例findServices(query: ServiceQuery): Promise<ServiceInstance[]>;// 监听服务变更watchService(serviceName: string, callback: (instances: ServiceInstance[]) => void): void;
}// 服务实例数据结构
interface ServiceInstance {id: string; // 实例唯一标识serviceName: string; // 服务名称host: string; // 主机地址port: number; // 端口号status: 'UP' | 'DOWN' | 'STARTING' | 'OUT_OF_SERVICE'; // 实例状态metadata: Record<string, string>; // 元数据,如版本、环境等healthCheckUrl?: string; // 健康检查URLregistrationTime: number; // 注册时间lastUpdateTime: number; // 最后更新时间
}
2.1.2 基于MCP实现的分布式服务注册表
以下是一个使用MCP服务器实现的分布式服务注册表示例:
import { McpServer } from '@modelcontextprotocol/sdk';
import { createClient } from 'redis';
import express from 'express';// 基于Redis的MCP服务注册表实现
class McpServiceRegistry implements ServiceRegistry {private redisClient;private readonly KEY_PREFIX = 'mcp:registry:';private readonly TTL = 60; // 服务记录过期时间(秒)constructor(redisUrl: string) {this.redisClient = createClient({ url: redisUrl });this.redisClient.connect();}// 注册服务实例async register(instance: ServiceInstance): Promise<void> {const key = `${this.KEY_PREFIX}${instance.serviceName}:${instance.id}`;const now = Date.now();// 更新注册时间或最后更新时间instance.lastUpdateTime = now;if (!instance.registrationTime) {instance.registrationTime = now;}// 将服务实例信息存储到Redisawait this.redisClient.set(key, JSON.stringify(instance), { EX: this.TTL });// 添加到服务名称集合中,方便按服务名查询await this.redisClient.sAdd(`${this.KEY_PREFIX}${instance.serviceName}`, instance.id);}// 注销服务实例async deregister(instanceId: string): Promise<void> {// 获取服务实例信息const pattern = `${this.KEY_PREFIX}*:${instanceId}`;const keys = await this.redisClient.keys(pattern);if (keys.length > 0) {const key = keys[0];const serviceName = key.split(':')[2];// 从Redis中删除服务实例await this.redisClient.del(key);// 从服务名称集合中移除await this.redisClient.sRem(`${this.KEY_PREFIX}${serviceName}`, instanceId);}}// 获取指定服务的所有实例async getInstances(serviceName: string): Promise<ServiceInstance[]> {const serviceKey = `${this.KEY_PREFIX}${serviceName}`;const instanceIds = await this.redisClient.sMembers(serviceKey);if (instanceIds.length === 0) {return [];}// 批量获取所有服务实例信息const keys = instanceIds.map(id => `${this.KEY_PREFIX}${serviceName}:${id}`);const instancesData = await this.redisClient.mGet(keys);// 过滤并解析有效的实例数据return instancesData.filter(Boolean).map(data => JSON.parse(data)).filter(instance => instance.status !== 'DOWN');}// 查询符合特定条件的服务实例async findServices(query: ServiceQuery): Promise<ServiceInstance[]> {const { serviceName, status, metadata } = query;// 获取所有匹配服务名的实例const instances = await this.getInstances(serviceName);// 应用过滤条件return instances.filter(instance => {// 状态过滤if (status && instance.status !== status) {return false;}// 元数据过滤if (metadata) {for (const [key, value] of Object.entries(metadata)) {if (instance.metadata[key] !== value) {return false;}}}return true;});}// 监听服务变更watchService(serviceName: string, callback: (instances: ServiceInstance[]) => void): void {// 创建订阅客户端const subClient = this.redisClient.duplicate();// 订阅服务变更事件subClient.subscribe(`${this.KEY_PREFIX}${serviceName}:changes`, (message) => {this.getInstances(serviceName).then(callback);});}
}// 服务注册表API服务器
function createRegistryServer(registry: McpServiceRegistry): express.Express {const app = express();app.use(express.json());// 服务注册端点app.post('/services', async (req, res) => {try {const instance = req.body as ServiceInstance;await registry.register(instance);res.status(201).json({ success: true, message: '服务实例已注册' });} catch (error) {res.status(500).json({ success: false, error: error.message });}});// 服务注销端点app.delete('/services/:instanceId', async (req, res) => {try {await registry.deregister(req.params.instanceId);res.status(200).json({ success: true, message: '服务实例已注销' });} catch (error) {res.status(500).json({ success: false, error: error.message });}});// 获取服务实例端点app.get('/services/:serviceName', async (req, res) => {try {const instances = await registry.getInstances(req.params.serviceName);res.status(200).json({ success: true, data: instances });} catch (error) {res.status(500).json({ success: false, error: error.message });}});// 服务查询端点app.get('/services', async (req, res) => {try {const query = {serviceName: req.query.serviceName as string,status: req.query.status as 'UP' | 'DOWN' | 'STARTING' | 'OUT_OF_SERVICE',metadata: req.query.metadata ? JSON.parse(req.query.metadata as string) : undefined};const instances = await registry.findServices(query);res.status(200).json({ success: true, data: instances });} catch (error) {res.status(500).json({ success: false, error: error.message });}});return app;
}// 使用MCP服务器封装服务注册表
class McpRegistryServer extends McpServer {private registry: McpServiceRegistry;private httpServer: express.Express;constructor(config: {name: string;description: string;version: string;redisUrl: string;port: number;}) {super({name: config.name,description: config.description,version: config.version});// 创建服务注册表this.registry = new McpServiceRegistry(config.redisUrl);// 创建HTTP服务器this.httpServer = createRegistryServer(this.registry);// 启动HTTP服务器this.httpServer.listen(config.port, () => {console.log(`注册表服务器运行在端口 ${config.port}`);});// 作为MCP资源暴露服务注册表this.exposeRegistryAsResource();}// 将注册表作为MCP资源暴露private exposeRegistryAsResource() {// 定义服务实例资源模板this.registerResourceTemplate({name: "serviceInstances",description: "MCP服务实例资源",schema: {type: "object",properties: {id: { type: "string" },serviceName: { type: "string" },host: { type: "string" },port: { type: "number" },status: { type: "string", enum: ["UP", "DOWN", "STARTING", "OUT_OF_SERVICE"] },metadata: { type: "object" },healthCheckUrl: { type: "string" },registrationTime: { type: "number" },lastUpdateTime: { type: "number" }},required: ["id", "serviceName", "host",
相关文章:
【MCP Node.js SDK 全栈进阶指南】高级篇(1):MCP多服务器协作架构
随着业务规模的不断扩大和系统复杂度的提升,单一服务器架构往往无法满足高并发、高可用性和弹性扩展的需求。在MCP生态系统中,多服务器协作架构成为构建大规模应用的必然选择。本文将深入探讨MCP TypeScript-SDK在多服务器环境下的部署、协作和管理,以及如何构建高可用、高性…...
铭记之日(3)——4.28
铭记之日(3)——4.28 25.4.28,绝对是继20.12.19与24.6.26之后,又一个被钉在耻辱柱上的日子。 4.28本质上为12.19的严重恶劣版。 道德败坏、恶劣的大骗子终于在今日穿帮落马。 斯文面孔下,竟藏匿了如此罪恶幽暗混沌的内心。 24.10.20&…...
4月28日信息差全景:国际局势、科技突破与市场震荡一、国际政治与安全:俄乌冲突关键转折
一、国际政治与安全:俄乌冲突关键转折 1. 乌克兰反攻进展与情报差异 前线动态: 俄国防部称在顿涅茨克击退乌军三次进攻,摧毁12辆坦克;乌方则宣布在巴赫穆特南部推进2公里,双方战报存在显著差异。 信息差根源:战场信息管控导致西方媒体与俄媒报道截然不同。 国际援助: 美…...
Docker 容器虚拟化技术和自动化部署
Docker 容器虚拟化技术和自动化部署 一、Docker 核心组件1.1 Docker 引擎1.2 Docker 镜像1.3 Docker 容器1.4 Docker 仓库 二、Docker 环境安装清华镜像安装 三、Docker 基本操作3.1 镜像管理3.1.1 查看本地镜像 docker images3.1.2 添加镜像标签 docker tag3.1.3 查看镜像信息…...
人物5_My roommate
こんにちは Today, I will continue to share my life in JaPan. Everyone both know I couldn’t say JanPanese fluently【But I still learn this Language, I think it’s interesting for me{maybe it’s one exciting challenge, I want become a challenger that it li…...
OpenResty技术深度解析:原理、应用与生态对比-优雅草卓伊凡
OpenResty技术深度解析:原理、应用与生态对比-优雅草卓伊凡 一、OpenResty技术概述 1.1 OpenResty是什么? OpenResty是一个基于Nginx的高性能Web平台,它将标准的Nginx核心与一系列强大的第三方模块(主要是LuaJIT)捆绑在一起,形成了一个全功能的Web应用服务器。不同于传…...
深度学习任务评估指标
一、概念篇 混淆矩阵有何作用? 混淆矩阵(Confusion Matrix)是用于评估分类模型性能的工具,它展示了模型预测结果与实际标签之间的对比。混淆矩阵通常包括四个关键元素: True Positive (TP):模型正确预测为正类的数量。True Negative (TN):模型正确预测为负类的数量。F…...
Python-librosa库提取音频数据的MFCC特征
文章目录 MFCC特征代码分享 MFCC特征 MFCC(Mel-Frequency Cepstral Coefficients)是通过人耳对声音频率的感知方式对音频信号进行处理得到的特征,广泛用于语音识别和音频处理。 代码分享 import os import librosa import pywt import matpl…...
因特网和万维网
本文来源 :腾讯元宝 因特网(Internet)和万维网(World Wide Web,简称WWW)是紧密相关但完全不同的两个概念,它们的核心区别如下: 本质不同 因特网(Internet&#…...
道可云人工智能每日资讯|“人工智能科技体验展”在中国科学技术馆举行
道可云元宇宙每日简报(2025年4月28日)讯,今日元宇宙新鲜事有: 《2025年提升全民数字素养与技能工作要点》发布 近日,中央网信办、教育部、工业和信息化部、人力资源社会保障部联合印发《2025年提升全民数字素养与技能…...
Day8 鼠标控制与32位模式切换
文章目录 1. 例程harib05a(鼠标解读1)2. 例程harib05b(代码整理)3. 例程harib05c(鼠标解读2)4. 例程harib05d(移动鼠标指针)5. 通往32位模式之路 1. 例程harib05a(鼠标解…...
塔能科技:点亮节能之光,赋能工厂与城市
在能源形势日益严峻的当下,节能成为了各行各业的关键任务。工厂作为能耗大户,降低能耗迫在眉睫;市政照明作为城市运行的基本保障,也急需向绿色节能转型。塔能科技凭借其能源精准节能和定制开发的核心能力,为工厂节能和…...
UDP 报文结构与注意事项总结
目录 一、UDP报文结构简介 1. 源端口号(Source Port,16位): 2. 目的端口号(Destination Port,16位): 3. 长度(Length,16位): 4. 校…...
DBeaver CE 24.1.3 (Windows 64位) 详细安装教程
1. 下载安装包 dbeaver-ce-24.1.3-x86_64-setup.exe下载链接:https://pan.quark.cn/s/5a8dc9ad747f。 下载完成后,双击运行安装程序。 2. 运行安装向导 选择语言:安装程序启动后,选择安装语言(如英文或中文ÿ…...
Java多线程之线程控制
1、线程睡眠——sleep 如果我们需要让当前正在执行的线程暂停一段时间,并进入阻塞状态,则可以通过调用Thread的sleep方法 注意如下几点问题 ①、sleep是静态方法,最好不要用Thread的实例对象调用它,因为它睡眠的始终是当前正在运…...
任意波形发生器——2路同步DA模拟量输出卡
定义 任意波形发生器(Arbitrary Waveform Generator, AWG)是一种电子测试仪器,能够通过数字信号处理(DSP)和数模转换(DAC)技术生成非周期性、可编程的任意形状电信号。与传统函数发生器仅支持…...
【Java】 使用 HTTP 响应状态码定义web系统返回码
系统状态码定义 public interface GlobalErrorCodeConstants {ErrorCode SUCCESS new ErrorCode(0, "成功");// 客户端错误段 ErrorCode BAD_REQUEST new ErrorCode(400, "请求参数不正确");ErrorCode UNAUTHORIZED new ErrorCode(401, "账号未登…...
测试反馈陷入死循环?5大策略拆解新旧Bug难题
新旧Bug堆叠,测试反馈陷入死循环,如果不及时解决此问题,往往容易导致项目延期、成本增加、团队效率降低,直接影响产品的市场竞争力 。因此需及时应对此问题,进而保障项目进度如期进行,提升软件质量…...
结合大语言模型的机械臂抓取操作学习
结合大语言模型的机械臂抓取操作学习(完整ppt和代码)无视频 代码能正常运行时不负责答疑! 电子产品,一经出售,概不退换 算法设计、毕业设计、期刊专利!感兴趣可以联系我。 🏆代码获取方式1: 私信…...
待验证---Oracle 19c 在 CentOS 7 上的快速安装部署指南
Oracle 19c 在 CentOS 7 上的快速安装部署指南 Oracle Database 19c 是一个功能强大的企业级数据库系统,下面我将为您提供在 CentOS 7 上快速安装部署 Oracle 19c 的详细步骤。 一、准备工作 1. 系统要求 CentOS 7 (64位)最小内存: 2GB (推荐 8GB 以上)最小磁盘…...
风力发电领域canopen转Profinet网关的应用
在风力发电领域,开疆canopen转Profinet网关KJ-PNG-205的应用案例通常涉及将风力涡轮机内部的CANopen网络与外部的Profinet工业以太网连接起来。这种转换网关允许风力发电场的控制系统通过Profinet协议收集和监控涡轮机的状态信息,同时发送控制命令。 风力…...
vr全景相机如何选择?
VR全景相机,作为虚拟现实技术的核心设备之一,能够拍摄360度全景照片和视频,使用户通过虚拟现实设备身临其境地体验拍摄场景。 这种技术的快速发展,得益于传感器、图像处理和计算机视觉技术的不断进步。 选择一台合适的VR全景相机…...
在 Conda 中,包的安装路径在电脑的哪里
在 Conda 中,包的安装路径取决于你的 Conda 安装方式 和 环境类型(base 或其他虚拟环境)。以下是不同情况下的详细说明: 📌 1. Conda 包的默认安装路径 Conda 将所有包存储在 pkgs 目录 下,并在各个环境中…...
phpstorm用php连接数据库报错
项目场景: phpstorm用php连接数据库 问题描述 用php使用mysql_connect 的时候报错了,没有这个函数 原因分析: php解释器问题,后来查资料得知mysql_connct只适用于php5.5以下解释器。一开始用的7,改成5.3以后还是报…...
今日行情明日机会——20250428
指数依然在震荡区间,等待方向选择~ 2025年4月28日涨停主要行业方向分析 一、核心主线方向 一季报增长(业绩驱动资金避险) • 涨停家数:10家(最强方向)。 • 代表标的: ◦ 珀莱雅(2…...
Spring Boot 3与JDK 8环境下的单元测试实践指南
一、引言 在Java后端开发中,单元测试是保障代码质量的核心手段。Spring Boot作为主流框架,其单元测试体系随着版本迭代不断优化。本文聚焦于JDK 8与Spring Boot 3的组合场景,深入解析单元测试的多种实现方式,对比不同测试策略的异…...
微分与积分(前言)
导数 导数是一个非常重要的概念,先来看一个引例:速度问题。历史上速度问题与倒数概念的形成有着密切的关系。 平均速度 v s t v\frac{s}{t} vts那么如何表示瞬时速度呢? 瞬时经过路程: Δ s s ( t 0 Δ t ) − s ( t 0 ) Δ…...
61. Java 类和对象 - 使用 this 关键字
文章目录 61. Java 类和对象 - 使用 this 关键字1. 在方法或构造函数中引用对象成员1.1 区分同名变量1.2 在普通方法中引用字段或调用方法 2. 在构造函数中调用另一个构造函数示例:构造函数重载 3. 其他用法:返回当前对象4. 注意事项5. 总结 61. Java 类…...
安达发|高效智能塑料切割数控系统 - 全自动化软件解决方案
在当今的制造业中,塑料作为一种轻便、耐用且成本效益高的材料,被广泛应用于各个领域。随着科技的进步和市场需求的变化,塑料加工行业正面临着前所未有的挑战和机遇。为了提高生产效率,降低成本,并满足日益严格的质量标…...
20250428-AI Agent:智能体的演进与未来
目录 一、AI Agent的定义与演进 1.1 传统AI Agent的发展历程 1.2 现代AI Agent的技术突破 二、AI Agent的核心组件 2.1 大模型动态推理规划 2.2 工具系统的演进 2.3 记忆模块的设计 三、AI Agent的工作流程 3.1 感知阶段 3.2 推理阶段 3.3 决策阶段 3.4 执行阶段 …...
微信小程序分页和下拉刷新
在page.json中配置下拉刷新和底部距顶部的距离: {"path": "school-detail","style": {"navigationStyle": "custom","navigationBarTextStyle": "black","enablePullDownRefresh&quo…...
文献阅读(一)植物应对干旱的生理学反应 | The physiology of plant responses to drought
分享一篇Science上的综述文章,主要探讨了植物应对干旱的生理机制,强调通过调控激素信号提升植物耐旱性、保障粮食安全的重要性。 摘要 干旱每年致使农作物产量的损失,比所有病原体造成损失的总和还要多。为适应土壤中的湿度梯度变化&#x…...
开源CMS系统的SEO优化功能主要依赖哪些插件?
在当今互联网时代,搜索引擎优化(SEO)是网站获取流量的核心手段之一。开源内容管理系统(CMS)因其灵活性和丰富的插件生态,成为许多开发者和企业的首选。本文将以主流开源CMS为例,深入解析其SEO优…...
YUM/DNF管理工具
YUM (Yellow dog Updater, Modified) , RHEL8 中默认使用的软件批量管理工具由原版本的 yum 换成了速度更快的 dnf ( DNF Dandified YUM ),原有的 yum 命令仅为 dnf 的软链接,当然依旧可以使用。 [root…...
Deepseek 生成新玩法:从文本到可下载 Word 文档?思路与实践
大家好!最近有朋友问到,能不能用 Deepseek 这类强大的 AI 模型,根据一个包含具体格式要求(比如字体、字号、行距、对齐方式等)的提示词,直接生成一篇论文,并且输出一个能直接下载的 Word (.docx…...
【OSG学习笔记】Day 13: 事件处理——响应键盘与鼠标
在OpenSceneGraph (OSG) 中,事件处理是实现用户交互功能的重要部分。 通过自定义按键事件(如 WASD 键控制模型移动),可以让用户与场景进行互动。 osgGA::GUIEventHandler osgGA::GUIEventHandler 是 OpenSceneGraph (OSG) 中用…...
element-ui carousel 组件源码分享
carousel 走马灯源码简单分享,主要从以下几个方面: 1、carousel 组件页面结构。 2、carousel 组件属性。 3、carousel 组件事件。 4、carousel 组件方法。 5、carousel-item 组件属性。 一、组件页面结构。 二、组件属性。 2.1 height 走马灯的高…...
在视图中交互 闪退问题
程序闪退 //void mouseEventOccurred(const pcl::visualization::MouseEvent &event, // void* viewer_void) //{ // boost::shared_ptr<pcl::visualization::PCLVisualizer> viewer *static_cast<boost::shared_ptr<pcl::visualization::PCLVisualizer> …...
python 线程池顺序执行
在Python中,线程池(ThreadPoolExecutor)默认是并发执行任务的,但若需要实现任务的顺序执行(按提交顺序执行或按结果顺序处理),可以通过以下方案实现: 方案一:强制单线程&…...
DeepSeek智能时空数据分析(六):大模型NL2SQL绘制城市之间连线
序言:时空数据分析很有用,但是GIS/时空数据库技术门槛太高 时空数据分析在优化业务运营中至关重要,然而,三大挑战仍制约其发展:技术门槛高,需融合GIS理论、SQL开发与时空数据库等多领域知识;空…...
[250428] Nginx 1.28.0 发布:性能优化、安全增强及新特性
目录 Nginx 1.28.0 稳定版发布主要亮点包括:功能增强:安全性改进:其他: Nginx 1.28.0 稳定版发布 Nginx 官方于 4 月 24 日发布了最新的 1.28.0 稳定版本。此版本基于之前的 1.27.x 主线分支,整合了多项新功能、性能优…...
第二章:Agent System
Chapter 2: Agent System 从用户界面到代理系统:背后的“大脑”如何运作? 在上一章的用户界面抽象中,我们已经能通过命令行与AI简单对话了。但你有没有好奇过:输入的问题是如何变成“北京今天晴,气温25C”这样的回答的…...
自动驾驶L4级技术落地:特斯拉、Waymo与华为的路线之争
自动驾驶L4级技术落地:特斯拉、Waymo与华为的路线之争 摘要 随着自动驾驶技术进入L4级(高度自动化驾驶)商业化探索的关键阶段,全球头部企业围绕技术路线与商业模式展开激烈竞争。特斯拉、Waymo与华为分别代表视觉优先、全栈自研…...
K8s新手系列之K8s中的资源
K8s中资源的概念 在kubernetes中,所有的内容都抽象为资源,用户需要通过操作资源来管理kubernetes。 kubernetes的本质上就是一个集群系统,用户可以在集群中部署各种服务,所谓的部署服务,其实就是在kubernetes集群中运行…...
万亿参数大模型网络瓶颈突破:突破90%网络利用率的技术实践
AI数据中心网络热潮下,如何突破传统以太网利用率瓶颈? 近年来,随着AI大模型训练(如GPT-4、Gemini)的爆发式增长,数据中心网络的流量压力急剧上升。单次训练任务可能涉及数千张GPU卡协同工作,生成…...
【KWDB 创作者计划】_企业数据管理的利刃:技术剖析与应用实践
【KWDB 创作者计划】_企业数据管理的利刃:技术剖析与应用实践 引言 作为一名在企业级开发领域摸爬滚打多年的开发者,见证了数据库技术的不断迭代与革新,众多数据库产品中,KWDB 以其独特的技术架构和卓越性能吸引了我的目光。本文将…...
vue复习91~135
1.空仓库 vuex的空仓库,写在store>index.js里面 语法:new Vuex.store 最后在main.js中导入挂载 import Vue from vue import Vuex from vuex //插件安装 Vue.use(Vuex) //创建仓库 export default new Vuex.Store() //导出main.js使用 export defau…...
正常流布局
布局决定了元素的排列方式。如果让浏览器按照默认方式排列,这叫做正常流(normal flow)布局。正常布局是怎么排列元素的呢?各行从上到下,行内从左到右。 那么什么情况下会开始新的一行呢?块元素会产生新行。…...
图论---拓扑排序(DFS)
时间复杂度: 最坏情况下为O(V!),其中V是顶点数 实际运行时间取决于图的拓扑结构 这个实现可以输出有向无环图的所有可能的拓扑排序,并能检测图中是否存在环。 算法思想: 使用回溯法枚举所有可能的拓扑排序 在每一步选择当前入…...
探索 Redis 缓存对系统性能的提升——项目启动与操作指南
探索 Redis 缓存对系统性能的提升——项目启动与操作指南 一、项目简介 Redis是一款高性能的键值存储数据库,以其出色的读写速度和丰富的数据结构著称,被广泛用作应用系统的缓存层。作为缓存,Redis通过将热点数据存储在内存中,显…...