企业级分布式 MCP 方案
飞书原文档链接地址:https://ik3te1knhq.feishu.cn/wiki/D8kSwC9tFi61CMkRdd8cMxNTnpg
企业级分布式 MCP 方案
[!TIP]
背景:现阶段 MCP Client 和 MCP Server 是一对一的连接方式,若当前 MCP Server 挂掉了,那么 MCP Client 便不能使用 MCP Server 提供的工具能力。工具稳定性的提供得不到保证
解决:做了一些分布式 Client 连接的探索,一个 MCP Client 端可以连接多个 MCP Server(分布式部署),目前采用的方案如下:
- 新建一个包含服务名和对应连接的类
- 另外实现监听机制,可以动态的应对 MCP Server 节点上下线,去动态调整 mcpAsyncClientList
- (读操作)获取 MCP Server 相关信息的,采用从 mcpAsyncClientList 列表中随机中获取一个去发起请求,比如获取工具列表信息
- (写操作)对应 MCP Server 需要更改的信息,由 MCP Client 端发起,需要修改所有的 MCP Server
public class LoadbalancedAsyncClient implements EventListener {
private String serviceName;
private List mcpAsyncClientList;
}
给社区贡献代码:https://github.com/alibaba/spring-ai-alibaba/pull/755
模块代码解析
yaml 文件
spring:ai:mcp:client:enabled: truename: mcp-client-webfluxversion: 1.0.0type: SYNCnacos-enabled: true # 开启nacos-client配置,启动分布式alibaba:mcp:nacos: ## nacos的基础配置信息enabled: trueserver-addr: <nacos-sever-addr>service-namespace: <nacos-namespace> service-group: <nacos-group>client:sse:connections: // 注册在nacos的MCP Server服务,这里mcp-server1代表多节点nacos-server1: mcp-server1nacos-server2: mcp-server2
自动注入部分
NacosMcpSseClientProperties(配置类)
@ConfigurationProperties("spring.ai.alibaba.mcp.client.sse")
public class NacosMcpSseClientProperties {public static final String _CONFIG_PREFIX _= "spring.ai.alibaba.mcp.client.sse";private final Map<String, String> connections = new HashMap<>();public Map<String, String> getConnections() {return connections;}}
NacosMcpSseClientAutoConfiguration
提供 Map<String, List> 的 bean
- 键代表服务名
- 值为对应的后续连接的 WebFluxSseClientTransport 列表
@AutoConfiguration
@EnableConfigurationProperties({ NacosMcpSseClientProperties.class, NacosMcpRegistryProperties.class })
public class NacosMcpSseClientAutoConfiguration {private static final Logger _logger _= LoggerFactory._getLogger_(NacosMcpSseClientAutoConfiguration.class);public NacosMcpSseClientAutoConfiguration() {}@Beanpublic NamingService nacosNamingService(NacosMcpRegistryProperties nacosMcpRegistryProperties) {Properties nacosProperties = nacosMcpRegistryProperties.getNacosProperties();try {return NamingFactory._createNamingService_(nacosProperties);}catch (NacosException e) {throw new RuntimeException(e);}}@Bean(name = "server2NamedTransport")public Map<String, List<NamedClientMcpTransport>> server2NamedTransport(NacosMcpSseClientProperties nacosMcpSseClientProperties, NamingService namingService,ObjectProvider<WebClient.Builder> webClientBuilderProvider,ObjectProvider<ObjectMapper> objectMapperProvider) {Map<String, List<NamedClientMcpTransport>> server2NamedTransport = new HashMap<>();WebClient.Builder webClientBuilderTemplate = (WebClient.Builder) webClientBuilderProvider.getIfAvailable(WebClient::_builder_);ObjectMapper objectMapper = (ObjectMapper) objectMapperProvider.getIfAvailable(ObjectMapper::new);Map<String, String> connections = nacosMcpSseClientProperties.getConnections();connections.forEach((serviceKey, serviceName) -> {try {List<Instance> instances = namingService.selectInstances(serviceName, true);List<NamedClientMcpTransport> namedTransports = new ArrayList<>();for (Instance instance : instances) {String url = instance.getMetadata().getOrDefault("scheme", "http") + "://" + instance.getIp() + ":"+ instance.getPort();WebClient.Builder webClientBuilder = webClientBuilderTemplate.clone().baseUrl(url);WebFluxSseClientTransport transport = new WebFluxSseClientTransport(webClientBuilder, objectMapper);namedTransports.add(new NamedClientMcpTransport(serviceName + "-" + instance.getInstanceId(), transport));}server2NamedTransport.put(serviceName, namedTransports);}catch (NacosException e) {_logger_.error("nacos naming service: {} error", serviceName, e);}});return server2NamedTransport;}}
NacosMcpClientAutoConfiguration
提供和 MCP Server 进行交互的客户端
- List
- List
@AutoConfiguration(after = { NacosMcpSseClientAutoConfiguration.class, McpClientAutoConfiguration.class })
@ConditionalOnClass({ McpSchema.class })
@EnableConfigurationProperties({ McpClientCommonProperties.class })
@ConditionalOnProperty(prefix = "spring.ai.mcp.client", name = { "nacos-enabled" }, havingValue = "true",matchIfMissing = false)
public class NacosMcpClientAutoConfiguration {public NacosMcpClientAutoConfiguration() {}private String connectedClientName(String clientName, String serverConnectionName) {return clientName + " - " + serverConnectionName;}@Bean@ConditionalOnProperty(prefix = "spring.ai.mcp.client", name = { "type" }, havingValue = "SYNC",matchIfMissing = true)public List<LoadbalancedMcpSyncClient> loadbalancedMcpSyncClientList(ObjectProvider<McpSyncClientConfigurer> mcpSyncClientConfigurerProvider,McpClientCommonProperties commonProperties,@Qualifier("server2NamedTransport") ObjectProvider<Map<String, List<NamedClientMcpTransport>>> server2NamedTransportProvider,ObjectProvider<NamingService> namingServiceProvider) {NamingService namingService = namingServiceProvider.getObject();McpSyncClientConfigurer mcpSyncClientConfigurer = mcpSyncClientConfigurerProvider.getObject();List<LoadbalancedMcpSyncClient> loadbalancedMcpSyncClients = new ArrayList<>();Map<String, List<NamedClientMcpTransport>> server2NamedTransport = server2NamedTransportProvider.getObject();for (Map.Entry<String, List<NamedClientMcpTransport>> entry : server2NamedTransport.entrySet()) {String serviceName = entry.getKey();List<NamedClientMcpTransport> namedTransports = entry.getValue();List<McpSyncClient> mcpSyncClients = new ArrayList<>();McpSyncClient syncClient;for (NamedClientMcpTransport namedTransport : namedTransports) {McpSchema.Implementation clientInfo = new McpSchema.Implementation(this.connectedClientName(commonProperties.getName(), namedTransport.name()),commonProperties.getVersion());McpClient.SyncSpec syncSpec = McpClient._sync_(namedTransport.transport()).clientInfo(clientInfo).requestTimeout(commonProperties.getRequestTimeout());syncSpec = mcpSyncClientConfigurer.configure(namedTransport.name(), syncSpec);syncClient = syncSpec.build();if (commonProperties.isInitialized()) {syncClient.initialize();}mcpSyncClients.add(syncClient);}LoadbalancedMcpSyncClient loadbalancedMcpSyncClient = LoadbalancedMcpSyncClient._builder_().serviceName(serviceName).mcpSyncClientList(mcpSyncClients).namingService(namingService).build();loadbalancedMcpSyncClient.subscribe();loadbalancedMcpSyncClients.add(loadbalancedMcpSyncClient);}return loadbalancedMcpSyncClients;}@Bean@ConditionalOnProperty(prefix = "spring.ai.mcp.client", name = { "type" }, havingValue = "ASYNC")public List<LoadbalancedMcpAsyncClient> loadbalancedMcpAsyncClientList(ObjectProvider<McpAsyncClientConfigurer> mcpAsyncClientConfigurerProvider,McpClientCommonProperties commonProperties,@Qualifier("server2NamedTransport") ObjectProvider<Map<String, List<NamedClientMcpTransport>>> server2NamedTransportProvider,ObjectProvider<NamingService> namingServiceProvider) {NamingService namingService = namingServiceProvider.getObject();McpAsyncClientConfigurer mcpAsyncClientConfigurer = mcpAsyncClientConfigurerProvider.getObject();List<LoadbalancedMcpAsyncClient> loadbalancedMcpAsyncClients = new ArrayList<>();Map<String, List<NamedClientMcpTransport>> server2NamedTransport = server2NamedTransportProvider.getObject();for (Map.Entry<String, List<NamedClientMcpTransport>> entry : server2NamedTransport.entrySet()) {String serviceName = entry.getKey();List<NamedClientMcpTransport> namedTransports = entry.getValue();List<McpAsyncClient> mcpAsyncClients = new ArrayList<>();McpAsyncClient asyncClient;for (NamedClientMcpTransport namedTransport : namedTransports) {McpSchema.Implementation clientInfo = new McpSchema.Implementation(this.connectedClientName(commonProperties.getName(), namedTransport.name()),commonProperties.getVersion());McpClient.AsyncSpec asyncSpec = McpClient._async_(namedTransport.transport()).clientInfo(clientInfo).requestTimeout(commonProperties.getRequestTimeout());asyncSpec = mcpAsyncClientConfigurer.configure(namedTransport.name(), asyncSpec);asyncClient = asyncSpec.build();if (commonProperties.isInitialized()) {asyncClient.initialize().block();}mcpAsyncClients.add(asyncClient);}LoadbalancedMcpAsyncClient loadbalancedMcpAsyncClient = LoadbalancedMcpAsyncClient._builder_().serviceName(serviceName).mcpAsyncClientList(mcpAsyncClients).namingService(namingService).build();loadbalancedMcpAsyncClient.subscribe();loadbalancedMcpAsyncClients.add(loadbalancedMcpAsyncClient);}return loadbalancedMcpAsyncClients;}}
Client 端部分
LoadbalancedMcpAsyncClient
各字段含义:
String serviceName
:MCP Server 注册的服务名称List<McpAsyncClient> mcpAsyncClientList
:对应的多节点客户端NamingService namingService
:Nacos 服务List<Instance> instances
:Nacos 中 MCP Server 的实例列表
其余方法的使用和 McpAsyncClient
保持一致,已经全面封装好了
- 读操作:通过 getMcpAsyncClient()方法轮询得到
McpAsyncClient
列表 - 写操作:对所有
List<McpAsyncClient>
进行操作
通过实现 EventListener 接口,动态增加 or 减少 McpAsyncClient
public class LoadbalancedMcpAsyncClient implements EventListener {private static final Logger _logger _= LoggerFactory._getLogger_(LoadbalancedMcpAsyncClient.class);private final String serviceName;private final List<McpAsyncClient> mcpAsyncClientList;private final AtomicInteger currentIndex = new AtomicInteger(0);private final NamingService namingService;private List<Instance> instances;public LoadbalancedMcpAsyncClient(String serviceName, List<McpAsyncClient> mcpAsyncClientList,NamingService namingService) {Assert._notNull_(serviceName, "serviceName cannot be null");Assert._notNull_(mcpAsyncClientList, "mcpAsyncClientList cannot be null");Assert._notNull_(namingService, "namingService cannot be null");this.serviceName = serviceName;this.mcpAsyncClientList = mcpAsyncClientList;try {this.namingService = namingService;this.instances = namingService.selectInstances(serviceName, true);}catch (NacosException e) {throw new RuntimeException(String._format_("Failed to get instances for service: %s", serviceName));}}public void subscribe() {try {this.namingService.subscribe(this.serviceName, this);}catch (NacosException e) {throw new RuntimeException(String._format_("Failed to subscribe to service: %s", this.serviceName));}}public String getServiceName() {return serviceName;}public List<McpAsyncClient> getMcpAsyncClientList() {return mcpAsyncClientList;}public NamingService getNamingService() {return this.namingService;}public List<Instance> getInstances() {return this.instances;}private McpAsyncClient getMcpAsyncClient() {if (mcpAsyncClientList.isEmpty()) {throw new IllegalStateException("No McpAsyncClient available");}int index = currentIndex.getAndIncrement() % mcpAsyncClientList.size();return mcpAsyncClientList.get(index);}// ------------------------------------------------------------------------------------------------------------------------------------------------public McpSchema.ServerCapabilities getServerCapabilities() {return getMcpAsyncClient().getServerCapabilities();}public McpSchema.Implementation getServerInfo() {return getMcpAsyncClient().getServerInfo();}public boolean isInitialized() {return getMcpAsyncClient().isInitialized();}public McpSchema.ClientCapabilities getClientCapabilities() {return getMcpAsyncClient().getClientCapabilities();}public McpSchema.Implementation getClientInfo() {return getMcpAsyncClient().getClientInfo();}public void close() {Iterator<McpAsyncClient> iterator = mcpAsyncClientList.iterator();while (iterator.hasNext()) {McpAsyncClient mcpAsyncClient = iterator.next();mcpAsyncClient.close();iterator.remove();_logger_.info("Closed and removed McpSyncClient: {}", mcpAsyncClient.getClientInfo().name());}}public Mono<Void> closeGracefully() {Iterator<McpAsyncClient> iterator = mcpAsyncClientList.iterator();List<Mono<Void>> closeMonos = new ArrayList<>();while (iterator.hasNext()) {McpAsyncClient mcpAsyncClient = iterator.next();Mono<Void> voidMono = mcpAsyncClient.closeGracefully().doOnSuccess(v -> {iterator.remove();_logger_.info("Closed and removed McpAsyncClient: {}", mcpAsyncClient.getClientInfo().name());});closeMonos.add(voidMono);}return Mono._when_(closeMonos);}public Mono<Object> ping() {return getMcpAsyncClient().ping();}public Mono<Void> addRoot(McpSchema.Root root) {return Mono._when_(mcpAsyncClientList.stream().map(mcpAsyncClient -> mcpAsyncClient.addRoot(root)).collect(Collectors._toList_()));}public Mono<Void> removeRoot(String rootUri) {return Mono._when_(mcpAsyncClientList.stream().map(mcpAsyncClient -> mcpAsyncClient.removeRoot(rootUri)).collect(Collectors._toList_()));}public Mono<Void> rootsListChangedNotification() {return Mono._when_(mcpAsyncClientList.stream().map(McpAsyncClient::rootsListChangedNotification).collect(Collectors._toList_()));}public Mono<McpSchema.CallToolResult> callTool(McpSchema.CallToolRequest callToolRequest) {return getMcpAsyncClient().callTool(callToolRequest);}public Mono<McpSchema.ListToolsResult> listTools() {return getMcpAsyncClient().listTools();}public Mono<McpSchema.ListToolsResult> listTools(String cursor) {return getMcpAsyncClient().listTools(cursor);}public Mono<McpSchema.ListResourcesResult> listResources() {return getMcpAsyncClient().listResources();}public Mono<McpSchema.ListResourcesResult> listResources(String cursor) {return getMcpAsyncClient().listResources(cursor);}public Mono<McpSchema.ReadResourceResult> readResource(McpSchema.Resource resource) {return getMcpAsyncClient().readResource(resource);}public Mono<McpSchema.ReadResourceResult> readResource(McpSchema.ReadResourceRequest readResourceRequest) {return getMcpAsyncClient().readResource(readResourceRequest);}public Mono<McpSchema.ListResourceTemplatesResult> listResourceTemplates() {return getMcpAsyncClient().listResourceTemplates();}public Mono<McpSchema.ListResourceTemplatesResult> listResourceTemplates(String cursor) {return getMcpAsyncClient().listResourceTemplates(cursor);}public Mono<Void> subscribeResource(McpSchema.SubscribeRequest subscribeRequest) {return Mono._when_(mcpAsyncClientList.stream().map(mcpAsyncClient -> mcpAsyncClient.subscribeResource(subscribeRequest)).collect(Collectors._toList_()));}public Mono<Void> unsubscribeResource(McpSchema.UnsubscribeRequest unsubscribeRequest) {return Mono._when_(mcpAsyncClientList.stream().map(mcpAsyncClient -> mcpAsyncClient.unsubscribeResource(unsubscribeRequest)).collect(Collectors._toList_()));}public Mono<McpSchema.ListPromptsResult> listPrompts() {return getMcpAsyncClient().listPrompts();}public Mono<McpSchema.ListPromptsResult> listPrompts(String cursor) {return getMcpAsyncClient().listPrompts(cursor);}public Mono<McpSchema.GetPromptResult> getPrompt(McpSchema.GetPromptRequest getPromptRequest) {return getMcpAsyncClient().getPrompt(getPromptRequest);}public Mono<Void> setLoggingLevel(McpSchema.LoggingLevel loggingLevel) {return Mono._when_(mcpAsyncClientList.stream().map(mcpAsyncClient -> mcpAsyncClient.setLoggingLevel(loggingLevel)).collect(Collectors._toList_()));}// ------------------------------------------------------------------------------------------------------------------------------------------------@Overridepublic void onEvent(Event event) {if (event instanceof NamingEvent namingEvent) {if (this.serviceName.equals(namingEvent.getServiceName())) {_logger_.info("Received service instance change event for service: {}", namingEvent.getServiceName());List<Instance> instances = namingEvent.getInstances();_logger_.info("Updated instances count: {}", instances.size());// 打印每个实例的详细信息instances.forEach(instance -> {_logger_.info("Instance: {}:{} (Healthy: {}, Enabled: {}, Metadata: {})", instance.getIp(),instance.getPort(), instance.isHealthy(), instance.isEnabled(),JacksonUtils._toJson_(instance.getMetadata()));});updateClientList(instances);}}}private void updateClientList(List<Instance> currentInstances) {McpClientCommonProperties commonProperties = ApplicationContextHolder._getBean_(McpClientCommonProperties.class);McpAsyncClientConfigurer mcpSyncClientConfigurer = ApplicationContextHolder._getBean_(McpAsyncClientConfigurer.class);ObjectMapper objectMapper = ApplicationContextHolder._getBean_(ObjectMapper.class);WebClient.Builder webClientBuilderTemplate = ApplicationContextHolder._getBean_(WebClient.Builder.class);// 移除的实例列表List<Instance> removeInstances = instances.stream().filter(instance -> !currentInstances.contains(instance)).collect(Collectors._toList_());// 新增的实例列表List<Instance> addInstances = currentInstances.stream().filter(instance -> !instances.contains(instance)).collect(Collectors._toList_());// 删除McpAsyncClient实例List<String> clientInfoNames = removeInstances.stream().map(instance -> connectedClientName(commonProperties.getName(),this.serviceName + "-" + instance.getInstanceId())).toList();Iterator<McpAsyncClient> iterator = mcpAsyncClientList.iterator();while (iterator.hasNext()) {McpAsyncClient mcpAsyncClient = iterator.next();McpSchema.Implementation clientInfo = mcpAsyncClient.getClientInfo();if (clientInfoNames.contains(clientInfo.name())) {_logger_.info("Removing McpAsyncClient: {}", clientInfo.name());mcpAsyncClient.closeGracefully().subscribe(v -> {iterator.remove();}, e -> _logger_.error("Failed to remove McpAsyncClient: {}", clientInfo.name(), e));}}// 新增McpAsyncClient实例McpAsyncClient asyncClient;for (Instance instance : addInstances) {String baseUrl = instance.getMetadata().getOrDefault("scheme", "http") + "://" + instance.getIp() + ":"+ instance.getPort();WebClient.Builder webClientBuilder = webClientBuilderTemplate.clone().baseUrl(baseUrl);WebFluxSseClientTransport transport = new WebFluxSseClientTransport(webClientBuilder, objectMapper);NamedClientMcpTransport namedTransport = new NamedClientMcpTransport(serviceName + "-" + instance.getInstanceId(), transport);McpSchema.Implementation clientInfo = new McpSchema.Implementation(this.connectedClientName(commonProperties.getName(), namedTransport.name()),commonProperties.getVersion());McpClient.AsyncSpec asyncSpec = McpClient._async_(namedTransport.transport()).clientInfo(clientInfo).requestTimeout(commonProperties.getRequestTimeout());asyncSpec = mcpSyncClientConfigurer.configure(namedTransport.name(), asyncSpec);asyncClient = asyncSpec.build();if (commonProperties.isInitialized()) {asyncClient.initialize().block();}_logger_.info("Added McpAsyncClient: {}", clientInfo.name());mcpAsyncClientList.add(asyncClient);}private String connectedClientName(String clientName, String serverConnectionName) {return clientName + " - " + serverConnectionName;}public static Builder builder() {return new Builder();}public static class Builder {private String serviceName;private List<McpAsyncClient> mcpAsyncClientList;private NamingService namingService;public Builder serviceName(String serviceName) {this.serviceName = serviceName;return this;}public Builder mcpAsyncClientList(List<McpAsyncClient> mcpAsyncClientList) {this.mcpAsyncClientList = mcpAsyncClientList;return this;}public Builder namingService(NamingService namingService) {this.namingService = namingService;return this;}public LoadbalancedMcpAsyncClient build() {return new LoadbalancedMcpAsyncClient(this.serviceName, this.mcpAsyncClientList, this.namingService);}}}
LoadbalancedMcpSyncClient
同上
public class LoadbalancedMcpSyncClient implements EventListener {private static final Logger _logger _= LoggerFactory._getLogger_(LoadbalancedMcpAsyncClient.class);private final String serviceName;private final List<McpSyncClient> mcpSyncClientList;private final AtomicInteger currentIndex = new AtomicInteger(0);private final NamingService namingService;private List<Instance> instances;public LoadbalancedMcpSyncClient(String serviceName, List<McpSyncClient> mcpSyncClientList,NamingService namingService) {Assert._notNull_(serviceName, "Service name must not be null");Assert._notNull_(mcpSyncClientList, "McpSyncClient list must not be null");Assert._notNull_(namingService, "NamingService must not be null");this.serviceName = serviceName;this.mcpSyncClientList = mcpSyncClientList;try {this.namingService = namingService;this.instances = namingService.selectInstances(serviceName, true);}catch (NacosException e) {throw new RuntimeException(String._format_("Failed to get instances for service: %s", serviceName));}}public void subscribe() {try {this.namingService.subscribe(this.serviceName, this);}catch (NacosException e) {throw new RuntimeException(String._format_("Failed to subscribe to service: %s", this.serviceName));}}public String getServiceName() {return this.serviceName;}public List<McpSyncClient> getMcpSyncClientList() {return this.mcpSyncClientList;}public NamingService getNamingService() {return this.namingService;}public List<Instance> getInstances() {return this.instances;}private McpSyncClient getMcpSyncClient() {if (mcpSyncClientList.isEmpty()) {throw new IllegalStateException("No McpAsyncClient available");}int index = currentIndex.getAndIncrement() % mcpSyncClientList.size();return mcpSyncClientList.get(index);}// ------------------------------------------------------------------------------------------------------------------------------------------------public McpSchema.ServerCapabilities getServerCapabilities() {return getMcpSyncClient().getServerCapabilities();}public McpSchema.Implementation getServerInfo() {return getMcpSyncClient().getServerInfo();}public McpSchema.ClientCapabilities getClientCapabilities() {return getMcpSyncClient().getClientCapabilities();}public McpSchema.Implementation getClientInfo() {return getMcpSyncClient().getClientInfo();}public void close() {Iterator<McpSyncClient> iterator = mcpSyncClientList.iterator();while (iterator.hasNext()) {McpSyncClient mcpSyncClient = iterator.next();mcpSyncClient.close();iterator.remove();_logger_.info("Closed and removed McpSyncClient: {}", mcpSyncClient.getClientInfo().name());}}public boolean closeGracefully() {List<Boolean> flagList = new ArrayList<>();Iterator<McpSyncClient> iterator = mcpSyncClientList.iterator();while (iterator.hasNext()) {McpSyncClient mcpSyncClient = iterator.next();boolean flag = mcpSyncClient.closeGracefully();flagList.add(flag);if (flag) {iterator.remove();_logger_.info("Closed and removed McpSyncClient: {}", mcpSyncClient.getClientInfo().name());}}return !flagList.stream().allMatch(flag -> flag);}public Object ping() {return getMcpSyncClient().ping();}public void addRoot(McpSchema.Root root) {for (McpSyncClient mcpSyncClient : mcpSyncClientList) {mcpSyncClient.addRoot(root);}}public void removeRoot(String rootUri) {for (McpSyncClient mcpSyncClient : mcpSyncClientList) {mcpSyncClient.removeRoot(rootUri);}}public McpSchema.CallToolResult callTool(McpSchema.CallToolRequest callToolRequest) {return getMcpSyncClient().callTool(callToolRequest);}public McpSchema.ListToolsResult listTools() {return getMcpSyncClient().listTools();}public McpSchema.ListToolsResult listTools(String cursor) {return getMcpSyncClient().listTools(cursor);}public McpSchema.ListResourcesResult listResources(String cursor) {return getMcpSyncClient().listResources(cursor);}public McpSchema.ListResourcesResult listResources() {return getMcpSyncClient().listResources();}public McpSchema.ReadResourceResult readResource(McpSchema.Resource resource) {return getMcpSyncClient().readResource(resource);}public McpSchema.ReadResourceResult readResource(McpSchema.ReadResourceRequest readResourceRequest) {return getMcpSyncClient().readResource(readResourceRequest);}public McpSchema.ListResourceTemplatesResult listResourceTemplates(String cursor) {return getMcpSyncClient().listResourceTemplates(cursor);}public McpSchema.ListResourceTemplatesResult listResourceTemplates() {return getMcpSyncClient().listResourceTemplates();}public void subscribeResource(McpSchema.SubscribeRequest subscribeRequest) {for (McpSyncClient mcpSyncClient : mcpSyncClientList) {mcpSyncClient.subscribeResource(subscribeRequest);}}public void unsubscribeResource(McpSchema.UnsubscribeRequest unsubscribeRequest) {for (McpSyncClient mcpSyncClient : mcpSyncClientList) {mcpSyncClient.unsubscribeResource(unsubscribeRequest);}}public McpSchema.ListPromptsResult listPrompts(String cursor) {return getMcpSyncClient().listPrompts(cursor);}public McpSchema.ListPromptsResult listPrompts() {return getMcpSyncClient().listPrompts();}public McpSchema.GetPromptResult getPrompt(McpSchema.GetPromptRequest getPromptRequest) {return getMcpSyncClient().getPrompt(getPromptRequest);}public void setLoggingLevel(McpSchema.LoggingLevel loggingLevel) {for (McpSyncClient mcpSyncClient : mcpSyncClientList) {mcpSyncClient.setLoggingLevel(loggingLevel);}}// ------------------------------------------------------------------------------------------------------------------------------------------------@Overridepublic void onEvent(Event event) {if (event instanceof NamingEvent namingEvent) {if (this.serviceName.equals(namingEvent.getServiceName())) {_logger_.info("Received service instance change event for service: {}", namingEvent.getServiceName());List<Instance> instances = namingEvent.getInstances();_logger_.info("Updated instances count: {}", instances.size());// 打印每个实例的详细信息instances.forEach(instance -> {_logger_.info("Instance: {}:{} (Healthy: {}, Enabled: {}, Metadata: {})", instance.getIp(),instance.getPort(), instance.isHealthy(), instance.isEnabled(),JacksonUtils._toJson_(instance.getMetadata()));});updateClientList(instances);}}}private void updateClientList(List<Instance> currentInstances) {McpClientCommonProperties commonProperties = ApplicationContextHolder._getBean_(McpClientCommonProperties.class);McpSyncClientConfigurer mcpSyncClientConfigurer = ApplicationContextHolder._getBean_(McpSyncClientConfigurer.class);ObjectMapper objectMapper = ApplicationContextHolder._getBean_(ObjectMapper.class);WebClient.Builder webClientBuilderTemplate = ApplicationContextHolder._getBean_(WebClient.Builder.class);// 移除的实例列表List<Instance> removeInstances = instances.stream().filter(instance -> !currentInstances.contains(instance)).collect(Collectors._toList_());// 新增的实例列表List<Instance> addInstances = currentInstances.stream().filter(instance -> !instances.contains(instance)).collect(Collectors._toList_());// 删除McpSyncClient实例List<String> clientInfoNames = removeInstances.stream().map(instance -> connectedClientName(commonProperties.getName(),this.serviceName + "-" + instance.getInstanceId())).toList();Iterator<McpSyncClient> iterator = mcpSyncClientList.iterator();while (iterator.hasNext()) {McpSyncClient mcpSyncClient = iterator.next();McpSchema.Implementation clientInfo = mcpSyncClient.getClientInfo();if (clientInfoNames.contains(clientInfo.name())) {_logger_.info("Removing McpsyncClient: {}", clientInfo.name());if (mcpSyncClient.closeGracefully()) {iterator.remove();}else {_logger_.warn("Failed to remove mcpSyncClient: {}", clientInfo.name());}}}// 新增McpSyncClient实例McpSyncClient syncClient;for (Instance instance : addInstances) {String baseUrl = instance.getMetadata().getOrDefault("scheme", "http") + "://" + instance.getIp() + ":"+ instance.getPort();WebClient.Builder webClientBuilder = webClientBuilderTemplate.clone().baseUrl(baseUrl);WebFluxSseClientTransport transport = new WebFluxSseClientTransport(webClientBuilder, objectMapper);NamedClientMcpTransport namedTransport = new NamedClientMcpTransport(serviceName + "-" + instance.getInstanceId(), transport);McpSchema.Implementation clientInfo = new McpSchema.Implementation(this.connectedClientName(commonProperties.getName(), namedTransport.name()),commonProperties.getVersion());McpClient.SyncSpec syncSpec = McpClient._sync_(namedTransport.transport()).clientInfo(clientInfo).requestTimeout(commonProperties.getRequestTimeout());syncSpec = mcpSyncClientConfigurer.configure(namedTransport.name(), syncSpec);syncClient = syncSpec.build();if (commonProperties.isInitialized()) {syncClient.initialize();}_logger_.info("Added McpAsyncClient: {}", clientInfo.name());mcpSyncClientList.add(syncClient);}this.instances = currentInstances;}private String connectedClientName(String clientName, String serverConnectionName) {return clientName + " - " + serverConnectionName;}public static Builder builder() {return new Builder();}public static class Builder {private String serviceName;private List<McpSyncClient> mcpSyncClientList;private NamingService namingService;public Builder serviceName(String serviceName) {this.serviceName = serviceName;return this;}public Builder mcpSyncClientList(List<McpSyncClient> mcpSyncClientList) {this.mcpSyncClientList = mcpSyncClientList;return this;}public Builder namingService(NamingService namingService) {this.namingService = namingService;return this;}public LoadbalancedMcpSyncClient build() {return new LoadbalancedMcpSyncClient(this.serviceName, this.mcpSyncClientList, this.namingService);}}}
工具类
ApplicationContextHolder
@Component
public class ApplicationContextHolder implements ApplicationContextAware {private static ApplicationContext _applicationContext_;@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {ApplicationContextHolder._applicationContext _= applicationContext;}public static <T> T getBean(Class<T> clazz) {return _applicationContext_.getBean(clazz);}}
效果演示
我在 nacos 中,注册了 MCP Server 服务,部署两个节点
- 同一台机器以不同端口号启动的 MCP Server 服务,分别是 19000、19001,注册在 Nacos 中以 mcp-server-provider 为服务名
yml 配置如下
server:port: 8080spring:application:name: mcp-client-webflux_ _ai:alibaba:mcp:nacos:enabled: trueserver-addr: 127.0.0.1:8848username: nacospassword: nacosclient:sse:connections:nacos-server1: mcp-server-providermcp:client:enabled: truename: mcp-client-webfluxversion: 0.0.1initialized: truerequest-timeout: 600snacos-enabled: true
我们能发现已经成功注入 LoadbalancedMcpSyncClient 类,其中 mcp-server-provider 有两个实例,对应的两个 McpSyncClient
我们停掉其中的 MCP Server19001 端口的服务,通过 removeInstances 获取移除的实例列表,同步在 mcpSyncClientList 移除对应的 McpSyncClient
我们再新启动 MCP Server19001 端口的服务,通过 addInstances 获取新增的实例列表,同步在 mcpSyncClientList 新增对应的 McpSyncClient
相关文章:
企业级分布式 MCP 方案
飞书原文档链接地址:https://ik3te1knhq.feishu.cn/wiki/D8kSwC9tFi61CMkRdd8cMxNTnpg 企业级分布式 MCP 方案 [!TIP] 背景:现阶段 MCP Client 和 MCP Server 是一对一的连接方式,若当前 MCP Server 挂掉了,那么 MCP Client 便不…...
玩转Docker(一):基本概念
容器技术是继大数据和云计算之后又一炙手可热的技术,而且未来相当一段时间内都会非常流行。 本文将对其基本概念和基本使用做出介绍。包括容器生态系统、容器的原理、怎样运行第一个容器、容器技术的概念与实践、Docker镜像等等 目录 一. 鸟瞰容器生态系统 1. 容器…...
Linux系统安装方式+适合初学者的发行版本
Linux系统安装方式适合初学者发行版—目录 一、Linux系统的安装方式1. 物理机直接安装2. 虚拟机安装3. 双系统安装4. Live USB试用5. 云服务器安装 二、适合初学者的Linux发行版1. Ubuntu2. Linux Mint3. Zorin OS4. Pop!_OS5. Elementary OS6. Fedora7. Manjaro 三、选择建议场…...
开启 Spring AI 之旅:从入门到实战
开启 Spring AI 之旅:从入门到实战 引言 在当今人工智能飞速发展的时代,Spring AI 为开发者们提供了一个强大而便捷的工具,用于在 Spring 生态系统中构建 AI 应用程序。本文将为你提供如何开始使用 Spring AI 的详细指南,帮助你…...
python数据分析(七):Pandas 数据变形与重塑
Pandas 数据变形与重塑全面指南 1. 引言 在数据分析过程中,我们经常需要将数据从一种结构转换为另一种结构,以适应不同的分析需求。Pandas 提供了丰富的数据变形与重塑功能,包括旋转(pivot)、堆叠(stack)、融合(melt)等多种操作。本文将详细…...
SX24C01.UG-PXI程控电阻桥板卡
PXI程控电阻桥板卡 概述 简介 程控电阻桥板卡采用4 个可程控精密调节的电阻臂组成桥式电路,通过计算机PXI总线控制继电器通断进行电阻调节;可根据具体情况,方便地选择不同桥路的连接;程控电阻桥板卡支持“1/4 桥”、“半桥”和…...
Python项目源码69:一键解析+csv保存通达信日线数据3.0
Python项目源码39:学生积分管理系统1.0(命令行界面Json) Python项目源码38:模拟炒股系统2.0(tkinterJson) Python项目源码35:音乐播放器2.0(Tkintermutagen) Python项…...
Conda 与 Spyder 环境管理
前言 作为 Python 科学计算领域的黄金搭档,Anaconda 和 Spyder 为研究人员和数据分析师提供了强大的工作环境。本文将详细介绍如何使用 Conda 管理 Python 环境,并在 Spyder IDE 中灵活切换这些环境,助你打造高效的 Python 开发工作流。 一…...
头皮理疗预约小程序开发实战指南
生活服务类小程序开发正成为互联网创业的热点领域,头皮理疗预约小程序作为其中的细分品类,具有广阔的市场前景和用户需求。基于微信小程序原生开发或uniapp框架,结合Java后端和MySQL数据库,可构建一个功能完善、性能稳定且易于维护的头皮理疗预约平台。本文将从零开始,详细…...
cPanel 的 Let’s Encrypt™ 插件
在 cPanel & WHM 中,推出了一个名为 AutoSSL 的功能。可能有些朋友还不了解 AutoSSL,它是一个能够自动为您的网站申请和安装免费 SSL 证书的工具,这些证书由 Comodo 签发,保证网站的安全性。 AutoSSL 与 Let’s Encrypt Let’…...
《Android 应用开发基础教程》——第十一章:Android 中的图片加载与缓存(Glide 使用详解)
目录 第十一章:Android 中的图片加载与缓存(Glide 使用详解) 🔹 11.1 Glide 简介 🔸 11.2 添加 Glide 依赖 🔸 11.3 基本用法 ✦ 加载网络图片到 ImageView: ✦ 加载本地资源 / 文件 / UR…...
MySQL 中的游标(Cursor)
一、游标的作用 逐行处理数据:当需要对查询结果集中的每一行进行特定操作(如计算、条件判断、调用其他过程)时使用。替代集合操作:在无法通过单一 SQL 语句完成复杂逻辑时,游标提供逐行处理的能力。…...
【嵌入式Linux】基于ARM-Linux的zero2平台的智慧楼宇管理系统项目
目录 1. 需求及项目准备(此项目对于虚拟机和香橙派的配置基于上一个垃圾分类项目,如初次开发,两个平台的环境变量,阿里云接入,摄像头配置可参考垃圾分类项目)1.1 系统框图1.2 硬件接线1.3 语音模块配置1.4 …...
记忆翻牌游戏:认知科学与状态机的交响曲
目录 记忆翻牌游戏:认知科学与状态机的交响曲引言第一章 网格空间拓扑学1.1 自适应网格算法1.2 卡片排布原理 第二章 状态机设计2.1 状态跃迁矩阵2.2 时空关联模型 第三章 记忆强化机制3.1 认知衰减曲线3.2 注意力热力图 第四章 动画引擎设计4.1 翻牌运动方程4.2 粒…...
【业务领域】InfiniBand协议总结
InfiniBand协议总结 InfiniBand协议是什么?Infiniband产生的原因Mellanox公司介绍及其新闻基于TCP/IP的网络与IB网络的比较IB标准的优势什么是InfiniBand网络什么是InfiniBand架构Mellanox IB卡介绍InfiniBand速率发展介绍InfiniBand网络主要上层协议InfiniBand管理…...
使用Java正则表达式检查字符串是否匹配
在Java开发中,正则表达式(Regular Expression,简称Regex)是处理字符串的强大工具,广泛应用于模式匹配、数据验证和文本处理。Java通过java.util.regex包提供了对正则表达式的支持,包含Pattern和Matcher两个…...
个人健康中枢的多元化AI硬件革新与精准健康路径探析
在医疗信息化领域,个人健康中枢正经历着一场由硬件技术革新驱动的深刻变革。随着可穿戴设备、传感器技术和人工智能算法的快速发展,新一代健康监测硬件能够采集前所未有的多维度生物数据,并通过智能分析提供精准的健康建议。本文将深入探讨构成个人健康中枢的最新硬件技术,…...
Android基础控件用法介绍
Android基础控件用法详解 Android应用开发中,基础控件是构建用户界面的核心元素。本文将详细介绍Android中最常用的基础控件及其用法。 一、TextView(文本显示控件) TextView用于在界面上显示文本信息。 基本用法 <TextViewandroid:id="@+id/textView"andr…...
iO(不可区分混淆)是Web3隐私的圣杯?
1. 引言 iO 是区块链隐私的圣杯吗?本文将探讨: 不可区分混淆(indistinguishability obfuscation, iO)的局限性iO可能带来的变革iO为何重要?iO是否能真正成为可信硬件的替代方案? 区块链隐私面临的最大挑…...
文章三《机器学习基础概念与框架实践》
文章3:机器学习基础概念与框架实践 ——从理论到代码,用Scikit-learn构建你的第一个分类模型 一、机器学习基础理论:三大核心类型 机器学习是人工智能的核心,通过数据让计算机自动学习规律并做出预测或决策。根据学习方式&#…...
中小企业MES系统概要设计
版本:V1.0 日期:2025年5月2日 一、系统架构设计 1.1 整体架构模式 采用分层微服务架构,实现模块解耦与灵活扩展,支持混合云部署: #mermaid-svg-drxS3XaKEg8H8rAJ {font-family:"trebuchet ms",verdana,ari…...
自动化测试项目1 --- 唠嗑星球 [软件测试实战 Java 篇]
目录 项目介绍 项目源码库地址 项目功能测试 1.自动化实施步骤 1.1 编写测试用例 1.2 自动化测试脚本开发 1.2.1 配置相关环境, 添加相关依赖 1.2.2 相关代码编写 2. 自动化功能测试总结 2.1 弹窗的解决相关问题 2.2 断言的使用和说明 2.3 重新登录问题 项目性能…...
c语言 关键字--目录
1.c语言 关键字 2.typedef 关键字 3.volatile 关键字 4.register 关键字 5.const关键字用法 6.extern关键字...
C语言与指针3——基本数据类型
误区补充 char 的 表示范围0-127 signed char 127 unsigned char 0-255enum不常用,但是常见,这里记录一下。 enum Day {Monday 1,//范围是IntTuesday 2,Wednesday 3 }; enum Day d Monday; switch (d) {case Monday:{printf("%d",Monday);…...
[更新完毕]2025五一杯C题五一杯数学建模思路代码文章教学:社交媒体平台用户分析问题
完整内容请看文章最下面的推广群 社交媒体平台用户分析问题 在问题一中为解决博主在特定日期新增关注数的预测问题,本文构建了基于用户历史行为的二分类模型。首先,从用户对博主的观看、点赞、评论、关注等交互行为中提取统计与时序特征,形成…...
使用Vite创建vue3项目
什么是vite Vite 是新一代构建工具,由 Vue 核心团队开发,提供极快的开发体验。 它利用浏览器原生ES模块导入功能,提供了极快的热模块更新(HMR)和开发服务器启动速度。 官网:https://vitejs.cn/vite3-cn/…...
Linux管道识
深入理解管道 (Pipes):连接命令的瑞士军刀 在 Linux 和类 Unix 系统(包括 macOS)的命令行世界里,管道(Pipe)是一个极其强大且基础的概念。它允许你将一个命令的输出直接作为另一个命令的输入,像…...
LabVIEW 中VI Server导出 VI 配置
该 LabVIEW VI 展示了在 VI Server 中配置和执行 Exported VIs 的过程,实现对服务器端导出 VI 的远程调用与操作。 具体过程及模块说明 前期配置:需确保在 LabVIEW 的 “Tools> Options > VI Server > Protocols” 路径下,启用 …...
map和set的遗留 + AVL树(1):
在讲解新的东西之前,我们把上节遗留的问题说一下: 遗留问题: 首先,我们的最上面的代码是一个隐式类型转换,我们插入了一对数据。 我们说了,我们的方括号的底层是insert,当我们调用operator的[…...
Java学习手册:Spring Security 安全框架
一、Spring Security 简介 Spring Security 是一个功能强大且高度可定制的身份验证和访问控制框架,用于保护 Java 应用程序,尤其是基于 Spring 的应用。它构建在 Spring 框架之上,能够轻松地集成到基于 Spring 的应用程序中,包括…...
pip 常用命令及配置
一、python -m pip install 和 pip install 的区别 在讲解 pip 的命令之前,我们有必要了解一下 python -m pip install 和 pip install 的区别,以便于我们在不同的场景使用不同的方式。 python -m pip install 命令使用 python 可执行文件将 pip 模块作…...
C++_STL
C 标准模板库(Standard Template Library,STL)是一套功能强大的 C 模板类和函数的集合,它提供了一系列通用的、可复用的算法和数据结构。 STL 的设计基于泛型编程,这意味着使用模板可以编写出独立于任何特定数据类型的…...
[FPGA Video] AXI4-Stream Remapper
Xilinx AXI4-Stream Remapper IP (PG379) 详细介绍 概述 Xilinx LogiCORE™ IP AXI4-Stream Remapper 核是一个专为视频处理设计的模块,用于在不同每时钟像素数(Pixels Per Clock, PPC)要求之间重新映射视频像素。它支持将输入 AXI4-Stream…...
【数据结构】励志大厂版·初阶(复习+刷题):栈与队列
前引:本篇将由小编与大家一起复习 栈 、队列 的知识点,栈、队列的顺序、链式结构各个缺点好处,如何实现、对于一般的增删查找此篇文章一定再详细不过!对代码的注释、何时需要判断、特殊情况,白话文版一解到底ÿ…...
pytest——参数化
之前有说过,通过pytest测试框架标记参数化功能可以实现数据驱动测试。数据驱动测试使用的文件主要有以下类型: txt 文件 csv 文件excel 文件json 文件yaml 文件.... 本文主要讲的就是以上几种文件类型的读取和使用 一.txt 文件读取使用 首先创建一个 …...
第7篇:RESTful API设计与安全防护
在前后端分离架构中,RESTful API是系统交互的核心通道。本文将从接口规范设计到安全防护,全面讲解如何在EggJS中构建安全、规范、易用的API系统,并提供完整的解决方案和最佳实践。 一、标准化API接口规范设计 1. RESTful设计原则 核心要素&…...
CSS 架构与命名规范
CSS 架构与命名规范:BEM、SMACSS 与 OOCSS 实战 引言 在前端开发中,随着项目规模的扩大,CSS 代码往往会变得难以维护和扩展。无组织的样式表会导致命名冲突、权重覆盖问题和样式继承混乱,这些问题在团队协作的大型项目中尤为明显…...
实验二 软件白盒测试
实验二 软件白盒测试 某工资计算程序功能如下:若雇员月工作小时超过40小时,则超过部分按原小时工资的1.5倍的加班工资来计算。若雇员月工作小时超过50小时,则超过50的部分按原小时工资的3倍的加班工资来计算,而40到50小时的工资仍…...
【数据结构】String字符串的存储
目录 一、存储结构 1.字符串常量池 2.字符串哈希表 2.1结构 2.2基础存储单位 2.2.1键对象 2.2.2值对象 二、存储过程 1.搜索 2.创建 三、存储位置 四、存储操作 1.new新建 2.intern入池 这是String类的详解:String类变量 一、存储结构 1.字符串常量池…...
LLMs Tokenizer Byte-Pair Encoding(BPE)
1 Byte-Pair Encoding(BPE) 如何构建词典? 准备足够的训练语料;以及期望的词表大小;将单词拆分为字符粒度(字粒度),并在末尾添加后缀“”,统计单词频率合并方式:统计每一个连续/相邻字节对的出现频率,将最高频的连续字…...
npm,yarn,pnpm,cnpm,nvm,npx包管理器常用命令
前端比较主流的包管理器主要有三个npm,yarn,pnpm 多层级依赖,通常发生在依赖之间存在复杂的版本要求时 包 A 依赖于包 B1.0.0 包 B 依赖于包 C2.0.0 另一个包 D 也依赖于 C3.0.0 一、NPM (Node Package Manager) https://www.npmjs.cn/…...
使用mybatis实例类和MySQL表的字段不一致怎么办
在 MyBatis 中,当 Java 实体类的属性名与数据库表的字段名不一致时,会导致查询结果无法正确映射。以下是几种常见解决方案及代码示例: 1. 使用 resultMap 显式映射(推荐) 场景:字段名与属性名差异较大&…...
阿里发布新一代通义千问 Qwen3模型
近日,阿里巴巴发布了新一代通义千问 Qwen3 模型,一举登顶全球最强开源模型。 这是国内首个“混合推理模型”,将“快思考”与“慢思考”集成进同一个模型,大大节省算力消耗。 旗舰模型 Qwen3-235B-A22B 在代码、数学、通用能力等…...
React pros比较机制
将 count1作为prop传递给Memoson组件 引用类型情况 虽然list值没有发生变化,但是仍旧重新渲染 解决方法使用useMemo()函数,传递一个空依赖项 // 传递数据为引用类型 比较的是引用 // 使用useMemo函数改写、const list useMemo(()>{return [1,2,3]},[…...
Flowable7.x学习笔记(十七)审批我的待办
前言 前文完成了我的待办的查询功能,本文就在此基础上从源码解读到完成审批任务的功能,审批界面我就先不带表单,直接单纯审批通过,这里需要注意的事,审批的表单其实每个节点都可能需要不同的表单内容,后续要…...
HTTP 状态码详解:用途与含义
HTTP 状态码详解:用途与含义 HTTP 状态码详解:用途与含义1. (2xx)成功类200 OK201 Created204 No Content206 Partial Content 2. (3xx)重定向类301 Moved Permanently302 Found(临时重定向&…...
AI日报 · 2025年05月02日 | 再见GPT-4!OpenAI CEO 确认 GPT-4 已从 ChatGPT 界面正式移除
1、OpenAI CEO 确认 GPT-4 已从 ChatGPT 界面正式移除 在处理 GPT-4o 更新问题的同时,OpenAI CEO Sam Altman 于 5 月 1 日在 X 平台发文,正式确认初代 GPT-4 模型已从 ChatGPT 主用户界面中移除。此举遵循了 OpenAI 此前公布的计划,即在 4 …...
ppt设计美化公司_杰青_长江学者_优青_青年长江学者_万人计划青年拔尖人才答辩ppt模板
WordinPPT / 持续为双一流高校、科研院所、企业等提供PPT制作系统服务。 / 近期PPT美化案例 - 院士增选、科学技术奖、杰青、长江学者特聘教授、校企联聘长江、重点研发、优青、青长、青拔.. 杰青(杰出青年科学基金) 支持已取得突出成果的45岁以下学…...
文章四《深度学习核心概念与框架入门》
文章4:深度学习核心概念与框架入门——从大脑神经元到手写数字识别的奇幻之旅 引言:给大脑装个"GPU加速器"? 想象一下,你的大脑如果能像智能手机的GPU一样快速处理信息会怎样?这正是深度学习的终极目标&…...
HTML5+JavaScript实现连连看游戏之二
HTML5JavaScript实现连连看游戏之二 以前一篇,见 https://blog.csdn.net/cnds123/article/details/144220548 连连看游戏连接规则: 只能连接相同图案(或图标、字符)的方块。 连线路径必须是由直线段组成的,最多可以有…...