AimRT从入门到精通 - 04RPC客户端和服务器
一、ROS中的service通信机制
服务通信也是ROS中一种极其常用的通信模式,服务通信是基于请求响应模式的,是一种应答机制。也即:一个节点A向另一个节点B发送请求,B接收处理请求并产生响应结果返回给A。比如如下场景:
机器人巡逻过程中,控制系统分析传感器数据发现可疑物体或人... 此时需要拍摄照片并留存。
在上述场景中,就使用到了服务通信。
- 数据分析节点A需要向相机相关节点B发送图片存储请求,节点B处理请求,并返回处理结果。
与上述应用类似的,服务通信更适用于对实时性有要求、具有一定逻辑处理的应用场景。
概念
服务通信是以请求响应的方式实现不同节点之间数据传输的通信模式。发送请求数据的对象称为客户端,接收请求并发送响应的对象称之为服务端,同话题通信一样,客户端和服务端也通过话题相关联,不同的是服务通信的数据传输是双向交互式的。
服务通信中,服务端与客户端是一对多的关系,也即,同一服务话题下,存在多个客户端,每个客户端都可以向服务端发送请求。
二、 AimRT 中的RPC通讯机制
RPC也叫远程过程调用,基于请求-回复模型,由客户端Client和服务端Server组成,Module可以创建客户端句柄,发起特定的 RPC 请求,由其指定的、或由框架根据一定规则指定的服务端来接收请求并回复。其与ROS中的service通信中的服务器和客户端类似;
与Channel类似,其在接口层和后端层也实现了解耦,分为两部分;
在接口层主要使用Protocol支持的协议进行通信,例如protobuf / ROS2msg/srv;
在后端支持的协议有http、ros2,开发者可以自己在配置文件中设置;
关于接口层使用的protocol,详细可以看上一篇的介绍;
这里就不再过多介绍;
三、 基于Protobuf消息的客户端和服务器
这里关于protobuf的相关简单使用,大家可以参考我之前写的博客:
Protubuf入门 --- 01基本语法与编译使用-CSDN博客
接下来我们演示一个简单的案例:
编写.proto文件
common.proto
syntax = "proto3";package aimrt.protocols.example;message ExampleFoo {int32 code = 1;string data = 2;
}message ExampleBar {int32 num = 1;
}
rpc.proto
syntax = "proto3";import "common.proto";package aimrt.protocols.example;message GetFooDataReq {string msg = 1;
}message GetFooDataRsp {uint64 code = 1;string msg = 2;aimrt.protocols.example.ExampleFoo data = 3;
}service ExampleService {rpc GetFooData(GetFooDataReq) returns (GetFooDataRsp);
}
其中:
- rpc.proto:包含了公共的common.proto,所以这里我们improt;
- package:这里会在对应的C++代码中,变成命名空间,如:aimrt::protocols::example;
- GetFooDataReq:这里定义了一个消息的请求结构(客户端发送的数据类型);
- GetFooDataRsp:这里定义了一个消息的回复的结构(服务器回复的数据类型);
- 接下来定义了一个服务方法:这里通过rpc架构,接收GetFooDataReq,回复GetFooDataRsp
.proto文件的编译(通信通信的框架下)
对于正常情况下.proto文件的编译,大家可以参考我的之前写的博客;
这里AimRT对于protobuf的编译提供了下面的编译指令,即我们可以在CMakeLists.txt进行相关文件的配置,需要注意的是,在实际使用时,需要先将其生成对应的C++代码,然后再生成C++对应的服务代码:
add_ros2_aimrt_rpc_gencode_target_for_one_file:为单个 srv 文件生成 RPC 服务 C++ 代码,参数如下:
-
TARGET_NAME:生成的 CMake Target 名称;
-
PACKAGE_NAME:ROS2 协议 PKG 的名称;
-
PROTO_FILE:协议文件的路径;
-
GENCODE_PATH:生成的桩代码存放路径;
-
DEP_PROTO_TARGETS:依赖的协议 CMake Target;
-
OPTIONS:传递给工具的其他参数;
例如下面所示的代码例子:
# 添加对 pb 消息的代码生成
add_protobuf_gencode_target_for_proto_path(TARGET_NAME ${PROTO_NAME}_pb_gencodePROTO_PATH ${CMAKE_CURRENT_SOURCE_DIR}GENCODE_PATH ${CMAKE_CURRENT_BINARY_DIR})# 添加对 RPC pb的代码生成
add_protobuf_aimrt_rpc_gencode_target_for_proto_files(TARGET_NAME ${PROTO_NAME}_aimrt_rpc_gencodePROTO_FILES ${CMAKE_CURRENT_SOURCE_DIR}/my_rpc.protoGENCODE_PATH ${CMAKE_CURRENT_BINARY_DIR}DEP_PROTO_TARGETS ${PROTO_NAME}_pb_gencode
)
- 我们使用前面用到的add_protobuf_gencode_target_for_proto_path宏,将当前目录中的所有普通的 proto 消息进行代码生成,生成的目标名为:${PROTO_NAME}_pb_gencode;
- 接着我们使用add_protobuf_aimrt_rpc_gencode_target_for_proto_files宏,将指定的 RPC proto消息进行生成,生成的target为${PROTO_NAME}_aimrt_rpc_gencode;
- 在PROTO_FILES参数指明当前包含rpc的proto的文件名;
- GENCODE_PATH指明当前生成代码的路径,其中 AimRT 会自动将这个路径添加到库的头文件搜索路径;
- 因为我们在RPC PB 中使用到了common.proto,所以需要给其添加普通pb消息的依DEP_PROTO_TARGETS ${PROTO_NAME}_pb_gencode;
服务器模块
其中,服务器module包含下面三个模块,这里我们依次对其进行讲解:
logger.h/logger.cc
#pragma once#include "aimrt_module_cpp_interface/logger/logger.h"namespace aimrt::examples::cpp::pb_rpc::normal_rpc_sync_server_module {void SetLogger(aimrt::logger::LoggerRef);
aimrt::logger::LoggerRef GetLogger();}
这里logger.h主要为服务器提供日志的设置,一共提供了两个函数:
- 通过SetLogger: 这里我们传入对应的日志句柄,可以将其内部的管理的模块的日志句柄进行赋值;
- 通过GetLogger:这里我们可以获取到我们之前设置的日志句柄;
service.h
#pragma once#include "rpc.aimrt_rpc.pb.h"namespace aimrt::examples::cpp::pb_rpc::normal_rpc_sync_server_module {class ExampleServiceSyncServiceImpl : public aimrt::protocols::example::ExampleServiceSyncService {public:ExampleServiceSyncServiceImpl() = default;~ExampleServiceSyncServiceImpl() override = default;aimrt::rpc::Status GetFooData(aimrt::rpc::ContextRef ctx,const ::aimrt::protocols::example::GetFooDataReq& req,::aimrt::protocols::example::GetFooDataRsp& rsp) override;}
这里我们需要注意的是:
AimRT 将每个 Service 的桩代码封装为一个基类,在程序中,我们只需要继承该基类,并重写基类中的 RPC 方法即可。例如在 pb 中,定义的 RPC 服务名为MyService
,因此 AimRT 为我们生成以下桩代码类:
- 异步服务端桩代码类名为: [服务名]CoServic;
- 同步服务端桩代码类名为:[服务名]SyncService;
例如上面的,这里在看我们定义的.proto文件:
这里我们声明了下面的命名空间:
package aimrt.protocols.example;
所以生成的C++源文件中会生成对应的命名空间:aimrt::protocols::example这个命名空间!
除此之外,主要有的是我们定义了下面这个service:
service ExampleService {rpc GetFooData(GetFooDataReq) returns (GetFooDataRsp);
所以对应生成的服务会在 aimrt::protocols::example:ExampleService;
而AimRT会基于aimrt::protocols::example:ExampleService开发对应的异步/同步接口!
而这个异步/同步接口都是继承ServiceBase的!
用户如果要使用异步/同步的服务,只需要对其进行继承重写即可!
如上面所示,这里我们可以看到对其是基于继承基类模块的;
class ServiceBase {public:std::string_view RpcType() const;void SetServiceName(std::string_view service_name);std::string_view ServiceName() const;// ...
};class XXXService : public aimrt::rpc::ServiceBase {// ...
}
详细的大家可以参考下面的文档:
Rpc — AimRT v0.10.0 documentation
因此再总结一下同步型服务接口的使用:
- 引用桩代码头文件,例如
xxx.aimrt_rpc.pb.h
或者xxx.aimrt_rpc.srv.h
,其中有同步接口的 Service 基类XXXSyncService;
- 开发者实现一个 Impl 类,继承
XXXSyncService
,并实现其中的虚接口; - 解析 Req,并填充 Rsp;
- 返回
Status
; - 在
Initialize
阶段调用RpcHandleRef
的RegisterService
方法注册 RPC Service;
这是一个固定的模板!大家可以参考;
service.cc
#include "normal_rpc_sync_server_module/service.h"
#include "aimrt_module_protobuf_interface/util/protobuf_tools.h"
#include "normal_rpc_sync_server_module/global.h"namespace aimrt::examples::cpp::pb_rpc::normal_rpc_sync_server_module {aimrt::rpc::Status ExampleServiceSyncServiceImpl::GetFooData(aimrt::rpc::ContextRef ctx,const ::aimrt::protocols::example::GetFooDataReq& req,::aimrt::protocols::example::GetFooDataRsp& rsp) {rsp.set_msg("echo " + req.msg());AIMRT_INFO("Server handle new rpc call. context: {}, req: {}, return rsp: {}",ctx.ToString(), aimrt::Pb2CompactJson(req), aimrt::Pb2CompactJson(rsp));return aimrt::rpc::Status();
}}
实际上这里定义的这个函数就是服务端用来处理数据的回调函数!
这个函数包含3个参数,其中:
- ctx:表示向RPC的后端传递一些特定的配置信息;
- req:表示发送的proto类型的数据;
- rsp:表示回复相应的数据;
这里函数体内实际上就是对回复数据做处理:前面+上打印"echo";
返回值是rpc对应的状态(如果这里我们设置的有错误码,这里返回可以获取到对应的错误码,从而查询获取到对应的错误类型;)
service_module.h
#pragma once#include <memory>#include "aimrt_module_cpp_interface/module_base.h"
#include "normal_rpc_sync_server_module/service.h"namespace aimrt::examples::cpp::pb_rpc::normal_rpc_sync_server_module {class NormalRpcSyncServerModule : public aimrt::ModuleBase {public:NormalRpcSyncServerModule() = default;~NormalRpcSyncServerModule() override = default;ModuleInfo Info() const override {return ModuleInfo{.name = "NormalRpcSyncServerModule"};}bool Initialize(aimrt::CoreRef core) override;bool Start() override;void Shutdown() override;private:aimrt::CoreRef core_;std::shared_ptr<ExampleServiceSyncServiceImpl> service_ptr_;std::string service_name_;
};}
整体框架和之前的一致,这里采用共享指针的方式声明一个服务端;
service_module.cc
这里我们只对主模块进行讲解:
#include "normal_rpc_sync_server_module/normal_rpc_sync_server_module.h"
#include "normal_rpc_sync_server_module/global.h"#include "yaml-cpp/yaml.h"namespace aimrt::examples::cpp::pb_rpc::normal_rpc_sync_server_module {bool NormalRpcSyncServerModule::Initialize(aimrt::CoreRef core) {core_ = core;SetLogger(core_.GetLogger());try {// Read cfgstd::string file_path = std::string(core_.GetConfigurator().GetConfigFilePath());if (!file_path.empty()) {YAML::Node cfg_node = YAML::LoadFile(file_path);if (cfg_node["service_name"]) {service_name_ = cfg_node["service_name"].as<std::string>();}}// Create serviceservice_ptr_ = std::make_shared<ExampleServiceSyncServiceImpl>();// Register servicebool ret = false;if (service_name_.empty()) {ret = core_.GetRpcHandle().RegisterService(service_ptr_.get());} else {ret = core_.GetRpcHandle().RegisterService(service_name_, service_ptr_.get());}AIMRT_CHECK_ERROR_THROW(ret, "Register service failed.");AIMRT_INFO("Register service succeeded.");} catch (const std::exception& e) {AIMRT_ERROR("Init failed, {}", e.what());return false;}AIMRT_INFO("Init succeeded.");return true;
}bool NormalRpcSyncServerModule::Start() { return true; }void NormalRpcSyncServerModule::Shutdown() {}}
上面的代码以同步通信为例子:
在初始化部分:
- 首先通过SetLogger设置全局的日志;
- 然后读取配置文件;
- 通过共享指针管理服务器模块;
- 接下来注册服务器的接口;
// Register servicebool ret = false;if (service_name_.empty()) {ret = core_.GetRpcHandle().RegisterService(service_ptr_.get());} else {ret = core_.GetRpcHandle().RegisterService(service_name_, service_ptr_.get());}
这里注册服务器的逻辑是:
- 如果我们没有服务器的名字,那么此时就按照默认的名称进行注册;
- 如果我们设置了对应的名字,此时就按照自定义的名字进行注册;
接下来分别开启开始模块和结束模块;
客户端模块
client_module.h
#pragma once#include <atomic>
#include <future>
#include <memory>#include "aimrt_module_cpp_interface/module_base.h"#include "rpc.aimrt_rpc.pb.h"namespace aimrt::examples::cpp::pb_rpc::normal_rpc_sync_client_module {class NormalRpcSyncClientModule : public aimrt::ModuleBase {public:NormalRpcSyncClientModule() = default;~NormalRpcSyncClientModule() override = default;ModuleInfo Info() const override {return ModuleInfo{.name = "NormalRpcSyncClientModule"};}bool Initialize(aimrt::CoreRef core) override;bool Start() override;void Shutdown() override;private:auto GetLogger() { return core_.GetLogger(); }void MainLoop();private:aimrt::CoreRef core_;aimrt::executor::ExecutorRef executor_;std::atomic_bool run_flag_ = false;std::promise<void> stop_sig_;double rpc_frq_ = 1.0;std::string service_name_;std::shared_ptr<aimrt::protocols::example::ExampleServiceSyncProxy> proxy_;
};}
这里的客户端模块的框架实际上与发布者publisher的框架一样;
其实这里的service的主要原因是channel与rpc的通信机制不一样,因为channel收到消息后不用再返回给客户端,而service这里收到消息后需要对消息做处理,然后再返回给client,所以就会麻烦很多,因此发送消息后客户端是异步还是同步等?考虑的问题更多,就显得更加复杂;
接下来我们依次对上面的代码模块进行分析:
- 主要的模块(初始化、开始和结束)与我们之前进行的模块一样;
- 除此之外,还有一个executor,这里是用于发送者将任务投递到执行器,然后执行器可以发送对应的任务;
- run_flag:用来标识模块是否在运行;
- stop_sig_n:当发送任务循环结束时,此时会给发送对应的信号给关闭模块,shutdown收到后此时就对模块进行关闭;
- 除此之外,这里还定义了话题名、发布者和发布的频率;
client_module.cc
#include "normal_rpc_sync_client_module/normal_rpc_sync_client_module.h"
#include "aimrt_module_protobuf_interface/util/protobuf_tools.h"#include "yaml-cpp/yaml.h"namespace aimrt::examples::cpp::pb_rpc::normal_rpc_sync_client_module {bool NormalRpcSyncClientModule::Initialize(aimrt::CoreRef core) {core_ = core;try {// Read cfgstd::string file_path = std::string(core_.GetConfigurator().GetConfigFilePath());if (!file_path.empty()) {YAML::Node cfg_node = YAML::LoadFile(file_path);rpc_frq_ = cfg_node["rpc_frq"].as<double>();if (cfg_node["service_name"]) {service_name_ = cfg_node["service_name"].as<std::string>();}}// Get executor handleexecutor_ = core_.GetExecutorManager().GetExecutor("work_thread_pool");AIMRT_CHECK_ERROR_THROW(executor_, "Get executor 'work_thread_pool' failed.");// Get rpc handleauto rpc_handle = core_.GetRpcHandle();AIMRT_CHECK_ERROR_THROW(rpc_handle, "Get rpc handle failed.");// Register rpc clientbool ret = false;if (service_name_.empty()) {ret = aimrt::protocols::example::RegisterExampleServiceClientFunc(rpc_handle);} else {ret = aimrt::protocols::example::RegisterExampleServiceClientFunc(rpc_handle, service_name_);}AIMRT_CHECK_ERROR_THROW(ret, "Register client failed.");// Create rpc proxyproxy_ = std::make_shared<aimrt::protocols::example::ExampleServiceSyncProxy>(rpc_handle);if (!service_name_.empty()) {proxy_->SetServiceName(service_name_);}} catch (const std::exception& e) {AIMRT_ERROR("Init failed, {}", e.what());return false;}AIMRT_INFO("Init succeeded.");return true;
}bool NormalRpcSyncClientModule::Start() {try {run_flag_ = true;executor_.Execute(std::bind(&NormalRpcSyncClientModule::MainLoop, this));} catch (const std::exception& e) {AIMRT_ERROR("Start failed, {}", e.what());return false;}AIMRT_INFO("Start succeeded.");return true;
}void NormalRpcSyncClientModule::Shutdown() {try {if (run_flag_) {run_flag_ = false;stop_sig_.get_future().wait();}} catch (const std::exception& e) {AIMRT_ERROR("Shutdown failed, {}", e.what());return;}AIMRT_INFO("Shutdown succeeded.");
}// Main loop
void NormalRpcSyncClientModule::MainLoop() {try {AIMRT_INFO("Start MainLoop.");uint32_t count = 0;while (run_flag_) {// Sleepstd::this_thread::sleep_for(std::chrono::milliseconds(static_cast<uint32_t>(1000 / rpc_frq_)));count++;AIMRT_INFO("Loop count : {} -------------------------", count);// Create req and rspaimrt::protocols::example::GetFooDataReq req;aimrt::protocols::example::GetFooDataRsp rsp;req.set_msg("hello world foo, count " + std::to_string(count));// Create ctxauto ctx_ptr = proxy_->NewContextSharedPtr();ctx_ptr->SetTimeout(std::chrono::seconds(3));AIMRT_INFO("Client start new rpc call. req: {}", aimrt::Pb2CompactJson(req));// Call rpcauto status = proxy_->GetFooData(ctx_ptr, req, rsp);// Check resultif (status.OK()) {AIMRT_INFO("Client get rpc ret, status: {}, rsp: {}", status.ToString(),aimrt::Pb2CompactJson(rsp));} else {AIMRT_WARN("Client get rpc error ret, status: {}", status.ToString());}}AIMRT_INFO("Exit MainLoop.");} catch (const std::exception& e) {AIMRT_ERROR("Exit MainLoop with exception, {}", e.what());}stop_sig_.set_value();
}}
接下来我们对上面的模块进行挨个分析:
#include "normal_rpc_sync_client_module/normal_rpc_sync_client_module.h"
#include "aimrt_module_protobuf_interface/util/protobuf_tools.h"#include "yaml-cpp/yaml.h"
而当我们需要将二进制的protobuf文件转化为其他可读类型的数据时,此时就需要包含下面这个头文件:
#include "aimrt_module_protobuf_interface/util/protobuf_tools.h"
其他的头文件在 #include"normal_rpc_sync_client_module/normal_rpc_sync_client_module.h"包含的都有;
接下来我们再看初始化模块:
bool NormalRpcSyncClientModule::Initialize(aimrt::CoreRef core) {core_ = core;try {// Read cfgstd::string file_path = std::string(core_.GetConfigurator().GetConfigFilePath());if (!file_path.empty()) {YAML::Node cfg_node = YAML::LoadFile(file_path);rpc_frq_ = cfg_node["rpc_frq"].as<double>();if (cfg_node["service_name"]) {service_name_ = cfg_node["service_name"].as<std::string>();}}// Get executor handleexecutor_ = core_.GetExecutorManager().GetExecutor("work_thread_pool");AIMRT_CHECK_ERROR_THROW(executor_, "Get executor 'work_thread_pool' failed.");// Get rpc handleauto rpc_handle = core_.GetRpcHandle();AIMRT_CHECK_ERROR_THROW(rpc_handle, "Get rpc handle failed.");// Register rpc clientbool ret = false;if (service_name_.empty()) {ret = aimrt::protocols::example::RegisterExampleServiceClientFunc(rpc_handle);} else {ret = aimrt::protocols::example::RegisterExampleServiceClientFunc(rpc_handle, service_name_);}AIMRT_CHECK_ERROR_THROW(ret, "Register client failed.");// Create rpc proxyproxy_ = std::make_shared<aimrt::protocols::example::ExampleServiceSyncProxy>(rpc_handle);if (!service_name_.empty()) {proxy_->SetServiceName(service_name_);}} catch (const std::exception& e) {AIMRT_ERROR("Init failed, {}", e.what());return false;}AIMRT_INFO("Init succeeded.");return true;
}
在初始化模块中,依次进行:
- 读取配置文件中的内容,且如果配置文件中设置了话题名,我们就提取里面的话题名;
- 获取我们对应的执行器的句柄;
- 获得rpc通信的句柄;
- 注册rcp客户端;
// Register rpc clientbool ret = false;if (service_name_.empty()) {ret = aimrt::protocols::example::RegisterExampleServiceClientFunc(rpc_handle);} else {ret = aimrt::protocols::example::RegisterExampleServiceClientFunc(rpc_handle, service_name_);}
这里注册的逻辑依然是:如果我们没有设置服务话题名,就按照默认的名字进行注册;否则按照我们自定义的名字进行注册;
除此之外,这里最主要的是创建了一个rpc的proxy;
问题:客户端当中我们如何传递信息给服务端?
在AimRT中,提供了四种类型的接口:
- 同步型接口:名称一般为XXXSyncProxy;
- 异步回调型接口:名称一般为XXXAsyncProxy;
- 异步 Future 型接口:名称一般为XXXFutureProxy;
- 无栈协程型接口:名称一般为XXXCoProxy;
其中,在头文件中,我们已经定义了一个共享指针来管理同步型的proxy接口:
std::shared_ptr<aimrt::protocols::example::ExampleServiceSyncProxy> proxy_;
这个接口会继承基类的一些共享接口:
class ProxyBase {public:std::string_view RpcType() const;void SetServiceName(std::string_view service_name);std::string_view ServiceName() const;std::shared_ptr<Context> NewContextSharedPtr(ContextRef ctx_ref = ContextRef()) const;void SetDefaultContextSharedPtr(const std::shared_ptr<Context>& ctx_ptr);std::shared_ptr<Context> GetDefaultContextSharedPtr() const;
};class XXXProxy : public aimrt::rpc::CoProxyBase {public:explicit XXXProxy(aimrt::rpc::RpcHandleRef rpc_handle_ref);static bool RegisterClientFunc(aimrt::rpc::RpcHandleRef rpc_handle_ref);static bool RegisterClientFunc(aimrt::rpc::RpcHandleRef rpc_handle_ref, std::string_view service_name);// ...
}
详细的讲解大姐可以参考下面的链接:
Rpc — AimRT v0.10.0 documentation
接下来再回到我们的初始化模块关于proxy的部分:
- 这里是相当于通过传入我们的rpc句柄,将proxy进行实例化;
- 然后如果我们这里定义了话题名就设置话题名,相当于进行了一些预备操作;
开始start模块:
bool NormalRpcSyncClientModule::Start() {try {run_flag_ = true;executor_.Execute(std::bind(&NormalRpcSyncClientModule::MainLoop, this));} catch (const std::exception& e) {AIMRT_ERROR("Start failed, {}", e.what());return false;}AIMRT_INFO("Start succeeded.");return true;
}
开始模块的逻辑很简单:
- 这里还是将我们对应的运行状态改为true;
- 然后让执行器执行对应的MainLoop操作;(在MainLoop里面会进行一些数据的处理);
发送消息MainLoop模块:
// Main loop
void NormalRpcSyncClientModule::MainLoop() {try {AIMRT_INFO("Start MainLoop.");uint32_t count = 0;while (run_flag_) {// Sleepstd::this_thread::sleep_for(std::chrono::milliseconds(static_cast<uint32_t>(1000 / rpc_frq_)));count++;AIMRT_INFO("Loop count : {} -------------------------", count);// Create req and rspaimrt::protocols::example::GetFooDataReq req;aimrt::protocols::example::GetFooDataRsp rsp;req.set_msg("hello world foo, count " + std::to_string(count));// Create ctxauto ctx_ptr = proxy_->NewContextSharedPtr();ctx_ptr->SetTimeout(std::chrono::seconds(3));AIMRT_INFO("Client start new rpc call. req: {}", aimrt::Pb2CompactJson(req));// Call rpcauto status = proxy_->GetFooData(ctx_ptr, req, rsp);// Check resultif (status.OK()) {AIMRT_INFO("Client get rpc ret, status: {}, rsp: {}", status.ToString(),aimrt::Pb2CompactJson(rsp));} else {AIMRT_WARN("Client get rpc error ret, status: {}", status.ToString());}}AIMRT_INFO("Exit MainLoop.");} catch (const std::exception& e) {AIMRT_ERROR("Exit MainLoop with exception, {}", e.what());}stop_sig_.set_value();
}
接下来我们挨个分析上面的代码逻辑结构:
- 首先,如果run_flag为true,说明代码module此时框架是start,所以我们需要发送任务;
- 接下来,设置任务的执行频率;
- 分别创建rep和rsp用来储存:发送的消息和接受的消息;
- 接下来我们设置消息内容:
req.set_msg("hello world foo, count " + std::to_string(count));
- 接下来我们创建对应的后端配置:
auto ctx_ptr = proxy_->NewContextSharedPtr();ctx_ptr->SetTimeout(std::chrono::seconds(3));AIMRT_INFO("Client start new rpc call. req: {}", aimrt::Pb2CompactJson(req));
- 在proxy的基类中,提供了管理context后端配置的共享指针;
- 设置 RPC 调用的超时时间,如果超市超过3s,此时框架会在超时后自动取消等待响应;
接下来这里我们就可以通过proxy调用对应的回调函数进行信息的发送:
// Call rpcauto status = proxy_->GetFooData(ctx_ptr, req, rsp);
- 如果返回的start.OK()为true,说明此时获取rpc的过程中没有出现错误!所以接下来打印正常info信息;
- 否则就打印warm警告信息;
- 服务器回复客户端的消息会保存到req当中;
stop_sig_.set_value();
最后,当循环结束后,此时说明不会再进行消息的发送,因此向shutdown发送对应的信号;
结束shutdown模块:
void NormalRpcSyncClientModule::Shutdown() {try {if (run_flag_) {run_flag_ = false;stop_sig_.get_future().wait();}} catch (const std::exception& e) {AIMRT_ERROR("Shutdown failed, {}", e.what());return;}AIMRT_INFO("Shutdown succeeded.");
}
当信号发送后,此时shutdown会收到对应的信号,然后成功关闭!
CMake链接相关库
在上面我们已经进行了.proto文件的相关编译,因此,此时如果我们想要在自己的源文件中进行链接,很简单,例如下面的例子(对应上面我们编译的.proto文件):
target_link_libraries(my_lib PUBLIC example_rpc_aimrt_rpc_gencode)
此时即可成功链接对应的库;
四、 基于ROS2 srv消息的客户端和服务器
编写.msg文件
byte[] data
---
int64 code
其中,以---来分割 Req 和 Rsp 的定义。然后直接通过 ROS2 提供的 CMake 方法rosidl_generate_interfaces,为 Req 和 Rsp 消息生成 C++ 代码和 CMake Target,例如:
rosidl_generate_interfaces(example_srv_gencode"srv/example.srv"
)
之后就可以引用相关的 CMake Target 来使用生成的 Req 和 Rsp 的消息结构 C++ 代码;
但是实际上,经过上面的操作,我们对于生成的消息还是不能直接进行调用;
在生成了 Req 和 Rsp 消息结构的 C++ 代码后,我们还需要使用 AimRT 提供的 Python 脚本工具,生成服务定义部分的 C++ 桩代码,例如:
python3 ARGS ./ros2_py_gen_aimrt_cpp_rpc.py --pkg_name=example_pkg --srv_file=./example.srv --output_path=./
此时即会生成对应的example.aimrt_rpc.srv.h和example.aimrt_rpc.srv.cc文件;
但是实际上,不需要这么麻烦的自己进行编译,AimRT内部也给我们提供了内置的CMake编译指令:
-
add_ros2_aimrt_rpc_gencode_target_for_one_file:为单个 srv 文件生成 RPC 服务 C++ 代码,参数如下:
-
TARGET_NAME:生成的 CMake Target 名称;
-
PACKAGE_NAME:ROS2 协议 PKG 的名称;
-
PROTO_FILE:协议文件的路径;
-
GENCODE_PATH:生成的桩代码存放路径;
-
DEP_PROTO_TARGETS:依赖的协议 CMake Target;
-
OPTIONS:传递给工具的其他参数;
-
例如下面这个例子:
# Generate C++ code for Req and Rsp message in `.srv` file
rosidl_generate_interfaces(example_srv_gencode"srv/example.srv"
)# Generate RPC service C++ code for the example '.srv' file. It is necessary to rely on the CMake Target related to ROS2 messages, which is defined in '${ROS2_EXAMPLE_CMAKE_TARGETS}'
add_ros2_aimrt_rpc_gencode_target_for_one_file(TARGET_NAME example_ros2_rpc_aimrt_rpc_gencodePACKAGE_NAME example_pkgPROTO_FILE ${CMAKE_CURRENT_SOURCE_DIR}/srv/example.srvGENCODE_PATH ${CMAKE_CURRENT_BINARY_DIR}DEP_PROTO_TARGETSrclcpp::rclcpp${ROS2_EXAMPLE_CMAKE_TARGETS})
- rosidl_generate_interfaces:负责将消息结构体部分生成对应的C++代码;
- add_ros2_aimrt_rpc_gencode_target_for_one_file:再将对应的C++代码生成C++的服务代码;
- 二者缺一不可;
其他的关于服务器和客户端的时候,与protobuf完全相同,不同的是将对应的消息类型改为ROS即可,因此这里我们不再进行过多的解释;
截止到这里我们对AimRT的框架已经明白了不少,接下来有机会会发布关于AImRT中的rpc后端的相关介绍;
相关文章:
AimRT从入门到精通 - 04RPC客户端和服务器
一、ROS中的service通信机制 服务通信也是ROS中一种极其常用的通信模式,服务通信是基于请求响应模式的,是一种应答机制。也即:一个节点A向另一个节点B发送请求,B接收处理请求并产生响应结果返回给A。比如如下场景: 机器…...
【Android】Intent
目录 一、什么是Intent 二、显式Intent 三、隐式Intent 四、复杂数据传递 五、跨应用权限管理 六、常见问题 一、什么是Intent 1. 跨组件通信桥梁 实现组件间通信(Activity/Service/BroadcastReceiver)封装操作指令与数据传输逻辑 目标组件启动…...
从0开始建立Github个人博客(hugoPaperMod)
从0开始建立Github个人博客(hugo&PaperMod) github提供给每个用户一个网址,用户可以建立自己的静态网站。 一、Hugo hugo是一个快速搭建网站的工具,由go语言编写。 1.安装hugo 到hugo的github标签页Tags gohugoio/hugo选择一个版本,…...
Python集合全解析:从基础到高阶应用实战
一、集合核心特性与创建方法 1.1 集合的本质特征 Python集合(Set)是一种无序且元素唯一的容器类型,基于哈希表实现,具有以下核心特性: 唯一性:自动过滤重复元素无序性ÿ…...
Matlab自学笔记
一、我下载的是Matlab R2016a软件,打开界面如下: 二、如何调整字体大小,路径为:“主页”->“预设”->“字体”。 三、命令行窗口是直接进行交互式的,如下输入“3 5”,回车,就得到结果“…...
Python爬虫实战:获取好大夫在线各专业全国医院排行榜数据并分析,为患者就医做参考
一、引言 在当今医疗资源丰富但分布不均的背景下,患者在选择合适的心血管内科医院时面临诸多困难。好大夫在线提供的医院排行榜数据包含了医院排名、线上服务得分、患者评价得分等重要信息,对患者选择医院具有重要的参考价值。本研究通过爬取该排行榜数据,并进行深入分析,…...
多模态人工智能研究:视觉语言模型的过去、现在与未来
多模态人工智能研究:视觉语言模型的过去、现在与未来 1. 引言:定义多模态图景 多模态人工智能指的是旨在处理和整合来自多种数据类型或“模态”信息的人工智能系统,这些模态包括文本、图像、音频和视频等。与通常侧重于单一模态(…...
DeepSeek+Excel:解锁办公效率新高度
目录 一、引言:Excel 遇上 DeepSeek二、认识 DeepSeek:大模型中的得力助手2.1 DeepSeek 的技术架构与原理2.2 DeepSeek 在办公场景中的独特优势 三、DeepSeek 与 Excel 结合的准备工作3.1 获取 DeepSeek API Key3.2 配置 Excel 环境 四、DeepSeekExcel 实…...
3033. 修改矩阵
题目来源: leetcode题目:3033. 修改矩阵 - 力扣(LeetCode) 解题思路: 获取每列的最大值后将-1替换即可。 解题代码: #python3 class Solution:def getMaxRow(matrix:List[List[int]])->List[int]:r…...
Android面试总结之jet pack模块化组件篇
一、ViewModel 深入问题 1. ViewModel 如何实现跨 Fragment 共享数据?其作用域是基于 Activity 还是 Fragment? 问题解析: ViewModel 的作用域由 ViewModelStoreOwner 决定。当 Activity 和其内部 Fragment 共享同一个 ViewModelStoreOwner…...
【无需docker】mac本地部署dify
环境安装准备 #安装 postgresql13 brew install postgresql13 #使用zsh的在全局添加postgresql命令集 echo export PATH"/usr/local/opt/postgresql13/bin:$PATH" >> ~/.zshrc # 使得zsh的配置修改生效 source ~/.zshrc # 启动postgresql brew services star…...
清洗数据集
将label在图片上画出来 按照第一行的属性分类 import os import cv2 import multiprocessing as mp from tqdm import tqdm# ---------- 路径配置 ---------- # IMAGE_DIR = r"C:\Users\31919\Desktop\datasets\13k_100drive_raw_with_hand\images\test" LABEL_DIR =…...
支持向量机(SVM)详解
引言 支持向量机(Support Vector Machine, SVM)是一种强大的监督学习算法,主要用于分类和回归任务。其核心思想是找到一个最优的决策边界(超平面),最大化不同类别之间的间隔(Margin)…...
MIT XV6 - 1.2 Lab: Xv6 and Unix utilities - pingpong
接上文 MIT XV6 - 1.1 Lab: Xv6 and Unix utilities - user/_sleep 是什么?做什么? pingpong 不务正业了那么久(然而并没有,虽然还在探索sleep,但是教材我已经看完了前三章了),让我们赶紧继续下去 在进行本实验之前请务…...
“淘宝闪购”提前4天全量,意味着什么?
4月30日推出,首日上线50个城市,既定5月6日推广至全国的“淘宝闪购”,突然在5月2日早上官宣,提前4天面向全国消费者全量开放。 这一系列节奏,剑指一个字“快”! 是业务发展远超预期的“快”。 4月30日&am…...
Servlet 解决了什么问题?
Servlet 主要解决了以下几个核心问题: 性能问题 (Performance): CGI 的问题: 传统的 CGI 技术为每个Web 请求都启动一个新的进程。进程的创建和销毁涉及大量的系统资源开销(内存分配、CPU 时间、进程上下文切换等)。在高并发场景下…...
Cherry Studio的MCP协议集成与应用实践:从本地工具到云端服务的智能交互
Cherry Studio的MCP协议集成与应用实践:从本地工具到云端服务的智能交互 一、MCP协议与Cherry Studio的技术融合 MCP(Model Context Protocol) 是由Anthropic提出的标准化协议,旨在为AI模型提供与外部工具交互的通用接口。通过M…...
CPU:AMD的线程撕裂者(Threadripper)系列
AMD的线程撕裂者(Threadripper)系列是AMD面向高性能计算(HPC)、工作站(Workstation)和高端桌面(HEDT)市场推出的顶级处理器产品线。该系列以极高的核心数、强大的多线程性能、丰富的…...
(即插即用模块-Attention部分) 六十二、(2022) LKA 大核注意力
文章目录 1、Larger Kernel Attention2、代码实现 paper:Visual Attention Network Code:https://github.com/Visual-Attention-Network 1、Larger Kernel Attention 自注意力机制在 NLP 领域取得了巨大成功,但其应用于计算机视觉任务时存在…...
Spring 分批处理 + 冷热数据分离:历史订单高效迁移与数据清理实战
在实际业务中,随着时间推移,订单量持续增长,若未及时进行数据治理,会造成数据库膨胀、查询缓慢、性能下降等问题。为了实现数据分层管理和系统高性能运行,我们在项目中采用了“冷热数据分离 分批迁移 数据清理”的综…...
Mybatis中的一级二级缓存扫盲
思维导图: MyBatis 提供了一级缓存和二级缓存机制,用于提高数据库查询的性能,减少对数据库的访问次数。(本质上是减少IO次数)。 一级缓存 1. 概念 一级缓存也称为会话缓存,它是基于 SqlSession 的缓存。在同…...
Elasticsearch 常用的 API 接口
文档类 API Index API :创建并建立索引,向指定索引添加文档。例如:PUT /twitter/tweet/1 ,添加一个文档。 Get API :获取文档,通过索引、类型和 ID 获取文档。如GET /twitter/tweet/1。 DELETE API &…...
纯前端专业PDF在线浏览器查看器工具
纯前端专业PDF在线浏览器查看器工具 工具简介 我们最新开发的PDF在线浏览器工具现已发布!这是一个基于Web的轻量级PDF阅读器,无需安装任何软件,直接在浏览器中即可查看和操作PDF文档。 主要功能 ✅ PDF文件浏览 支持本地PDF文件上传流畅的…...
传奇各职业/战士/法师/道士手套/手镯/护腕/神秘腰带爆率及出处产出地/圣战/法神/天尊/祈祷/虹魔/魔血
护腕排行(战士): 名字攻击攻击(均)魔法魔法(均)道术道术(均)防御防御(均)魔御魔御(均)重量要求图标外观产出圣战手镯2-32.50-000-000-10.50-002攻击: 400.02%双头金刚(50级/5000血/不死系)|赤月魔穴(1725,2125)60分钟2只 0.02%双头血魔(55级/5000血/不死系)|赤月魔穴(1725,212…...
觅知解析计费系统重构版在线支付卡密充值多解析接口免授权无后门源码扶风二开
一、源码描述 这是一套视频解析计费源码(扶风二开),可配置多接口和专用特征解析接口,对接在线支付和卡密支付,支持在线充值和卡密充值,支持点数收费模式和包月套餐收费模式,可配置多个视频解析…...
C++11新特性_委托构造函数
格式定义 在 C11 里,委托构造函数的格式为:一个构造函数能够在其成员初始化列表里调用同一个类的其他构造函数。基本语法如下: class ClassName { public:// 被委托的构造函数(目标构造函数)ClassName(参数列表1) : …...
网工_IP协议
2025.02.17:小猿网&网工老姜学习笔记 第19节 IP协议 9.1 IP数据包的格式(首部数据部分)9.1.1 IP协议的首部格式(固定部分可变部分) 9.2 IP数据包分片(找题练)9.3 TTL生存时间的应用9.4 常见…...
C++负载均衡远程调用学习之QPS性能测试
目录 1.昨日回顾 2.QPS_TEST_PROTOBUF协议的集成 3.QPS_TEST_SERVER端实现 4.QPS_TEST_QPS简单介绍 5.QPS_TEST_QPS客户端工具编写和性能测试 1.昨日回顾 2.QPS_TEST_PROTOBUF协议的集成 ## 14) Reactor框架QPS性能测试 接下来我们写一个测试用例来测一下我们…...
C++负载均衡远程调用学习之消息队列与线程池
目录 1.昨日回顾 2.单线程的多路IO服务器模型和多线程模型区别 3.服务器的集中并发模式 4.LARSV0.8-task_msg消息队列任务数据类型 5.LARSV0.8--thread_queue消息队列的发送和接收流 6.LARSV0.8-thread_pool线程池的实现 7.LARSV0.8-thread_pool线程池的实现 8.LARSV0.8…...
Kotlin 基础
Kotlin基础语法详解 Kotlin是一种现代静态类型编程语言,由JetBrains开发,与Java完全互操作。以下是Kotlin的基础语法详解: 1. 基本语法 1.1 变量声明 // 不可变变量(推荐) val name: String = "Kotlin" val age = 25 // 类型推断// 可变变量 var count: In…...
实验数据的转换
最近做实验需要把x轴y轴z轴的数据处理一下,总结一下解决的方法: 源文件为两个txt文档,分别为x轴和y轴,如下: 最终需要达到的效果是如下: 就是需要把各个矩阵的数据整理好放在同一个txt文档里。 步骤① …...
多种尝试解决Pycharm无法粘贴外部文本【本人问题已解决】
#作者:允砸儿 #日期:乙巳青蛇年 四月初五 笔者在写demo的时候遇到一个非常棘手的问题就是pycharm无法复制粘贴,笔者相信有很多的朋友遇到过这种问题,笔者结合搜素到的和自己揣摩出来的方法帮助朋友们解决这种问题。 1、第一种…...
【C++】红黑树迭代版
目录 前言: 一:什么是红黑树? 二:插入什么颜色节点? 三:定义树 四:左单旋和右单旋 1.右单旋 2.左单旋 五:调整树 1.当parent节点为黑色时 2.当parent节点为红色时 2.1 u…...
OSPF路由协议配置
初始环境与准备: 物理连接:按照文件的拓扑连接了 3 台路由器 (R01, R02, R03)、2 台交换机 (Switch0, Switch1) 和 2 台 PC (PC0, PC1)。关键发现:路由器之间的连接实际使用的是以太网线(连接到 FastEthernet 接口),而不是串口线。…...
linux下抓包工具--tcpdump介绍
文章目录 1. 前言2. 命令介绍3. 常见选项3.1. 接口与基本控制3.2 输出控制3.3 文件操作3.4 高级调试 4. 过滤表达式4.1 协议类型4.2 方向与地址4.3 逻辑运算符 5. 典型使用场景5.1 网络故障排查5.2 安全分析与入侵检测5.3 性能分析与优化 linux下抓包工具--tcpdump介绍 1. 前言…...
探索 Disruptor:高性能并发框架的奥秘
在当今的软件开发领域,处理高并发场景是一项极具挑战性的任务。传统的并发解决方案,如基于锁的队列,往往在高负载下表现出性能瓶颈。而 Disruptor 作为一个高性能的并发框架,凭借其独特的设计和先进的技术,在处理海量数…...
smss源代码分析之smss!SmpLoadSubSystemsForMuSession函数分析加载csrss.exe
第一部分: Next SmpSubSystemsToLoad.Flink; while ( Next ! &SmpSubSystemsToLoad ) { p CONTAINING_RECORD( Next, SMP_REGISTRY_VALUE, Entry )…...
《AI大模型应知应会100篇》第44篇:大模型API调用最佳实践(附完整代码模板)
第44篇:大模型API调用最佳实践(附完整代码模板) 摘要 当你的应用突然面临每秒1000请求时,如何保证大模型API调用既稳定又经济?本文通过12个实战代码片段、3套生产级架构方案和20优化技巧,带你构建高性能的…...
第5篇:EggJS中间件开发与实战应用
在Web开发中,中间件(Middleware)是处理HTTP请求和响应的核心机制之一。EggJS基于Koa的洋葱模型实现了高效的中间件机制,本文将深入探讨中间件的执行原理、开发实践以及常见问题解决方案。 一、中间件执行机制与洋葱模型 1. 洋葱模…...
数字智慧方案6187丨智慧应急指挥平台体系建设方案(78页PPT)(文末有下载方式)
数字智慧方案6187丨智慧应急指挥平台体系建设方案 详细资料请看本解读文章的最后内容。 引言 随着社会经济的快速发展,应急管理面临着越来越复杂的挑战。智慧应急指挥平台体系的建设,旨在通过先进的信息技术和智能化手段,提升应急管理的效…...
Linux 常用命令 - tar【归档与压缩】
简介 tar 这个名称来源于 “tape archive”,最初设计用于将文件归档到磁带上。现在,tar 命令已经成为 Linux 系统中最常用的归档工具,它可以将多个文件和目录打包成一个单独的归档文件,并且可以选择使用不同的压缩算法进行压缩&a…...
python常用科学计算库及使用示例
一、NumPy - 数值计算基础库 安装 pip install numpy 核心功能示例 1. 数组创建与运算 import numpy as np# 创建数组 arr np.array([1, 2, 3, 4]) matrix np.array([[1, 2], [3, 4]])# 数学运算 print(arr 1) # [2 3 4 5] print(matrix …...
【中间件】brpc_基础_bthread头文件
bthread.h学习笔记 源码 1 概述 bthread.h 定义了一个用户级线程库,提供类似 POSIX 线程(pthread)的功能,但针对高并发和调度优化进行了扩展。支持线程管理、同步原语、中断机制、线程特定数据等功能,适用于需要高效…...
【AI面试准备】Git与CI/CD及单元测试实战指南
介绍Git、CI/CD 流程、单元测试框架(如 NUnit、JUnit)。如何快速掌握,以及在实际工作中如何运用 目录 一、Git:分布式版本控制系统核心概念高频命令实战建议 二、CI/CD:自动化交付流水线核心流程工具链组合关键配置示…...
个人健康中枢的多元化AI软件革新与精准健康路径探析
引言 人工智能技术的迅猛发展正在重塑医疗健康领域的服务模式和用户体验。随着多模态大模型、MCP协议、A2A协议和思考链算法等创新技术的出现,个人健康中枢正在经历一场深刻的软件革新。这些技术不仅打破了传统健康管理系统的信息孤岛,还通过多维度数据整合和深度推理能力,…...
Java文件上传
war包利用 WAR包结构详解-CSDN博客 Tomcat弱口令及war包漏洞复现(保姆级教程)-CSDN博客 Tomcat 8.x弱口令获取manager权限上传任意war包漏洞复现 - Stunmaker - 博客园...
Python项目源码63:病历管理系统1.0(tkinter+sqlite3+matplotlib)
1.病历管理系统包含以下主要功能: 核心功能:病历信息录入(患者姓名、年龄、性别、诊断结果、主治医生),自动记录就诊时间,病历信息展示(使用Treeview表格),病历信息查询…...
Unity 与 Lua 交互详解
Unity 与 Lua 的交互是热更新实现的核心技术,下面我将从底层原理到实际应用全面解析交互机制。 一、交互基础原理 1. 通信架构 Unity (C#) 原生层↑↓ 通过P/Invoke调用 Lua虚拟机层 (C/C实现)↑↓ Lua脚本解释执行 业务逻辑层 (Lua脚本) 2. 数据类型映射表 Lu…...
【Vue】Vue与UI框架(Element Plus、Ant Design Vue、Vant)
个人主页:Guiat 归属专栏:Vue 文章目录 1. Vue UI 框架概述1.1 主流Vue UI框架简介1.2 选择UI框架的考虑因素 2. Element Plus详解2.1 Element Plus基础使用2.1.1 安装与引入2.1.2 基础组件示例 2.2 Element Plus主题定制2.3 Element Plus的优缺点分析 3…...
期刊、出版社、索引数据库
image 1、研究人员向期刊或者会议投稿,交注册费和相应的审稿费等相关费用[1]; 2、会议组织者和期刊联系出版社,交出版费用; 3、出版社将论文更新到自己的数据库中,然后将数据库卖给全世界各大高校或企业; 4…...