【从零实现Json-Rpc框架】- 项目实现 - 客户端注册主题整合 及 rpc流程示意
📢博客主页:https://blog.csdn.net/2301_779549673
📢博客仓库:https://gitee.com/JohnKingW/linux_test/tree/master/lesson
📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正!
📢本文由 JohnKi 原创,首发于 CSDN🙉
📢未来很长,值得我们全力奔赴更美好的生活✨
文章目录
- 📢前言
- 🏳️🌈一、rpc_registry 注册模块
- 1.1 rpc_registry 框架逻辑分析
- 1.1.1 Provider类
- 1.1.2 MethodHost类
- 1.1.3 Discoverer类
- 1.2 Provider类
- 1.3 MethodHost 类
- 1.4 Discoverer 类
- 🏳️🌈二、rpc_topic 主题模块
- 2.1 逻辑框架
- 2.2 核心功能
- 2.3 接口方法详解
- 2.3.1 主题管理
- 2.3.2 订阅管理
- 2.3.3 消息发布
- 2.3.4 消息处理
- 2.4 核心逻辑:commonRequest 方法
- 2.4.1 构造请求:
- 2.4.2 发送请求
- 2.4.3 处理响应:
- 2.4.4 实现代码
- 2.5 整体代码
- 🏳️🌈三、rpc_client 客户端整合
- 3.1 整体架构
- 3.2 核心类详解
- 3.2.1 RegistryClient(服务注册客户端)
- 3.2.2DiscoveryClient(服务发现客户端)
- 3.2.3RpcClient(RPC调用客户端)
- 3.2.4TopicClient(主题客户端)
- 3.3 协作逻辑与设计思路
- 3.3.1 消息分发(Dispatcher)
- 3.3.2 请求处理(Requestor)
- 3.3.3 服务发现与连接池(RpcClient)
- 3.3.4 主题管理(TopicManager)
- 🏳️🌈四、主题发布订阅流程示意
- 4.1 核心文件与类说明
- 4.2 步骤 1:客户端A创建主题
- 4.3 步骤 2:客户端B订阅主题
- 4.4 步骤 3:客户端A发布消息
- 4.5 步骤 4:服务端广播消息
- 4.6 步骤 5:客户端B接收消息
- 4.7 关键类交互图
- 4.8 补充说明
- 🏳️🌈五、运程调用add方法流程示意
- 5.1 核心文件与类说明
- 5.2 步骤 1:服务端启动并注册 add 方法
- 5.3 步骤 2:客户端构造调用请求
- 5.4 步骤3:服务端接收请求并路由到 add 方法
- 5.5 步骤4:网络层数据传输
- 5.6 关键类交互图
- 5.7 补充说明
- 👥总结
📢前言
到目前为止,我们已经将整个 rpc项目 的基本完成了,就只剩下客户端的部分功能了
这篇文章将带大家实现一些客户端的最后三个模块
- 客户端注册模块
- 客户端主题模块
- 客户端整合
本系列博客的代码都在上述库中的下图(demo代表笔者模仿的案例)
🏳️🌈一、rpc_registry 注册模块
1.1 rpc_registry 框架逻辑分析
该代码实现了一个 RPC
客户端核心模块,包含 服务注册、服务发现与动态维护、负载均衡 三大核心功能,通过以下三个类协作完成
graph TDA[Provider] -->|注册服务| B(注册中心)C[Discoverer] -->|发现服务| BC -->|维护本地缓存| D[MethodHost]
1.1.1 Provider类
- 这个类的 核心功能 是 向服务端注册服务方法,并处理注册响应。
- 构造函数接受一个
Requestor
的智能指针,用于发送请求。 registryMethod
方法负责 发送服务注册请求,构造ServiceRequest消息,并通过Requestor发送,然后处理响应。- 如果注册失败,会记录错误日志。
// 核心功能:向服务端注册服务方法,并处理注册响应。
class Provider {
public:using ptr = std::shared_ptr<Provider>;Provider(const Requestor::ptr& requestor);// 注册服务方法bool registryMethod(const BaseConnection::ptr& conn,const std::string& method, const Address& host);private:Requestor::ptr _requestor;
};
1.1.2 MethodHost类
- 管理某个服务方法的主机地址列表,支持动态增删和轮询选择。
- 成员变量包括互斥锁、轮询索引和主机地址列表。
- 提供添加主机、移除主机、选择主机的方法,这些操作都是线程安全的。
chooseHost
方法使用简单的轮询算法实现负载均衡。
// 核心功能:管理某个服务方法的主机地址列表,支持动态增删和轮询选择。
// 设计目标:提供线程安全的主机列表管理,支持动态更新。
class MethodHost {
public:using ptr = std::shared_ptr<MethodHost>;MethodHost();MethodHost(const std::vector<Address>& hosts);// 添加主机地址void appendHost(const Address& host);// 移除主机地址void removerHost(const Address& host);// 轮询选择主机(简单负载均衡)Address chooseHost();bool empty();private:std::mutex _mutex;size_t _idx; // 轮询选择下标std::vector<Address> _hosts; // 主机地址列表
};
1.1.3 Discoverer类
- 发现服务提供者,处理上下线通知,维护本地服务缓存。
- 包含一个离线回调函数,当服务下线时触发。
serviceDiscovery
方法处理服务发现请求,先检查本地缓存,若未命中则向服务端请求,并更新缓存。onServiceRequest
处理服务端推送的上下线通知,更新本地的主机列表。- 使用
MethodHost
类来管理每个方法的主机列表,确保线程安全。
// 核心功能:发现服务提供者,处理上下线通知,维护本地服务缓存。
class Discoverer {
public:using ptr = std::shared_ptr<Discoverer>;using OfflineCallback = std::function<void(const Address&)>;Discoverer(const Requestor::ptr& requestor, const OfflineCallback& cb);// 主动发起服务发现请求。// 1. 本地缓存命中:直接返回已缓存的主机地址。// 2. 缓存未命中:发送 SERVICE_DISCOVERY// 请求,获取服务地址并更新缓存。bool serviceDiscovery(const BaseConnection::ptr& conn,const std::string& method, Address& host);// 提供给 Dispatcher 模块进行服务上线下线请求处理的回调函数// 处理服务端推送的上下线通知。// 设计目标:动态更新本地服务列表,确保客户端始终获取最新服务地址void onServiceRequest(const BaseConnection::ptr& conn,const ServiceRequest::ptr& msg);private:OfflineCallback _offline_callback;std::mutex _mutex;std::unordered_map<std::string, MethodHost::ptr> _method_hosts;Requestor::ptr _requestor;
};
1.2 Provider类
功能:向注册中心注册服务方法。
关键成员:
_requestor
:发送请求和接收响应的工具类。
核心方法:
- registryMethod(conn, method, host)
发送SERVICE_REGISTRY
请求,将服务方法method
和主机host
注册到注册中心。
class Provider {
public:using ptr = std::shared_ptr<Provider>;Provider(const Requestor::ptr& requestor) : _requestor(requestor) {}// 注册服务方法bool registryMethod(const BaseConnection::ptr& conn,const std::string& method, const Address& host) {// 1. 创建 SERVICE_REGISTRY 类型的请求消息。// 2. 通过 Requestor 发送请求并等待响应。// 3. 验证响应类型和状态码,返回注册结果。auto msg_req = MessageFactory::create<ServiceRequest>();msg_req->setId(UUID::uuid());msg_req->setMType(MType::REQ_SERVICE);msg_req->setMethod(method);msg_req->setHost(host);msg_req->setServiceOptype(ServiceOptype::SERVICE_REGISTRY);BaseMessage::ptr msg_rsp;bool ret = _requestor->send(conn, msg_req, msg_rsp);if (ret == false) {ELOG("%s 服务注册失败!", method.c_str());return false;}auto service_rsp = std::dynamic_pointer_cast<ServiceResponse>(msg_rsp);if (service_rsp.get() == nullptr) {ELOG("响应类型向下转换失败!");return false;}if (service_rsp->rcode() != RCode::RCODE_OK) {ELOG("服务注册失败,原因: %s",errReason(service_rsp->rcode()).c_str());return false;}return true;}private:Requestor::ptr _requestor;
};
1.3 MethodHost 类
功能:管理某个服务方法的主机列表,支持动态增删和轮询选择。
关键成员:
_hosts
:服务提供者地址列表。_idx
:轮询索引,实现简单负载均衡。
核心方法:
- appendHost(host):线程安全地添加主机地址。
- removeHost(host):线程安全地移除主机地址。
- chooseHost():轮询选择下一个主机地址。
// 核心功能:管理某个服务方法的主机地址列表,支持动态增删和轮询选择。
// 设计目标:提供线程安全的主机列表管理,支持动态更新。
class MethodHost {
public:using ptr = std::shared_ptr<MethodHost>;MethodHost() : _idx(0) {}MethodHost(const std::vector<Address>& hosts): _hosts(hosts.begin(), hosts.end()), _idx(0) {}// 添加主机地址void appendHost(const Address& host) {// 中途收到了服务上线请求后被调用std::unique_lock<std::mutex> lock(_mutex);_hosts.emplace_back(host);}// 移除主机地址void removerHost(const Address& host) {// 中途收到了服务下线请求后被调用std::unique_lock<std::mutex> lock(_mutex);for (auto it = _hosts.begin(); it != _hosts.end(); ++it) {if (*it == host) {_hosts.erase(it);break;}}}// 轮询选择主机(简单负载均衡)Address chooseHost() {std::unique_lock<std::mutex> lock(_mutex);size_t pos = _idx++ % _hosts.size();return _hosts[pos];}bool empty() { return _hosts.empty(); }private:std::mutex _mutex;size_t _idx; // 轮询选择下标std::vector<Address> _hosts; // 主机地址列表
};
1.4 Discoverer 类
功能:动态发现服务地址,处理上下线通知,维护本地缓存。
关键成员:
_method_hosts
:服务名 → 主机列表的映射(使用MethodHost
管理)。_offline_callback
:服务下线时的回调函数。
核心方法:
- serviceDiscovery(conn, method, host)
若本地缓存有效,直接返回地址;否则向注册中心发送SERVICE_DISCOVERY
请求。 - onServiceRequest(conn, msg)
处理服务端推送的上下线通知,更新本地缓存。
// 核心功能:发现服务提供者,处理上下线通知,维护本地服务缓存。
class Discoverer {
public:using ptr = std::shared_ptr<Discoverer>;using OfflineCallback = std::function<void(const Address&)>;Discoverer(const Requestor::ptr& requestor, const OfflineCallback& cb): _requestor(requestor), _offline_callback(cb) {}// 主动发起服务发现请求。// 1. 本地缓存命中:直接返回已缓存的主机地址。// 2. 缓存未命中:发送 SERVICE_DISCOVERY// 请求,获取服务地址并更新缓存。bool serviceDiscovery(const BaseConnection::ptr& conn,const std::string& method, Address& host) {{// 当前所报关的提供者信息存在,则直接返回地址std::unique_lock<std::mutex> lock(_mutex);auto it = _method_hosts.find(method);if (it != _method_hosts.end()) {if (it->second->empty() == false) {host = it->second->chooseHost();return true;}}}// 缓存未命中,发起服务发现请求// 当前服务的提供者为空auto msg_req = MessageFactory::create<ServiceRequest>();msg_req->setId(UUID::uuid());msg_req->setMType(MType::REQ_SERVICE);msg_req->setMethod(method);msg_req->setServiceOptype(ServiceOptype::SERVICE_DISCOVERY);BaseMessage::ptr msg_rsp;bool ret = _requestor->send(conn, msg_req, msg_rsp);if (ret == false) {ELOG("服务发现失败!");return false;}auto service_rsp = std::dynamic_pointer_cast<ServiceResponse>(msg_rsp);if (service_rsp.get() == nullptr) {ELOG("服务发现时响应类型向下转换失败!");return false;}if (service_rsp->rcode() != RCode::RCODE_OK) {ELOG("服务发现失败!%s", errReason(service_rsp->rcode()).c_str());return false;}// 走到这里,代表当前服务没有对应主机提过服务的std::unique_lock<std::mutex> lock(_mutex);auto method_host = std::make_shared<MethodHost>(service_rsp->Hosts());if (method_host->empty()) {ELOG("%s 服务发现时,服务提供者列表为空!", method.c_str());return false;}DLOG("服务发现成功");host = method_host->chooseHost();_method_hosts[method] = method_host;return true;}// 提供给 Dispatcher 模块进行服务上线下线请求处理的回调函数// 处理服务端推送的上下线通知。// 设计目标:动态更新本地服务列表,确保客户端始终获取最新服务地址void onServiceRequest(const BaseConnection::ptr& conn,const ServiceRequest::ptr& msg) {// 1. 判断是上线还是下线请求,如果都不是那就不用处理了ServiceOptype optype = msg->Serviceoptype();std::string method = msg->method();std::unique_lock<std::mutex> lock(_mutex);// 2. 上线请求:找到MethodHost,向其中新增一个主机地址if (optype == ServiceOptype::SERVICE_ONLINE) {auto it = _method_hosts.find(method);if (it == _method_hosts.end()) {MethodHost::ptr method_host = std::shared_ptr<MethodHost>();method_host->appendHost(msg->host());_method_hosts[method] = method_host;} else {it->second->appendHost(msg->host());}}// 3. 下线请求:找到MethodHost,从其中删除一个主机地址else if (optype == ServiceOptype::SERVICE_OFFLINE) {auto it = _method_hosts.find(method);if (it == _method_hosts.end())return;it->second->removerHost(msg->host());_offline_callback(msg->host());}}private:OfflineCallback _offline_callback;std::mutex _mutex;std::unordered_map<std::string, MethodHost::ptr> _method_hosts;Requestor::ptr _requestor;
};
🏳️🌈二、rpc_topic 主题模块
先说说 客户端 和 服务端 的主题模块的区别
- 客户端:负责发起主题操作请求,处理服务端推送的消息,不维护全局状态。
- 服务端:维护全局主题和订阅者关系,处理客户端请求并广播消息,不涉及业务逻辑。
- 两者协同工作,客户端通过请求-响应模型与服务端交互,服务端确保消息的正确路由和广播。
2.1 逻辑框架
#pragma once
#include "requestor.hpp"
#include <unordered_set>namespace rpc
{namespace client{class TopicManager{public:using SubCallback = std::function<void(const std::string &key, const std::string &msg)>;using ptr = std::shared_ptr<TopicManager>;TopicManager(const Requestor::ptr &requestor) : _requestor(requestor) {}// 创建主题bool createTopic(const BaseConnection::ptr &_conn, const std::string &key);// 取消主题bool removeTopic(const BaseConnection::ptr &_conn, const std::string &key);// 订阅主题bool subscribeTopic(const BaseConnection::ptr &conn, const std::string &key, const SubCallback &cb);// 取消订阅主题的订阅者bool cancelTopic(const BaseConnection::ptr &conn, const std::string &key);// 发布消息bool publishTopic(const BaseConnection::ptr &conn, const std::string &key, const std::string &msg);// 处理服务器返回的主题消息void onPublishTopic(const BaseConnection::ptr &conn, const TopicRequest::ptr &msg);private:void addSubscriber(const std::string &key, const SubCallback &cb);void delSubscriber(const std::string &key);const SubCallback getSubcallback(const std::string &key);bool commonRequest(const BaseConnection::ptr &conn, const std::string &key, TopicOptype type, const std::string &msg = " ");private:std::mutex _mutex;std::unordered_map<std::string, SubCallback> _topic_callbacks; // 主题名 -> 回调函数Requestor::ptr _requestor;};}
}
2.2 核心功能
client::TopicManager
是 RPC 客户端用于管理 主题(Topic)生命周期和消息订阅/发布 的核心类,主要功能包括:
- 主题管理:创建、删除主题。
- 订阅管理:订阅/取消订阅主题,绑定消息回调。
- 消息发布:向指定主题发送消息。
- 消息处理:接收服务端推送的主题消息,触发本地回调。
2.3 接口方法详解
2.3.1 主题管理
// 创建主题
bool createTopic(const BaseConnection::ptr& _conn, const std::string& key) {return commonRequest(_conn, key, TopicOptype::TOPIC_CREATE);
}// 删除主题
bool removeTopic(const BaseConnection::ptr& _conn, const std::string& key) {return commonRequest(_conn, key, TopicOptype::TOPIC_REMOVE);
}
// 创建主题 "news"
topic_mgr->createTopic(conn, "news");
// 删除主题 "news"
topic_mgr->removeTopic(conn, "news");
2.3.2 订阅管理
// 订阅主题
bool subscribeTopic(const BaseConnection::ptr& conn, const std::string& key,const SubCallback& cb) {addSubscriber(key, cb);return commonRequest(conn, key, TopicOptype::TOPIC_SUBSCRIBE);
}// 取消订阅主题的订阅者
bool cancelTopic(const BaseConnection::ptr& conn, const std::string& key) {delSubscriber(key);return commonRequest(conn, key, TopicOptype::TOPIC_CANCEL);
}
// 订阅主题 "news",定义回调函数
topic_mgr->subscribeTopic(conn, "news", [](const std::string& key, const std::string& msg) {std::cout << "收到消息: " << msg << std::endl;
});
// 取消订阅 "news"
topic_mgr->cancelTopic(conn, "news");
2.3.3 消息发布
// 发布消息
bool publishTopic(const BaseConnection::ptr& conn, const std::string& key,const std::string& msg) {return commonRequest(conn, key, TopicOptype::TOPIC_PUBLISH, msg);
}
// 发布消息到主题 "news"
topic_mgr->publishTopic(conn, "news", "Breaking: RPC框架发布!");
2.3.4 消息处理
内部流程:
- 验证操作类型:确保消息类型为
TOPIC_PUBLISH
。 - 提取消息内容:获取主题名
topic_key
和消息内容topic_msg
。 - 查找回调函数:通过
getSubcallback
查找本地注册的回调。 - 触发回调:执行回调函数处理消息。
// 处理服务器返回的主题消息
void onPublishTopic(const BaseConnection::ptr& conn,const TopicRequest::ptr& msg) {// 1. 从消息中取出操作类型进行判断,是否时消息请求auto type = msg->optype(); // 获取主题操作类型if (type != TopicOptype::TOPIC_PUBLISH) {ELOG("错误的主题操作类型:%d", static_cast<int>(type));return;}// 2. 取出消息主题名称,以及消息内容std::string topic_key = msg->topicKey();std::string topic_msg = msg->topicMsg();// 3. 通过主题名称,查找队形主题的回调处理函数,有 再处理,没有 就报错auto callback = getSubcallback(topic_key);if (!callback) {ELOG("收到了 %s 主题消息,但是该消息主题无处理回调", topic_key.c_str())return;}return callback(topic_key, topic_msg);
}
2.4 核心逻辑:commonRequest 方法
所有主题操作请求通过 commonRequest 统一处理,流程如下
2.4.1 构造请求:
auto msg_rsp = MessageFactory::create<TopicRequest>();
msg_rsp->setTopicKey(key);
msg_rsp->setOptype(type); // 设置操作类型(如 TOPIC_CREATE)
2.4.2 发送请求
bool ret = _requestor->send(conn, msg_rsp, msg_rep);
2.4.3 处理响应:
- 检查响应类型是否为
TopicResponse
。 - 验证状态码
rcode()
是否成功。
2.4.4 实现代码
// 处理服务器返回的主题消息
void onPublishTopic(const BaseConnection::ptr& conn,const TopicRequest::ptr& msg) {// 1. 从消息中取出操作类型进行判断,是否时消息请求auto type = msg->optype(); // 获取主题操作类型if (type != TopicOptype::TOPIC_PUBLISH) {ELOG("错误的主题操作类型:%d", static_cast<int>(type));return;}// 2. 取出消息主题名称,以及消息内容std::string topic_key = msg->topicKey();std::string topic_msg = msg->topicMsg();bool commonRequest(const BaseConnection::ptr& conn, const std::string& key,TopicOptype type, const std::string& msg = " ") {// 1. 构造请求对象,并填充数据auto msg_rsp = MessageFactory::create<TopicRequest>();msg_rsp->setId(UUID::uuid());msg_rsp->setMType(MType::REQ_TOPIC);msg_rsp->setOptype(type);msg_rsp->setTopicKey(key);if (type == TopicOptype::TOPIC_PUBLISH)msg_rsp->setTopicMsg(msg);// 2. 向 服务端 发送 同步 请求,等待响应BaseMessage::ptr msg_rep;bool ret = _requestor->send(conn, msg_rsp, msg_rep);if (ret == false) {ELOG("主题操作请求失败!");return false;}// 3. 获取服务端响应信息,判断请求处理是否成功auto topic_rsp_msg = std::dynamic_pointer_cast<TopicResponse>(msg_rep);if (!topic_rsp_msg) {ELOG("主题操作响应,但向下转换时失败");return false;}if (topic_rsp_msg->rcode() != RCode::RCODE_OK) {ELOG("主题操作请求出错:%s",errReason(topic_rsp_msg->rcode()).c_str());return false;}ELOG("主题操作请求成功");return true;}// 3. 通过主题名称,查找队形主题的回调处理函数,有 再处理,没有 就报错auto callback = getSubcallback(topic_key);if (!callback) {ELOG("收到了 %s 主题消息,但是该消息主题无处理回调", topic_key.c_str())return;}return callback(topic_key, topic_msg);
}
2.5 整体代码
#pragma once
#include "requestor.hpp"
#include <unordered_set>namespace rpc{namespace client{class TopicManager{public:using SubCallback = std::function<void(const std::string& key, const std::string& msg)>;using ptr = std::shared_ptr<TopicManager>;TopicManager(const Requestor::ptr& requestor) : _requestor(requestor) {}// 创建主题bool createTopic(const BaseConnection::ptr& _conn, const std::string& key){return commonRequest(_conn, key, TopicOptype::TOPIC_CREATE);}// 删除主题bool removeTopic(const BaseConnection::ptr& _conn, const std::string& key){return commonRequest(_conn, key, TopicOptype::TOPIC_REMOVE);}// 订阅主题bool subscribeTopic(const BaseConnection::ptr& conn, const std::string& key, const SubCallback& cb){addSubscriber(key, cb);return commonRequest(conn, key, TopicOptype::TOPIC_SUBSCRIBE);}// 取消订阅主题的订阅者bool cancelTopic(const BaseConnection::ptr& conn, const std::string& key){delSubscriber(key);return commonRequest(conn, key, TopicOptype::TOPIC_CANCEL);}// 发布消息bool publishTopic(const BaseConnection::ptr& conn, const std::string& key, const std::string& msg){return commonRequest(conn, key, TopicOptype::TOPIC_PUBLISH, msg);}// 处理服务器返回的主题消息void onPublishTopic(const BaseConnection::ptr& conn, const TopicRequest::ptr& msg){// 1. 从消息中取出操作类型进行判断,是否时消息请求auto type = msg->optype(); // 获取主题操作类型if(type != TopicOptype::TOPIC_PUBLISH){ELOG("错误的主题操作类型:%d", static_cast<int>(type));return;}// 2. 取出消息主题名称,以及消息内容std::string topic_key = msg->topicKey();std::string topic_msg = msg->topicMsg();// 3. 通过主题名称,查找队形主题的回调处理函数,有 再处理,没有 就报错auto callback = getSubcallback(topic_key);if(!callback){ELOG("收到了 %s 主题消息,但是该消息主题无处理回调", topic_key.c_str())return;}return callback(topic_key, topic_msg);}private:void addSubscriber(const std::string& key, const SubCallback& cb){std::unique_lock<std::mutex> lock(_mutex);_topic_callbacks.insert(std::make_pair(key, cb));}void delSubscriber(const std::string& key){std::unique_lock<std::mutex> lock(_mutex);_topic_callbacks.erase(key);}const SubCallback getSubcallback(const std::string& key){std::unique_lock<std::mutex> _lock(_mutex);auto it = _topic_callbacks.find(key);if(it == _topic_callbacks.end()){ELOG("主题 %s 没有订阅者", key.c_str());return SubCallback();}return it->second;}bool commonRequest(const BaseConnection::ptr& conn, const std::string& key, TopicOptype type, const std::string& msg = " "){// 1. 构造请求对象,并填充数据auto msg_rsp = MessageFactory::create<TopicRequest>();msg_rsp->setId(UUID::uuid());msg_rsp->setMType(MType::REQ_TOPIC);msg_rsp->setOptype(type);msg_rsp->setTopicKey(key);if(type == TopicOptype::TOPIC_PUBLISH)msg_rsp->setTopicMsg(msg);// 2. 向 服务端 发送 同步 请求,等待响应BaseMessage::ptr msg_rep;bool ret = _requestor->send(conn, msg_rsp, msg_rep);if(ret == false){ELOG("主题操作请求失败!");return false;}// 3. 获取服务端响应信息,判断请求处理是否成功auto topic_rsp_msg = std::dynamic_pointer_cast<TopicResponse>(msg_rep);if(!topic_rsp_msg){ELOG("主题操作响应,但向下转换时失败");return false;}if(topic_rsp_msg->rcode() != RCode::RCODE_OK){ELOG("主题操作请求出错:%s", errReason(topic_rsp_msg->rcode()).c_str());return false;}ELOG("主题操作请求成功");return true;}private:std::mutex _mutex;std::unordered_map<std::string, SubCallback> _topic_callbacks; // 主题名 -> 回调函数Requestor::ptr _requestor;};}
}
🏳️🌈三、rpc_client 客户端整合
3.1 整体架构
这段代码实现了一个 分布式RPC客户端框架,包含四个核心类:
RegistryClient
(服务注册)DiscoveryClient
(服务发现)RpcClient
(RPC调用)TopicClient
(主题发布/订阅)
它们通过 Dispatcher
(消息分发器)和 Requestor
(请求处理器)协作,实现服务注册、发现、调用及消息推送功能。
graph TDA[RegistryClient] -->|注册服务| B(注册中心)C[DiscoveryClient] -->|发现服务| BD[RpcClient] -->|调用服务| E(服务提供者)F[TopicClient] -->|发布/订阅| G(消息中心)
3.2 核心类详解
3.2.1 RegistryClient(服务注册客户端)
功能:将本地服务方法注册到远程注册中心。
成员变量:
_requestor
:管理请求生命周期(超时、重试)。_provider
:封装服务注册协议,构造SERVICE_REGISTRY
请求。_dispatcher
:分发注册响应消息。_client
:连接注册中心的网络客户端。
关键接口:
registryMethod(method, host)
:注册方法method
到地址 host。
// 示例:注册服务方法
RegistryClient client("127.0.0.1", 8080);
client.registryMethod("add", Address("192.168.1.100", 9090));
// 服务注册客户端,用于将本地服务方法注册到远程注册中心
class RegistryClient {
public:using ptr = std::shared_ptr<RegistryClient>;// 构造函数传入注册中心的地址信息,用于连接注册中心RegistryClient(const std::string& ip, int port): _requestor(std::make_shared<Requestor>()),_provider(std::make_shared<client::Provider>(_requestor)),_dispatcher(std::make_shared<Dispatcher>()) {// 设置响应的回调函数auto rsp_cb =std::bind(&client::Requestor::onResponse, _requestor.get(),std::placeholders::_1, std::placeholders::_2);// 注册 rpc 响应处理函数_dispatcher->registerHandler<BaseMessage>(fields::MType::RSP_SERVICE,rsp_cb);// std::function<void(const BaseConnection::ptr&, BaseMessage::ptr&)>// message_cb = std::bind(&rpc::Dispatcher::onMessage,// _dispatcher.get(), std::placeholders::_1, std::placeholders::_2);auto message_cb =std::bind(&Dispatcher::onMessage, _dispatcher.get(),std::placeholders::_1, std::placeholders::_2);// 设置客户端连接注册中心的地址_client = rpc::ClientFactory::create(ip, port);_client->setMessageCallback(message_cb);_client->connect();}// 向外提供的服务注册接口bool registryMethod(const std::string& method, const Address& host) {return _provider->registryMethod(_client->connection(), method, host);}private:Requestor::ptr_requestor; // 管理请求-响应生命周期,处理超时、重试,维护请求与响应的映射关系client::Provider::ptr _provider; // 封装服务注册协议,构造 SERVICE_REGISTRY// 请求,验证响应状态码Dispatcher::ptr_dispatcher; // 根据消息类型(MType)分发消息到对应处理器(如处理注册响应)BaseClient::ptr_client; // 网络通信客户端,负责与注册中心建立连接、发送请求、接收响应
};
3.2.2DiscoveryClient(服务发现客户端)
功能:动态发现服务地址,处理上下线通知。
成员变量:
_discoverer
:维护本地服务地址缓存。_dispatcher
:处理服务发现响应和上下线通知。
关键接口:
serviceDiscovery(method, host
):查询method
的可用地址。
// 示例:发现服务地址
DiscoveryClient client("127.0.0.1", 8080, [](const Address& host) {std::cout << "服务下线: " << host.first << std::endl;
});
Address addr;
client.serviceDiscovery("add", addr); // 获取 "add" 方法的地址
// 服务注册客户端,用于将本地服务方法注册到远程注册中心
class RegistryClient {
public:using ptr = std::shared_ptr<RegistryClient>;// 构造函数传入注册中心的地址信息,用于连接注册中心RegistryClient(const std::string& ip, int port): _requestor(std::make_shared<Requestor>()),_provider(std::make_shared<client::Provider>(_requestor)),_dispatcher(std::make_shared<Dispatcher>()) {// 设置响应的回调函数auto rsp_cb =std::bind(&client::Requestor::onResponse, _requestor.get(),std::placeholders::_1, std::placeholders::_2);// 注册 rpc 响应处理函数_dispatcher->registerHandler<BaseMessage>(fields::MType::RSP_SERVICE,rsp_cb);// std::function<void(const BaseConnection::ptr&, BaseMessage::ptr&)>// message_cb = std::bind(&rpc::Dispatcher::onMessage,// _dispatcher.get(), std::placeholders::_1, std::placeholders::_2);auto message_cb =std::bind(&Dispatcher::onMessage, _dispatcher.get(),std::placeholders::_1, std::placeholders::_2);// 设置客户端连接注册中心的地址_client = rpc::ClientFactory::create(ip, port);_client->setMessageCallback(message_cb);_client->connect();}// 向外提供的服务注册接口bool registryMethod(const std::string& method, const Address& host) {return _provider->registryMethod(_client->class DiscoveryClient {public:using ptr = std::shared_ptr<DiscoveryClient>;// 构造函数传入注册中心的地址信息,用于连接注册中心// DiscoveryClient 是// 服务发现客户端,用于从注册中心动态发现服务提供者的地址,并处理服务上下线通知// 连接注册中心:建立与注册中心的网络连接。// 服务发现请求:主动查询指定服务方法的可用地址。// 动态通知处理:监听服务上下线通知,更新本地缓存。// 离线回调机制:当服务下线时,触发用户自定义逻辑。DiscoveryClient(const std::string& ip, int port,const Discoverer::OfflineCallback& cb): _requestor(std::make_shared<Requestor>()),_discoverer(std::make_shared<client::Discoverer>(_requestor, cb)),_dispatcher(std::make_shared<Dispatcher>()) {// 1. 注册响应处理回调(处理服务发现响应)auto rsp_cb =std::bind(&Requestor::onResponse, _requestor.get(),std::placeholders::_1, std::placeholders::_2);_dispatcher->registerHandler<BaseMessage>(MType::RSP_SERVICE, rsp_cb);// 2. 注册服务请求处理回调(处理服务上下线通知)auto req_cb =std::bind(&client::Discoverer::onServiceRequest,_discoverer.get(), std::placeholders::_1,std::placeholders::_2);_dispatcher->registerHandler<ServiceRequest>(MType::REQ_SERVICE, req_cb);// 3. 设置消息总回调入口auto message_cb =std::bind(&Dispatcher::onMessage, _dispatcher.get(),std::placeholders::_1, std::placeholders::_2);_client = rpc::ClientFactory::create(ip, port);_client->setMessageCallback(message_cb);// 4. 连接注册中心_client->connect();}// 向外提供的服务发现接口bool serviceDiscovery(const std::string& method,Address& host) {return _discoverer->serviceDiscovery(_client->connection(),method, host);}private:Requestor::ptr_requestor; // 管理请求-响应生命周期,处理超时、重试,维护请求与响应的映射关系。client::Discoverer::ptr_discoverer; // 封装服务发现逻辑,维护本地服务地址缓存,处理上下线通知。Dispatcher::ptr_dispatcher; // 根据消息类型(MType)分发消息到对应处理器(如服务上下线通知)BaseClient::ptr_client; // 网络通信客户端,负责与注册中心建立连接、发送请求、接收响应};connection(), method, host);}private:Requestor::ptr_requestor; // 管理请求-响应生命周期,处理超时、重试,维护请求与响应的映射关系client::Provider::ptr _provider; // 封装服务注册协议,构造 SERVICE_REGISTRY// 请求,验证响应状态码Dispatcher::ptr_dispatcher; // 根据消息类型(MType)分发消息到对应处理器(如处理注册响应)BaseClient::ptr_client; // 网络通信客户端,负责与注册中心建立连接、发送请求、接收响应
};
3.2.3RpcClient(RPC调用客户端)
功能:根据配置(直连或服务发现)调用远程方法。
成员变量:
_enableDiscovery
:模式开关(直连/服务发现)。_caller
:执行 RPC 调用,封装请求和响应解析。_rpc_clients
:连接池(服务发现模式下动态管理)。
关键接口:
call(method, params, result)
:调用远程方法 method。
// 示例:调用远程方法
RpcClient client(true, "127.0.0.1", 8080); // 启用服务发现
Json::Value params, result;
params.append(1); params.append(1);
client.call("add", params, result); // 调用 "add(1,1)"
class RpcClient {
public:using ptr = std::shared_ptr<RpcClient>;// enableDiscovery --// 是否启用服务发现功能,也决定了传入的地址信息是注册中心的地址,还是服务提供者的地址RpcClient(bool enableDiscovery, const std::string& ip, int port): _enableDiscovery(enableDiscovery),_requestor(std::make_shared<Requestor>()),_dispatcher(std::make_shared<Dispatcher>()),_caller(std::make_shared<client::RpcCaller>(_requestor)) {// 1. 注册响应处理回调(处理服务发现响应)auto rsp_cb = std::bind(&Requestor::onResponse, _requestor.get(),std::placeholders::_1, std::placeholders::_2);_dispatcher->registerHandler<BaseMessage>(MType::RSP_RPC, rsp_cb);// 如果启用了服务发现,地址信息是注册中心的地址,是服务发现客户端需要连接的地址,则通过地址信息实例化discovery_client// 如果没有启用服务发现,则地址信息是服务提供者的地址,则直接实例化好// rpc_clientif (_enableDiscovery) {auto offline_cb =std::bind(&RpcClient::delclient, this, std::placeholders::_1);_discovery_client =std::make_shared<DiscoveryClient>(ip, port, offline_cb);} else {auto message_cb =std::bind(&Dispatcher::onMessage, _dispatcher.get(),std::placeholders::_1, std::placeholders::_2);_rpc_client = ClientFactory::create(ip, port);_rpc_client->setMessageCallback(message_cb);_rpc_client->connect();}}bool call(const std::string& method, const Json::Value& params,Json::Value& result) {// 获取服务提供者: 1. 服务发现 2. 固定服务提供者BaseClient::ptr client = getClient(method);if (client.get() == nullptr) {ELOG("没有找到相应的客户端信息");return false;}return _caller->call(client->connection(), method, params, result);}bool call(const std::string& method, const Json::Value& params,RpcCaller::JsonAsyncResponse& result) {BaseClient::ptr client = getClient(method);if (client.get() == nullptr) {return false;}// 3. 通过客户端连接,发送rpc请求return _caller->call(client->connection(), method, params, result);}bool call(const std::string& method, const Json::Value& params,const RpcCaller::JsonResponseCallback& cb) {BaseClient::ptr client = getClient(method);if (client.get() == nullptr) {return false;}// 3. 通过客户端连接,发送rpc请求return _caller->call(client->connection(), method, params, cb);}private:BaseClient::ptr newClient(const Address& host) {auto message_cb =std::bind(&Dispatcher::onMessage, _dispatcher.get(),std::placeholders::_1, std::placeholders::_2);auto client = ClientFactory::create(host.first, host.second);client->setMessageCallback(message_cb);client->connect();putClient(host, client);return client;}// 获取客户端信息 - 通过地址信息BaseClient::ptr getClient(const Address& host) {std::unique_lock<std::mutex> lock(_mutex);auto it = _rpc_clients.find(host);if (it == _rpc_clients.end()) {ELOG("没有找到相应的客户端信息");return BaseClient::ptr();}return it->second;}// 获取客户端信息 - 通过服务方法BaseClient::ptr getClient(const std::string& method) {BaseClient::ptr client;if (_enableDiscovery) {// 1. 通过服务器发现,获取服务提供者地址信息Address host;bool ret = _discovery_client->serviceDiscovery(method, host);if (ret == false) {ELOG("当前 %s 服务,没有找到服务提供者!", method.c_str());return BaseClient::ptr();}// 2. 通过地址信息,获取客户端信息client = getClient(host);if (client.get() == nullptr) {ELOG("没有找到一实例化的客户端信息,则创建");client = newClient(host);}} else {client = _rpc_client;}return client;}// 添加客户端信息void putClient(const Address& host, BaseClient::ptr& client) {std::unique_lock<std::mutex> lock(_mutex);_rpc_clients.insert(std::make_pair(host, client));}// 删除客户端信息void delclient(const Address& host) {std::unique_lock<std::mutex> lock(_mutex);_rpc_clients.erase(host);}private:struct AddressHash {size_t operator()(const Address& host) const {std::string addr = host.first + std::to_string(host.second);return std::hash<std::string>{}(addr);}};private:bool _enableDiscovery; // 模式开关:true 启用服务发现,false// 直连固定服务提供者Requestor::ptr _requestor; // 管理请求-响应生命周期,处理超时和重试。Dispatcher::ptr _dispatcher; // 分发消息到对应处理器(如 RPC 响应)RpcCaller::ptr _caller; // 执行 RPC 调用,封装请求发送和响应解析逻辑DiscoveryClient::ptr_discovery_client; // 服务发现客户端(仅在服务发现模式下有效)BaseClient::ptr _rpc_client; // 直连模式下的固定服务提供者连接。std::mutex _mutex;std::unordered_map<Address, BaseClient::ptr, AddressHash>_rpc_clients; // 用于服务发现的客户端连接池
};
3.2.4TopicClient(主题客户端)
功能:管理主题的创建、订阅、发布。
成员变量:
_topic_manager
:处理主题操作(如订阅回调)。_dispatcher
:分发主题相关消息。
关键接口:
subscribeTopic(key, callback)
:订阅主题 key 并绑定回调。publishTopic(key, msg)
:向主题 key 发布消息 msg。
// 示例:发布/订阅主题
TopicClient client("127.0.0.1", 9090);
client.subscribeTopic("news", [](const std::string& key, const std::string& msg) {std::cout << "收到消息: " << msg << std::endl;
});
client.publishTopic("news", "RPC框架发布!");
class TopicClient {
public:using ptr = std::shared_ptr<TopicClient>;TopicClient(const std::string& ip, int port): _requestor(std::make_shared<Requestor>()),_dispatcher(std::make_shared<Dispatcher>()),_topic_manager(std::make_shared<TopicManager>(_requestor)) {auto rsp_cb = std::bind(&Requestor::onResponse, _requestor.get(),std::placeholders::_1, std::placeholders::_2);_dispatcher->registerHandler<BaseMessage>(MType::RSP_TOPIC, rsp_cb);auto msg_cb =std::bind(&TopicManager::onPublishTopic, _topic_manager.get(),std::placeholders::_1, std::placeholders::_2);_dispatcher->registerHandler<TopicRequest>(MType::REQ_TOPIC, msg_cb);auto message_cb =std::bind(&Dispatcher::onMessage, _dispatcher.get(),std::placeholders::_1, std::placeholders::_2);_rpc_client = ClientFactory::create(ip, port);_rpc_client->setMessageCallback(message_cb);_rpc_client->connect();}bool createTopic(const std::string& key) {return _topic_manager->createTopic(_rpc_client->connection(), key);}bool removeTopic(const std::string& key) {return _topic_manager->removeTopic(_rpc_client->connection(), key);}bool subscribeTopic(const std::string& key,const TopicManager::SubCallback& cb) {return _topic_manager->subscribeTopic(_rpc_client->connection(), key,cb);}bool cancelTopic(const std::string& key) {return _topic_manager->cancelTopic(_rpc_client->connection(), key);}bool publishTopic(const std::string& key, const std::string& msg) {return _topic_manager->publishTopic(_rpc_client->connection(), key,msg);}void shutdown() { _rpc_client->shutdown(); }private:Requestor::ptr _requestor;TopicManager::ptr _topic_manager;Dispatcher::ptr _dispatcher;BaseClient::ptr _rpc_client;
};
3.3 协作逻辑与设计思路
3.3.1 消息分发(Dispatcher)
作用:根据消息类型(MType)路由到对应处理器。
注册逻辑:
// 注册处理函数(以 RpcClient 为例)
_dispatcher->registerHandler<BaseMessage>(MType::RSP_RPC, [](auto conn, auto msg) {// 处理 RPC 响应
});
3.3.2 请求处理(Requestor)
作用:统一管理请求发送、超时重试、响应映射。
流程:
- 发送请求时生成唯一ID,记录到等待队列。
- 接收响应时根据ID匹配请求,触发回调。
3.3.3 服务发现与连接池(RpcClient)
动态连接管理:
- 启用服务发现时,通过
DiscoveryClient
获取地址,并维护连接池_rpc_clients
。 - 直连模式时固定连接
_rpc_client
。
线程安全:使用 std::mutex 保护连接池操作。
3.3.4 主题管理(TopicManager)
回调机制:订阅时绑定回调函数到 _topic_callbacks
。
消息推送:服务端广播消息后,客户端通过 onPublishTopic
触发本地回调。
🏳️🌈四、主题发布订阅流程示意
截至目前,我们已经封装好了 rpc 项目中所有的内容,现在我们来模拟一下 客户端A发布一个主题消息,客户端B接收到这个主题消息 的全部流程
4.1 核心文件与类说明
4.2 步骤 1:客户端A创建主题
// 1. 创建 TopicManager
auto requestor = std::make_shared<Requestor>();
auto topic_mgr = std::make_shared<client::TopicManager>(requestor);// 2. 创建主题 "news"
BaseConnection::ptr conn = connect_to_server("127.0.0.1:8080");
topic_mgr->createTopic(conn, "news");
逻辑:
TopicManager::createTopic
发送TopicOptype::TOPIC_CREATE
请求到服务端。- 服务端
server::TopicManager::onTopicRequest
处理请求,创建全局主题"news"
。
4.3 步骤 2:客户端B订阅主题
// 1. 订阅主题 "news",定义回调函数
auto requestor = std::make_shared<Requestor>();
auto topic_mgr = std::make_shared<client::TopicManager>(requestor);BaseConnection::ptr conn = connect_to_server("127.0.0.1:8080");
topic_mgr->subscribeTopic(conn, "news", [](const std::string& key, const std::string& msg) {std::cout << "收到主题消息: " << msg << std::endl;
});
逻辑:
TopicManager::subscribeTopic
发送TopicOptype::TOPIC_SUBSCRIBE
请求到服务端。- 服务端记录客户端B为
"news"
的订阅者(存储到server::Topic::_subscribers
)。
4.4 步骤 3:客户端A发布消息
// 客户端A继续发布消息
topic_mgr->publishTopic(conn, "news", "Breaking: RPC框架发布!");
逻辑:
TopicManager::publishTopic
发送TopicOptype::TOPIC_PUBLISH
请求,携带消息内容。- 服务端
server::TopicManager::onTopicRequest
处理请求,广播消息给所有订阅者。
4.5 步骤 4:服务端广播消息
// 服务端代码(server/rpc_topic.hpp)
void server::TopicManager::topicPublish(...) {// 1. 查找主题 "news"auto topic = _topics["news"];// 2. 遍历所有订阅者(客户端B)for (auto& subscriber : topic->_subscribers) {// 3. 通过订阅者连接发送消息subscriber->_conn->send(msg);}
}
逻辑:
- 服务端通过
Topic::pushMessage
将消息推送到所有订阅者的连接(客户端B)。
4.6 步骤 5:客户端B接收消息
// 客户端B的 TopicManager 处理消息
void client::TopicManager::onPublishTopic(...) {// 1. 提取消息内容std::string topic_key = msg->topicKey(); // "news"std::string topic_msg = msg->topicMsg(); // "Breaking: RPC框架发布!"// 2. 触发订阅时定义的回调函数callback(topic_key, topic_msg);
}
输出
收到主题消息: Breaking: RPC框架发布!
4.7 关键类交互图
sequenceDiagramparticipant ClientA as 客户端Aparticipant Server as 服务端participant ClientB as 客户端BClientA->>Server: createTopic("news")Server-->>ClientA: 创建成功ClientB->>Server: subscribeTopic("news")Server-->>ClientB: 订阅成功ClientA->>Server: publishTopic("news", "Breaking: RPC框架发布!")Server->>ClientB: 推送消息ClientB->>ClientB: 触发回调函数
4.8 补充说明
- 线程安全:
客户端和服务端的TopicManager
使用std::mutex
保护共享数据(如订阅列表)。 - 错误处理:
若主题不存在,服务端返回RCode::RCODE_NOT_FOUND_TOPIC
,客户端记录错误日志。 - 网络通信:
所有请求通过Requestor::send
发送,底层使用BaseConnection
的 TCP 连接。
🏳️🌈五、运程调用add方法流程示意
为了方便各位更好地理解 rpc 框架运程调用的功能,这里模拟一下 客户端调用服务端注册好的add方法,来得到1 + 1的结果的整个流程
5.1 核心文件与类说明
5.2 步骤 1:服务端启动并注册 add 方法
// 文件: test_server.cpp
#include "rpc_server.hpp"
#include "rpc_registry.hpp"
#include "message.hpp"// 1. 定义服务端加法方法
int add(int a, int b) { return a + b; }int main() {// 2. 创建 RpcServer 并绑定端口RpcServer server("0.0.0.0:8080");// 3. 注册方法到 RpcRegistryserver.getRegistry()->registerMethod("add", &add);// 4. 启动服务端监听server.start();
}
逻辑:
- 服务端通过
RpcRegistry::registerMethod
将add
方法注册到全局注册表(rpc_registry.hpp
)。 - 注册时生成映射:“add” → 函数指针 &add。
5.3 步骤 2:客户端构造调用请求
// 文件: test_client.cpp
#include "rpc_caller.hpp"
#include "requestor.hpp"int main() {// 1. 创建 RpcCaller 工具RpcCaller caller;// 2. 生成调用 add(1,1) 的请求auto req = caller.prepareCall("add", 1, 1);// 3. 创建 Requestor 发送请求Requestor requestor;BaseConnection::ptr conn = connect_to_server("127.0.0.1:8080");auto rsp = requestor.send(conn, req);// 4. 解析响应结果if (rsp->rcode() == RCode::RCODE_OK) {int result = rsp->getResult<int>();std::cout << "1+1=" << result << std::endl; // 输出 1+1=2}
}
逻辑:
RpcCaller::prepareCall
(rpc_caller.hpp)生成MethodRequest
消息,包含方法名 “add” 和参数 [1,1]。Requestor::send
(requestor.hpp)将请求序列化并通过BaseConnection
发送到服务端。
5.4 步骤3:服务端接收请求并路由到 add 方法
// 文件: dispatcher.hpp
// Dispatcher 注册处理函数(服务端初始化时执行)
Dispatcher dispatcher;
dispatcher.registerHandler<MethodRequest>(MType::REQ_METHOD, [registry](const BaseConnection::ptr& conn, std::shared_ptr<MethodRequest>& req) {// 1. 从 RpcRegistry 查找方法 "add"auto method = registry->getMethod(req->method());// 2. 执行方法并获取结果int result = method->invoke<int>(req->params());// 3. 构造响应消息auto rsp = MessageFactory::create<MethodResponse>();rsp->setResult(result);rsp->setRcode(RCode::RCODE_OK);// 4. 发送响应conn->send(rsp);}
);
逻辑:
Dispatcher
(dispatcher.hpp)根据消息类型MType::REQ_METHOD
路由到注册的Lambda
处理函数。- 从
RpcRegistry
中查找方法 “add”,调用并返回结果。
5.5 步骤4:网络层数据传输
// 文件: net.hpp
// 服务端接收请求(BaseConnection::onMessage)
void BaseConnection::onMessage(const Buffer& buf) {// 1. 反序列化为 MethodRequestauto msg = deserialize<MethodRequest>(buf);// 2. 通过 Dispatcher 分发消息Dispatcher::instance().onMessage(shared_from_this(), msg);
}
逻辑:
- 服务端
BaseConnection
接收字节流,反序列化为MethodRequest
对象。 - 调用
Dispatcher::onMessage
触发多路转接。
5.6 关键类交互图
sequenceDiagramparticipant Client as 客户端participant Requestor as Requestor (requestor.hpp)participant Dispatcher as Dispatcher (dispatcher.hpp)participant Registry as RpcRegistry (rpc_registry.hpp)participant Server as RpcServer (rpc_server.hpp)Client->>Requestor: 调用 add(1,1)Requestor->>Client: 生成 MethodRequestClient->>Server: 发送 MethodRequest (TCP)Server->>Dispatcher: 接收消息并路由Dispatcher->>Registry: 查找方法 "add"Registry->>Dispatcher: 返回函数指针Dispatcher->>Server: 调用 add(1,1)Server->>Client: 返回 MethodResponse (TCP)Client->>Client: 解析结果为 2
5.7 补充说明
- 客户端通过
RpcCaller
生成MethodRequest
,通过Requestor
发送请求。 - 服务端通过
Dispatcher
多路转接,将请求路由到RpcRegistry
中注册的 add 方法。 - 网络层通过
BaseConnection
实现 TCP 通信,Dispatcher
确保消息正确路由。 - 结果返回通过
MethodResponse
序列化回客户端,完成一次完整的 RPC 调用。
👥总结
本篇博文对 【从零实现Json-Rpc框架】- 项目实现 - 客户端注册主题及整合 做了一个较为详细的介绍,不知道对你有没有帮助呢
觉得博主写得还不错的三连支持下吧!会继续努力的~
相关文章:
【从零实现Json-Rpc框架】- 项目实现 - 客户端注册主题整合 及 rpc流程示意
📢博客主页:https://blog.csdn.net/2301_779549673 📢博客仓库:https://gitee.com/JohnKingW/linux_test/tree/master/lesson 📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正! &…...
AWS 云运维管理指南
一、总体目标 高可用性:通过跨可用区 (AZ) 和跨区域 (Region) 的架构设计,确保系统运行可靠。性能优化:优化AWS资源使用,提升应用性能。安全合规:利用AWS内置安全服务,满足行业合规要求(如GDPR、ISO 27001、等保2.0)。成本管控:通过成本优化工具,减少浪费,实现FinOp…...
vector的实现:
我们之前讲了vector的接口,我们今天来看一下vector的底层的实现: 在gitee上面我们的这个已经实现好了,我们看gitee就可以:vector的实现/vector的实现/vector的实现.h 拾亿天歌/拾亿天歌 - 码云 - 开源中国 我们在这强调比较难的…...
flutter 专题 九十六 Flutter开发之常用Widgets
上一篇,我们介绍了基础Widgets,接下来,我们看一下Flutter开发中一些比较常见的Widget。 Flutter Widget采用现代响应式框架构建,这是从 React 中获得的灵感,中心思想是用widget构建你的UI。 Widget描述了他们的视图在…...
Linux环境下内存错误问题排查与修复
最近这几天服务器总是掉线,要查一下服务器的问题。可以首先查看一下计算机硬件,这是一台某鱼上拼凑的服务器: sudo lshw -shortH/W path Device Class Description system NF5270M3 (To be filled by O…...
flutter 专题 六十八 Flutter 多图片上传
使用Flutter进行应用开发时,经常会遇到选图、拍照等需求。如果要求不高,Flutter图库选择可以使用官方提供的image_picker,如果需要多选,那么可以使用multi_image_picker插件库。multi_image_picker库支持图库管理,多选…...
与总社团联合会合作啦
2025.4.2日,我社团向总社团联合会与暮光社团发起合作研究“浔川代码编辑器v2.0”。至3日,我社团收到回复: 总社团联合会: 总社团联合会已收到浔川社团官方联合会的申请,经考虑,我们同意与浔川社团官方联合…...
技巧:使用 ssh 设置隧道代理访问 github
问题 由于不可知的原因,在国内服务器不能访问 Github。但是有clone代码需求,这里介绍一种可行的方法。 解决办法 使用 ssh 设置代理,让代理服务器请求 github 解决。 第一步 ssh -fND 1080 用户名代理服务器IP这里的意思是监听 1080 端口…...
安装 TabbyAPI+Exllamav2 和 vLLM 的详细步骤
在 5090 显卡上成功安装 TabbyAPIExllamav2 和 vLLM 并非易事,经过一番摸索,我总结了以下详细步骤,希望能帮助大家少走弯路。 重要提示: 用户提供的 PyTorch 安装使用了 cu128,这并非标准 CUDA 版本。请根据你的系统实…...
Linux 进程信号
目录 信号 生活角度的信号 技术应用角度的信号 signal函数 信号概念 用kill -l命令可以察看系统定义的信号列表 信号处理常见方式概览 信号的产生 通过键盘组合键发送信号 通过系统函数向进程发信号 由软件条件产生信号 由硬件异常产生信号 信号的保存 阻塞信号 …...
【学习篇】fastapi接口定义学习
fastapi学习链接:用户指南 1. 路径参数 访问fastapi接口的默认http路径为http://127.0.0.1:8000,/items为定义的接口函数read_item的路径,/{item_id}这个用大括号括起来的参数就是路径参数,接口函数可以通过引用这个路径参数名称…...
第十二步:react
React 1、安装 1、脚手架 npm i -g create-react-app:安装react官方脚手架create-react-app 项目名称:初始化项目 2、包简介 react:react框架的核心react-dom:react视图渲染核心react-native:构建和渲染App的核心…...
MySQL简介
MySQL 是由瑞典 MySQL AB 公司开发的一款开源关系型数据库管理系统(RDBMS),现归属 Oracle 公司。以下是其核心特点及简介: 1. 基础特性 - 开源免费:遵循 GPL 协议,个人及中小型企业可免费使用,…...
AIGC时代Kubernetes企业级云原生运维实战:智能重构与深度实践指南
文章目录 一、AIGC技术栈与Kubernetes的深度融合1. 智能配置生成:从YAML到自然语言2. 动态资源优化:AI驱动的弹性伸缩 二、智能运维体系架构深度解析四维能力矩阵增强实现:关键组件升级代码示例: 三、企业级实战策略深度实践策略1…...
市场波动与交易策略优化
市场波动与交易策略优化 在交易市场中,价格波动是常态。如何有效应对市场的波动,制定合理的交易策略,成为许多交易者关注的重点。本文将探讨市场波动的影响因素,并介绍应对不同市场波动环境的策略。 一、市场波动的影响因素 市场供…...
Prolog语言的移动UI设计
Prolog语言的移动UI设计 随着移动设备的普及,用户界面的设计已成为软件开发的重要组成部分。移动UI设计不仅要注重美观,更要关注用户体验和功能的实现。使用Prolog语言进行移动UI设计,虽然相对少见,但其逻辑编程的特性为复杂的交…...
linux 命令 awk
awk 是 Linux/Unix 系统中一个强大的文本处理工具,尤其擅长处理结构化文本数据(如日志、表格数据)。它不仅是命令行工具,还是一种脚本语言,支持变量、条件、循环等编程特性 1. 基本语法 awk [选项] 模式 {动作} 文件名…...
在 PyQt 加载 UI 三种方法
目录 方法一:使用 uic 模块动态加载 (不推荐用于大型项目) 方法二:将 UI 文件编译为 Python 模块后导入 方法3:使用uic模块直接在代码中加载UI文件 注意事项 总结: 在PyQt中,加载UI文件通常…...
前端快速入门学习2-HTML
一、概述 HTML全称是Hypertext Markup Language(超文本标记语言) HTML通过一系列的 标签(也称为元素) 来定义文本、图像、链接等等。HTML标签是由尖括号包围的关键字。 标签通常成对出现,包括开始标签和结束标签(也称为双标签),内容位于这两个标签之间…...
Cortex-M系列MCU的位带操作
Cortex-M系列位带操作详解 位带(Bit-Banding)是Cortex-M3/M4等处理器提供的一种硬件特性,允许通过别名地址对内存或外设寄存器中的单个位进行原子读-改-写操作,无需禁用中断或使用互斥锁。以下是位带操作的完整指南: …...
【嵌入式-stm32电位器控制LED亮灭以及编码器控制LED亮灭】
嵌入式-stm32电位器控制LED亮暗 任务代码Key.cKey.hmain.c 实验现象 任务 本文主要介绍利用stm32f103C8T6实现电位器控制PWM的占空比大小来改变LED亮暗程度,按键实现使用定时器非阻塞式,其中一个按键切换3个LED的控制状态,另一个按键是重置当…...
抖音热点视频识别与分片处理机制解析
抖音作为日活数亿的短视频平台,其热点视频识别和分片处理机制是支撑高并发访问的核心技术。以下是抖音热点视频识别与分片的实现方案: 热点视频识别机制 1. 实时行为监控系统 用户行为聚合:监控点赞、评论、分享、完播率等指标的异常增长曲线内容特征分析:通过AI识别视频…...
添加购物车功能
业务需求: 用户提交三个字段,服务端根据提交的字段判断是菜品还是套餐,根据菜品或者套餐添加购物车表中。 代码实现 RestController Slf4j RequestMapping("/user/shoppingCart") public class ShoppingCartController {Autowired…...
蓝桥杯备赛 Day16 单调数据结构
单调栈和单调队列能够动态的维护,还需用1-2两个数组在循环时从单调栈和单调队列中记录答案 单调栈 要点 1.时刻保持内部元素具有单调性质的栈(先进后出),核心是:入栈时逐个删除所有"更差的点",一般可分为单调递减栈、单调递增栈、单调不减栈、单调不增…...
AI Agent开发大全第十九课-神经网络入门 (Tensorflow)
(前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家。点击跳转到网站)。 一、从买房困惑到神经元:神经网络的灵感来源 1.1 房地产经纪人的定价难题 想象一个周末的房产中介门店,经纪人小李正面对10份不同房源的报...
Mac VM 卸载 win10 安装win7系统
卸载 找到相应直接删除(移动到废纸篓) 可参考:mac如何卸载虚拟机win 下载 win7下载地址...
torch.nn中的非线性激活使用
1、神经网络中的非线性激活 在神经网络中,**非线性激活函数(Non-linear Activation Functions)**是引入非线性变换的关键组件,使神经网络能够学习并建模复杂的非线性关系。如果没有激活函数,无论神经网络有多少层&…...
【安全】Web渗透测试(全流程)_渗透测试学习流程图
1 信息收集 1.1 域名、IP、端口 域名信息查询:信息可用于后续渗透 IP信息查询:确认域名对应IP,确认IP是否真实,确认通信是否正常 端口信息查询:NMap扫描,确认开放端口 发现:一共开放两个端口&…...
要素的选择与转出
1.要素选择的三种方式 当要在已有的数据中选择部分要素时,ArcMap提供了三种方式:按属性选择、位置选择及按图形选择。 1)按属性选择 通过设置 SQL查询表达式,用来选择与选择条件匹配的要素。 (1)单击主菜单下【选择】【按属性选择】,打开【按…...
C 语言命令行参数:让程序交互更灵活
一、引言 在 C 语言编程领域,命令行参数是一种极为实用的机制,它允许我们在执行程序时,从外部向程序传递数据。这一特性极大地增强了程序的灵活性和可控性,避免了在代码中对数据进行硬编码。比如在开发系统工具、脚本程序时&…...
部署nerdctl工具
nerdctl 是一个专为Containerd设计的容器管理命令行工具,旨在提供类似 Docker 的用户体验,同时支持 Containerd 的高级特性(如命名空间、compose等)。 1、下载安装 wget https://github.com/containerd/nerdctl/releases/downlo…...
SOA 架构
定义与概念:SOA 将应用程序的不同功能单元(称为服务)进行封装,并通过定义良好的接口和协议来实现这些服务之间的通信和交互。这些服务可以在不同的平台和编程语言中实现,彼此之间相互独立,能够以松散耦合的…...
K8s私有仓库拉取镜像报错解决:x509 certificate signed by unknown authority
前言 在Kubernetes环境中使用自签名证书的私有Harbor镜像仓库时,常会遇到证书验证失败的问题。本文将详细讲解如何解决这个常见的证书问题。 环境信息: Kubernetes版本:1.28.2容器运行时:containerd 1.6.20私有仓库:…...
在线考试系统带万字文档java项目java课程设计java毕业设计springboot项目
文章目录 在线考试系统一、项目演示二、项目介绍三、万字项目文档四、部分功能截图五、部分代码展示六、底部获取项目源码带万字文档(9.9¥带走) 在线考试系统 一、项目演示 在线考试系统 二、项目介绍 1、管理员角色: 考试管理&…...
Axure RP 9 详细图文安装流程(附安装包)教程包含下载、安装、汉化、授权
文章目录 前言一、Axure RP 9介绍二、Axure RP 9 安装流程1. Axure RP 9 下载2. 启动安装程序3. 安装向导操作4.完成安装 三、Axure RP 9 汉化四、Axure RP 9授权 前言 本基础安装流程教程,将以清晰、详尽且易于遵循的步骤介绍Axure RP 9 详细图文安装流程…...
动态规划练习题④
583. 两个字符串的删除操作 给定两个单词 word1 和 word2 ,返回使得 word1 和 word2 相同所需的最小步数。 每步 可以删除任意一个字符串中的一个字符。 示例 1: 输入: word1 "sea", word2 "eat" 输出: 2 解释: 第一步将 &quo…...
多输入多输出 | Matlab实现BO-GRU贝叶斯优化门控循环单元多输入多输出预测
多输入多输出 | Matlab实现BO-GRU贝叶斯优化门控循环单元多输入多输出预测 目录 多输入多输出 | Matlab实现BO-GRU贝叶斯优化门控循环单元多输入多输出预测预测效果基本介绍程序设计参考资料 预测效果 基本介绍 Matlab实现BO-GRU贝叶斯优化门控循环单元多输入多输出预测&#…...
爬虫工程师的社会现状
现在网上你只要搜索教程就是韦世东;k哥爬虫教你爬虫方面的逆向知识;然后看着这些逆向js百例;搞得我很尴尬我做了这么多年的爬虫工程师;现在算什么;这些逆向的东西我并没有很深层次的了解;但是工作的内容也依旧解决了;并没有到爬虫工程师非要会那么多逆向才能算的上是合格的爬虫…...
Flink 1.20 Kafka Connector:新旧 API 深度解析与迁移指南
Flink Kafka Connector 新旧 API 深度解析与迁移指南 一、Flink Kafka Connector 演进背景 Apache Flink 作为实时计算领域的标杆框架,其 Kafka 连接器的迭代始终围绕性能优化、语义增强和API 统一展开。Flink 1.20 版本将彻底弃用基于 FlinkKafkaConsumer/FlinkK…...
Vue2 父子组件数据传递与调用:从 ref 到 $emit
提示:https://github.com/jeecgboot/jeecgboot-vue2 文章目录 案例父组件向子组件传递数据的方式父组件调用子组件方法的方式子组件向父组件传递数据的方式流程示意图 案例 提示:以下是本篇文章正文内容,下面案例可供参考 以下是 整合后的关…...
【matplotlib参数调整】
1. 基本绘图函数常用参数 折线图 import matplotlib.pyplot as plt import numpy as npx np.linspace(0, 10, 100) y np.sin(x)plt.plot(x, y, colorred, linestyle--, linewidth2,markero, markersize5, labelsin(x), alpha0.8) plt.title(折线图示例) plt.xlabel(X 轴) p…...
如何使用 IntelliJ IDEA 开发命令行程序(或 Swing 程序)并手动管理依赖(不使用 pom.xml)
以下是详细步骤: 1. 创建项目 1.1 打开 IntelliJ IDEA。 1.2 在启动界面,点击 Create New Project(创建新项目)。 1.3 选择 Java,然后点击 Next。 1.4 确保 Project SDK 选择了正确的 JDK 版本&#x…...
Linux红帽:RHCSA认证知识讲解(十 一)配置NTP 时间同步、用户密码策略与使用 autofs 实现 NFS 自动挂载
Linux红帽:RHCSA认证知识讲解(十 一)配置NTP 时间同步、用户密码策略与 NFS 自动挂载 前言一、配置 NTP 时间同步1.1 NTP 简介1.2 安装和配置 NTP 客户端 二、配置新建用户密码过期时间2.1 查看用户密码过期时间2.2 修改密码过期时间 三、使用…...
ffmpeg音视频处理流程
文章目录 FFmpeg 音视频处理流程详细讲解总结音视频处理流程相关的 FFmpeg 工具和命令 FFmpeg 的音视频处理流程涵盖了从输入文件读取数据、编码和解码操作、数据处理、以及最终输出数据的完整过程。为了更好地理解这一流程,我们可以从以下几个关键步骤来分析&#…...
leetcode-代码随想录-链表-移除链表元素
题目 链接:203. 移除链表元素 - 力扣(LeetCode) 给你一个链表的头节点 head 和一个整数 val ,请你删除链表中所有满足 Node.val val 的节点,并返回 新的头节点 。 输入:head [1,2,6,3,4,5,6], val 6 …...
c++与rust的语言区别,rust的主要难点,并举一些例子
C 和 Rust 都是系统级编程语言,它们在设计目标、语法、内存管理等方面存在诸多区别,以下为你详细介绍: 设计目标 C:C 最初是为了给 C 语言添加面向对象编程特性而设计的,之后不断发展,旨在提供高性能、灵…...
从基础算力协作到超智融合,超算互联网助力大语言模型研习
一、背景 大语言模型(LLMs)的快速发展释放出了AI应用领域的巨大潜力。同时,大语言模型作为 AI领域的新兴且关键的技术进展,为 AI 带来了全新的发展方向和应用场景,给 AI 注入了新潜力,这体现在大语言模型独…...
【spring cloud Netflix】Eureka注册中心
1.概念 Eureka就好比是滴滴,负责管理、记录服务提供者的信息。服务调用者无需自己寻找服务,而是把自己的 需求告诉Eureka,然后Eureka会把符合你需求的服务告诉你。同时,服务提供方与Eureka之间通过“心跳” 机制进行监控…...
记录学习的第二十天
今天只做了一道题,有点不在状态。 这道题其实跟昨天的每日一题是差不多的,不过这道题需要进行优化。 根据i小于j,且j小于k,当nums[j]确定时,保证另外两个最大即可得答案。 所以可以使用前缀最大值和后缀最大值。 代…...
7-5 表格输出
作者 乔林 单位 清华大学 本题要求编写程序,按照规定格式输出表格。 输入格式: 本题目没有输入。 输出格式: 要求严格按照给出的格式输出下列表格: ------------------------------------ Province Area(km2) Pop.(…...