当前位置: 首页 > news >正文

linux asio网络编程理论及实现

        最近在B站看了恋恋风辰大佬的asio网络编程,质量非常高。在本章中将对ASIO异步网络编程的整体及一些实现细节进行完整的梳理,用于复习与分享。大佬的博客:恋恋风辰官方博客

Preactor/Reactor模式

        在网络编程中,通常根据事件处理的触发时机处理逻辑的分工可以分为reactor模式和preactor模式。reator是非阻塞同步网络模式,preactor是异步网络模式。

        阻塞I/O我们知道read分为两个步骤一个是检测IO和操作IO.第一个是检测recv buffer也就是通过fd找到对应的接收缓冲区里面是否有数据,也就是io是否就绪,如果有数据拷贝到设置的buffer,并返回数据长度。如果没有就绪他就是阻塞的。但是如果设置类非阻塞I/O也就O_NONBLOCK,他就会立即返回,此时应用程序不断轮询内核,直到数据准备好,内核将数据拷贝到应用程序缓冲区,read 调用才可以获取到结果。

        但是真正的异步IO,是询问数据是否准备好和把数据从接受缓冲区拷贝到指定buffer,都是不用等待的。我们发起read之后,就不用管了,内核会自动把他写到指定缓冲区中。应用程序并不需要主动发起拷贝动作。就像我去吃饭,我点完菜我就不用管了,剩下的就交给饭店了(内核),做好了告诉我。

        Reactor 可以理解为「来了事件操作系统通知应用进程,让应用进程来处理」,而 Proactor 可以理解为「来了事件操作系统来处理,处理完再通知应用进程」

  

IO多路复用

        我们在实际观看代码的时候还需要了解多路IO复用的知识。前面有提到,Read是分为两步的,一个用来检测是否有数据,一个用来操作数据。IO多路复用不能去操作IO但是可以检测IO并且可以同时检测多路IO,他们都是同步IO。常见的多路IO选择有epoll,select以及poll。但是epoll基本可以完成select以及poll的功能。

        select需要三个集合即:读事件,写事件,异常事件。且select会线性扫描所有文件描述符,即使只有少数文件描述符状态发生了变化,select也要遍历整个文件描述符集。而且select受fd_size影响通常是1024。poll只需要一个集合,异常用event标识就好了,返回的时候用revent,也就是事件就绪的话我们会去修改revent。

        我们再来看看epoll,相比于轮询机制,epoll使用事件通知机制:将感兴趣的文件描述符及其事件类型添加到 epoll 实例中,由内核自动监视这些描述符的状态,并在状态变化时通知用户进程。Epoll在通过epoll_creat之后会返回一个描述符。这个描述符对着用的epoll的实体,其内部是由红黑树创建成的。为什么用红黑树而不是平衡搜索树,是因为平衡搜索树平衡要求比较严苛,每次插入删除都需要保持平衡从而左右选转树。性能要求较高,但是红黑树是按照黑节点来算平衡的。所以要求没那么高的同时,也能保证一些性能。epoll可以通过epoll_ctl将fd挂上树进行监听,他有四个参数,第一个就是本体对应的fd,也就是之前epoll_creat创建的fd,第二个是要干嘛,第三个个是目标fd,第四个是epoll_event结构体,这个epoll_event包含信息,epollin/out/rr 挂上去放下来,其中还有一个data联合体,里面还有一个对应监听事件的fd,以及void* ptr这个指针可以让我们携带更多信息用来判断。后面我再分享一个reactor模式的就是用了void* ptr。epoll_wait也就是阻塞监听的函数了,他会返回数据准备好的fd个数,然后把对应的epoll_event放到epoll_event数组里,遍历这个数组就可以进行操作IO了。

boost.asio库

        我们知道在linux上其实天生支持的是reactor模式,而windows上支持的是preactor模式。但是asio库可以跨平台实现异步网络编程,而且比自己造轮子方便多了。下面将介绍一些大概的api。

 io_context

    这个asioPreactor的核心部分,它代表了一个事件循环的上下文,所有的异步操作都需要关联到一个io_context对象。io_context 负责分发事件并驱动异步操作的完成。它内部其实封装的是一个epoll。在内部,他会调用epoll_creat先创建一个实例。然后async_read/write会注册fd到这个epoll实例上面去。io_context.run()其实调用的就是epoll_wait.一旦有事件触发,他就触发回调处理了。不知道大家注意到没有,其实在linux上,他这个还是reactor的,不是真正的操作系统级别支持的,而是在用户空间模拟出来的异步,由于网络编程中的 socket 是不支持的,这也使得基于 Linux 的高性能网络程序都是使用 Reactor 方案。比较方便的就是不用自己造造轮子设计封装了,不然又要设计回调又要设计channel这些层很麻烦。但是你同样的代码你写正在Windows就是真Preactor,因为高贵的Windows有IOCP接口。他有四个常用的API:

  • io_context.run():启动事件循环,直到没有待处理的异步操作。
  • io_context.stop():停止事件循环(标志位)。
  • io_context.poll():检查并处理所有已经完成的异步操作,但不会阻塞,适用于非阻塞事件调度。
  • io_context.restart():重启 io_context,使其可以重新执行 run()。

async_read/write

        这两个也就是异步读写了,他们一般第一个传,socket,第二个数据缓冲区buffer。第三个传回调函数。

strand

        在并发过程中,一般有几种方案,我常用的两种是多个线程跑多个io_context。以及一个io_context被多个线程使用。我一般用第一种多一点,第二种也就是一个io_context的话,存在一个问题就是io_context被多个线程调用从而异步回调函数在多个线程被并发调用,共享资源竞争激烈。竞争吗,easy,那我用锁+条件变量就完事了对吧。但是strand更简单粗暴,他是无锁的。像我们设计无锁用条件变量,还要考虑先行关系吧,还有release,aquire吧,还要考虑数据结构吧。他是io_context的一个 执行器(executor),它通过任务队列的方式串行化任务执行,避免了多个线程同时访问共享资源。你仅仅只需要把传回调函数的场景,换成用它封装一下就好了。就像这样:

void write(std::size_t length) {auto self = shared_from_this();boost::asio::async_write(socket_,boost::asio::buffer(data_, length),boost::asio::bind_executor(strand_,  // 绑定到 strand[this, self](boost::system::error_code ec, std::size_t) {if (!ec) {std::cout << "Write complete" << std::endl;read();  // 继续读取}}));}

Endpoint

        这个就类似于socket里面的Socketaddr_in,用于绑定IP地址和端口。一般测试我就用boost::asio::ip::tcp::v4(),他是自动绑定到所有 IPv4 地址,一般是0.0.0.0。当然也可以手动设置boost::asio::ip::make_address("127.0.0.1").

acceptor

        他是监听并接受客户端连接 的类。它是 TCP 服务器的核心组件。负责:绑定到指定的 IP 地址和端口(由endpoint提供)。监听来自客户端的连接请求。接受连接并生成一个新的 socket。他接受两个参数。boost::asio::ip::tcp::acceptor acceptor(io_context, endpoint);他就相当于承担你监听lfd的功能了,在io_context执行run之后他也开始会监听了。其中的async_accept就相当于EPOLL_CTL_ADD,他的作用是等待一个新的客户端连接,一旦有新连接到达,将连接信息(如文件描述符、客户端 IP 和端口)写入socket,然后挂到io_context内部的epoll红黑树上去。close或销毁对应 EPOLL_CTL_DEL在创建的时候就相当于执行了以下代码:

    int server_fd = socket(AF_INET, SOCK_STREAM, 0);sockaddr_in server_addr{};server_addr.sin_family = AF_INET;server_addr.sin_port = htons(8080);server_addr.sin_addr.s_addr = INADDR_ANY;bind(server_fd, (sockaddr*)&server_addr, sizeof(server_addr));listen(server_fd, 10);

signal_set

        他是一个异步信号处理器,用于捕获操作系统信号,用来实现优雅退出的。在后面的代码中我们会看到他的用法。

Socket

      Socket是数据通信的核心,它表示一个具体的网络连接。每个 Socket对应一个客户端连接,负责与客户端之间的数据收发。当他被io_context创建时,他也和io_context绑定了。它包含客户端的 IP 地址、端口号,以及底层网络通信所需的所有信息。当有新的客户端连接时,acceptor会生成一个新的Socket,用于与客户端通信。所以他需要使用和acceptor同样的io_context上下文信息。Socket 包含哪些信息?

  1. 底层的文件描述符(fd

    • Socket的核心是一个文件描述符,表示操作系统分配给这个网络连接的资源。
  2. 连接的状态信息

    • 客户端的 IP 地址。
    • 客户端的端口号。
    • 本地绑定的 IP 地址和端口号。
  3. 用于通信的缓冲区

    • 内部有读写缓冲区,用于存储从网络接收到的数据或待发送的数据。
  4. 通信参数

    • 如超时设置、是否启用 Nagle 算法等。
  5. 与 Boost.Asio 的集成

    • 支持异步操作(如 async_readasync_write)。
    • 可以通过事件循环(io_context)管理其 I/O 操作。

代码详解

        

设计结构

        恋恋风辰大佬的设计结构如下图所示:

  多线程IOServicePoll

IOServicePool多线程模式特点

1   每一个io_context跑在不同的线程里,所以同一个socket会被注册在同一个io_context里,它的回调函数也会被单独的一个线程回调,那么对于同一个socket,他的回调函数每次触发都是在同一个线程里,就不会有线程安全问题,网络io层面上的并发是线程安全的。

2   但是对于不同的socket,回调函数的触发可能是同一个线程(两个socket被分配到同一个io_context),也可能不是同一个线程(两个socket被分配到不同的io_context里)。所以如果两个socket对应的上层逻辑处理,如果有交互或者访问共享区,会存在线程安全问题。比如socket1代表玩家1,socket2代表玩家2,玩家1和玩家2在逻辑层存在交互,比如两个玩家都在做工会任务,他们属于同一个工会,工会积分的增加就是共享区的数据,需要保证线程安全。可以通过加锁或者逻辑队列的方式解决安全问题,我们目前采取了后者。

3   多线程相比单线程,极大的提高了并发能力,因为单线程仅有一个io_context服务用来监听读写事件,就绪后回调函数在一个线程里串行调用, 如果一个回调函数的调用时间较长肯定会影响后续的函数调用,毕竟是穿行调用。而采用多线程方式,可以在一定程度上减少前一个逻辑调用影响下一个调用的情况,比如两个socket被部署到不同的iocontext上,但是当两个socket部署到同一个iocontext上时仍然存在调用时间影响的问题。不过我们已经通过逻辑队列的方式将网络线程和逻辑线程解耦合了,不会出现前一个调用时间影响下一个回调触发的问题。

        以上就是整个架构的大概的流程,其中将session与logic层分离,session层负责处理读写 IO、解析网络协议,并将收到的数据通过任务队列(线程安全)交给逻辑线程处理从任务队列中取出解析后的数据,执行业务逻辑处理(如数据库操作、计算、文件操作等)。将需要响应的结果再通过任务队列交给会话线程,由会话线程将数据发送给客户端。后面我们梳理完整体代码之后会再次作出总结。首先我们把需要使用的一些类介绍一下。

 Singleton(单例)

        

#pragma once
#include <memory>
#include <mutex>
#include <iostream>
using namespace std;
template <typename T>
class Singleton {
protected:Singleton() = default;Singleton(const Singleton<T>&) = delete;Singleton& operator=(const Singleton<T>& st) = delete;static std::shared_ptr<T> _instance;
public:static std::shared_ptr<T> GetInstance() {static std::once_flag s_flag;std::call_once(s_flag, [&]() {_instance = shared_ptr<T>(new T);});return _instance;}void PrintAddress() {std::cout << _instance.get() << endl;}~Singleton() {std::cout << "this is singleton destruct" << std::endl;}
};template <typename T>
std::shared_ptr<T> Singleton<T>::_instance = nullptr;

        这个其实在之前的博客中有介绍,需要实现单例的类直接继承这个就好了。有兴趣的读者可以移步以下链接:单例及线程池的实现及感悟分享-CSDN博客

MsgNode

#pragma once
#include <string>
#include "const.h"
#include <iostream>
#include <boost/asio.hpp>
using namespace std;
using boost::asio::ip::tcp;
class LogicSystem;
class MsgNode
{
public:MsgNode(short max_len) :_total_len(max_len), _cur_len(0) {_data = new char[_total_len + 1]();_data[_total_len] = '\0';}~MsgNode() {std::cout << "destruct MsgNode" << endl;delete[] _data;}void Clear() {::memset(_data, 0, _total_len);_cur_len = 0;}short _cur_len;short _total_len;char* _data;
};class RecvNode :public MsgNode {friend class LogicSystem;
public:RecvNode(short max_len, short msg_id);
private:short _msg_id;
};class SendNode:public MsgNode {friend class LogicSystem;
public:SendNode(const char* msg,short max_len, short msg_id);
private:short _msg_id;
};RecvNode::RecvNode(short max_len, short msg_id):MsgNode(max_len),
_msg_id(msg_id){}SendNode::SendNode(const char* msg, short max_len, short msg_id):MsgNode(max_len + HEAD_TOTAL_LEN)
, _msg_id(msg_id){//先发送id, 转为网络字节序short msg_id_host = boost::asio::detail::socket_ops::host_to_network_short(msg_id);memcpy(_data, &msg_id_host, HEAD_ID_LEN);//转为网络字节序short max_len_host = boost::asio::detail::socket_ops::host_to_network_short(max_len);memcpy(_data + HEAD_ID_LEN, &max_len_host, HEAD_DATA_LEN);memcpy(_data + HEAD_ID_LEN + HEAD_DATA_LEN, msg, max_len);
}

        MSG也就是消息的载体,为了方便切包粘包必须有cur_len和max_len.后面的RecvNode和SendNode其实是为了逻辑层设计的,如果不用逻辑层,session处理数据逻辑,写个demo的话,这个可以不写。RecvNode 主要用于存储接收到的数据,Session处理之后把自己和数据丢给逻辑层,SendNode就是处理过的数据,丢给session让他去发。SendNode记得要转换大端序和小端序,网络中是大端序,计算机本地一般用小端序。大端序也就像我们写数字一样,大的在前面,小端序则相反,最低位在前。

Const

#pragma once
#define MAX_LENGTH  1024*2
//头部总长度
#define HEAD_TOTAL_LEN 4
//头部id长度
#define HEAD_ID_LEN 2
//头部数据长度
#define HEAD_DATA_LEN 2
#define MAX_RECVQUE  10000
#define MAX_SENDQUE 1000enum MSG_IDS {MSG_HELLO_WORD = 1001
};

                这里没什么好说的,就是定义了一些长度相关的信息。我们发现上面其实用的是宏定义,下面是enum。我看effective C++是这么解释的,条款2尽量以const,enum,inline替代#define。这是因为在编译的时候会直接替代成对应的定义,他不会被编译器看见。一旦出现报错,那么错误可能指向定义比如1.6。由于使用了宏,代码中可能出现多个1.6的副本无法定位问题所在。枚举可以完美替代宏。但是这里上面依然使用了宏定义,我认为应该是宏适用于简单的数值常量(如缓冲区大小、头部长度等),这类值通常只表示尺寸或范围,不需要逻辑关联或类型检查。枚举适用于有逻辑关联的一组常量(如消息 ID),这类值需要与程序逻辑配合,并具备扩展性和类型安全性。

AsioIOServicePool

#pragma once
#include <vector>
#include <boost/asio.hpp>
#include "Singleton.h"
class AsioIOServicePool:public Singleton<AsioIOServicePool>
{friend Singleton<AsioIOServicePool>;
public:using IOService = boost::asio::io_context;using Work = boost::asio::io_context::work;using WorkPtr = std::unique_ptr<Work>;~AsioIOServicePool();AsioIOServicePool(const AsioIOServicePool&) = delete;AsioIOServicePool& operator=(const AsioIOServicePool&) = delete;// 使用 round-robin 的方式返回一个 io_serviceboost::asio::io_context& GetIOService();void Stop();
private:AsioIOServicePool(std::size_t size = std::thread::hardware_concurrency());std::vector<IOService> _ioServices;std::vector<WorkPtr> _works;std::vector<std::thread> _threads;std::size_t _nextIOService;
};#include <iostream>
using namespace std;
AsioIOServicePool::AsioIOServicePool(std::size_t size):_ioServices(size),
_works(size), _nextIOService(0){for (std::size_t i = 0; i < size; ++i) {_works[i] = std::unique_ptr<Work>(new Work(_ioServices[i]));}//遍历多个ioservice,创建多个线程,每个线程内部启动ioservicefor (std::size_t i = 0; i < _ioServices.size(); ++i) {_threads.emplace_back([this, i]() {_ioServices[i].run();});}
}AsioIOServicePool::~AsioIOServicePool() {std::cout << "AsioIOServicePool destruct" << endl;
}boost::asio::io_context& AsioIOServicePool::GetIOService() {auto& service = _ioServices[_nextIOService++];if (_nextIOService == _ioServices.size()) {_nextIOService = 0;}return service;
}void AsioIOServicePool::Stop(){//因为仅仅执行work.reset并不能让iocontext从run的状态中退出//当iocontext已经绑定了读或写的监听事件后,还需要手动stop该服务。for (auto& work : _works) {//把服务先停止work->get_io_context().stop();work.reset();}for (auto& t : _threads) {t.join();}
}

        在这个并发场景下,我们是多个线程中跑多个io_context,所以需要一个io池来创建和管理io_context。之前的场景中我们是在Accept回调里面结束再挂一个Accept上去,如果触发又触发回调又挂上去,所以io_context.run()会阻塞,但是实际上如果不这么设计的话,io_context就会停止工作,导致所有正在进行的异步操作都被取消。这时,我们需要使用work对象来防止io_context停止工作。

        work作用是持有一个指向io_context的引用,并通过创建一个“工作”项来保证io_context不会停止工作,直到work对象被销毁或者调用reset()方法为止。当所有异步操作完成后,程序可以使用work.reset()方法来释放io_context,从而让其正常退出。需要说明的是这unique_ptr的操作,所以需要用unique_ptr去包装.一般就是析构的时候先用get_io_context().stop()先让他停了再reset()。

       _nextIOService是一个轮询索引,用最简单的轮询算法为每个新创建的连接分配io_context.

        初始化,也就是io_context的创建过程。他先按根据CPU核心数来分配线程数,在单例那节我们说过,这样做能减少线程竞争导致的时间片损耗,进程也可以按这个来分。然后线程存进去的推荐使用emplace_back,一个是减少拷贝的开销,如果是push_back如果设计不当容器造成两个线程,一个线程被放进去了,一个就没人管理了,也回收不了。

        GetIOService()就是去取io_context,如果取满了就从0开始。

Main

        

#include <iostream>
#include "CServer.h"
#include "Singleton.h"
#include "LogicSystem.h"
#include <csignal>
#include <thread>
#include <mutex>
#include "AsioIOServicePool.h"
using namespace std;
bool bstop = false;
std::condition_variable cond_quit;
std::mutex mutex_quit;int main()
{try {auto pool = AsioIOServicePool::GetInstance();boost::asio::io_context  io_context;boost::asio::signal_set signals(io_context, SIGINT, SIGTERM);signals.async_wait([&io_context,pool](auto, auto) {io_context.stop();pool->Stop();});CServer s(io_context, 10086);io_context.run();}catch (std::exception& e) {std::cerr << "Exception: " << e.what() << endl;}}

        这里实现的是优雅退出。那两个wait是因为async_wait要求必须写两个参数,一个是EC,异步,一个是signal_num,捕获的是什么什么信号。为什么pool那里不用&而是用值捕获走拷贝?因为pool是一个shared_ptr,可以传递,内部引用计数会加1,shared_ptr只会在被普通指针创建的时候,引用计数不会增加,且普通指针的生命周期管理不受shared_ptr控制,这也是我们说的不用智能指针和普通指针混用,容易出问题。这里将io_context传递给了Server,用于监听新连接fd.

CServer

#pragma once
#include <boost/asio.hpp>
#include "CSession.h"
#include <memory.h>
#include <map>
#include <mutex>
using namespace std;
using boost::asio::ip::tcp;
class CServer
{
public:CServer(boost::asio::io_context& io_context, short port);~CServer();void ClearSession(std::string);
private:void HandleAccept(shared_ptr<CSession>, const boost::system::error_code & error);void StartAccept();boost::asio::io_context &_io_context;short _port;tcp::acceptor _acceptor;std::map<std::string, shared_ptr<CSession>> _sessions;std::mutex _mutex;
};#include "CServer.h"
#include <iostream>
#include "AsioIOServicePool.h"
CServer::CServer(boost::asio::io_context& io_context, short port):_io_context(io_context), _port(port),
_acceptor(io_context, tcp::endpoint(tcp::v4(),port))
{cout << "Server start success, listen on port : " << _port << endl;StartAccept();
}CServer::~CServer() {cout << "Server destruct listen on port : " << _port << endl;
}void CServer::HandleAccept(shared_ptr<CSession> new_session, const boost::system::error_code& error){if (!error) {new_session->Start();lock_guard<mutex> lock(_mutex);_sessions.insert(make_pair(new_session->GetUuid(), new_session));}else {cout << "session accept failed, error is " << error.what() << endl;}StartAccept();
}void CServer::StartAccept() {auto &io_context = AsioIOServicePool::GetInstance()->GetIOService();shared_ptr<CSession> new_session = make_shared<CSession>(io_context, this);_acceptor.async_accept(new_session->GetSocket(), std::bind(&CServer::HandleAccept, this, new_session, placeholders::_1));
}void CServer::ClearSession(std::string uuid) {lock_guard<mutex> lock(_mutex);_sessions.erase(uuid);
}

        server类为服务器接收连接的管理类。他先创建了一个Acceptor_,然后调用了StartAccept()。也就是启动了Accept,也就是我上文说的挂了一个Accept上去,结束的时候又挂一个Accept上去。单线程的话是传递同一io_context,但是这个多线程就是从Pool取io_context.放进去,从而并发。这是什么意思呢?以前单线程的时候,就相当于是监听新的连接然后挂到自己身上,现在是挂到池子里的io_context里面的红黑树上。

        还有一个问题,他在创建Session的时候,把this指针传过去了,以及在创建session的时候,是通过bind绑定的。说后面这个问题,这是因为async_accept只接受一个ec参数,那不接受另外的参数我们就只能bind了。把指针传递过去,是因为server还管理着session节点,每创建一个session,就会把他投递进去,错误/析构的时候会clear.所以会传进去。

伪闭包

        在看session代码之前,我们要知道官方给的案例有什么隐患。当服务器即将发送数据前(调用async_write前),此刻客户端中断,服务器此时调用async_write会触发发送回调函数,判断ec为非0进而执行delete this逻辑回收session。但要注意的是客户端关闭后,在tcp层面会触发读就绪事件(四次挥手,客户端发的FIN包),服务器会触发读事件回调函数。在读事件回调函数中判断错误码ec为非0,进而再次执行delete操作,从而造成二次析构,这是极度危险的。

        我们知道GO有一个闭包,闭包(Closure) 是指 一个函数可以捕获并引用其外部作用域中的变量,即使该变量的生命周期已经超过了其原本的作用域核心机制是延长外部变量的生命周期,使其与闭包的生命周期一致。也就是在异步操作中,通过智能指针增加引用计数,使得对象不会在异步回调执行前被销毁,从而避免悬挂指针和二次析构问题。本来我们是通过map管理的session节点,但是他被移除了之后不会立马释放,而是在处理完回调之后再回收内存。但是C++本身并不支持闭包操作,所以我们就要通过设计来实现一个伪闭包的效果。也就是只要我函数还在用这个变量他就不会被释放,智能指针的shared_ptr非常适合实现伪闭包。

        也就是说,无论是回调读还是写,我们都把自身节点传递给对方,我也不需要这个指针做什么,只要我还有一个函数没有处理完,那么shared_ptr的引用计数最多就是加减,全部处理完才会结束析构。但是传递自身需要传递自身需要继承std::enable_shared_from_this<CSession>,别傻傻的去传this指针,this指针又不是智能指针,不会增加引用计数的,而且也别混用普通指针和智能指针。最方便的还是直接继承然后用shared_from_this()就好了。

Session/read

          

#pragma once
#include <boost/asio.hpp>
#include <boost/uuid/uuid_io.hpp>
#include <boost/uuid/uuid_generators.hpp>
#include <queue>
#include <mutex>
#include <memory>
#include "const.h"
#include "MsgNode.h"
using namespace std;using boost::asio::ip::tcp;
class CServer;
class LogicSystem;class CSession: public std::enable_shared_from_this<CSession>
{
public:CSession(boost::asio::io_context& io_context, CServer* server);~CSession();tcp::socket& GetSocket();std::string& GetUuid();void Start();void Send(char* msg,  short max_length, short msgid);void Send(std::string msg, short msgid);void Close();std::shared_ptr<CSession> SharedSelf();
private:void HandleRead(const boost::system::error_code& error, size_t  bytes_transferred, std::shared_ptr<CSession> shared_self);void HandleWrite(const boost::system::error_code& error, std::shared_ptr<CSession> shared_self);tcp::socket _socket;std::string _uuid;char _data[MAX_LENGTH];CServer* _server;bool _b_close;std::queue<shared_ptr<SendNode> > _send_que;std::mutex _send_lock;//收到的消息结构std::shared_ptr<RecvNode> _recv_msg_node;bool _b_head_parse;//收到的头部结构std::shared_ptr<MsgNode> _recv_head_node;
};class LogicNode {friend class LogicSystem;
public:LogicNode(shared_ptr<CSession>, shared_ptr<RecvNode>);
private:shared_ptr<CSession> _session;shared_ptr<RecvNode> _recvnode;
};
#include "CSession.h"
#include "CServer.h"
#include <iostream>
#include <sstream>
#include <json/json.h>
#include <json/value.h>
#include <json/reader.h>
#include "LogicSystem.h"CSession::CSession(boost::asio::io_context& io_context, CServer* server):_socket(io_context), _server(server), _b_close(false),_b_head_parse(false){boost::uuids::uuid  a_uuid = boost::uuids::random_generator()();_uuid = boost::uuids::to_string(a_uuid);_recv_head_node = make_shared<MsgNode>(HEAD_TOTAL_LEN);
}
CSession::~CSession() {std::cout << "~CSession destruct" << endl;
}tcp::socket& CSession::GetSocket() {return _socket;
}std::string& CSession::GetUuid() {return _uuid;
}void CSession::Start(){::memset(_data, 0, MAX_LENGTH);_socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH), std::bind(&CSession::HandleRead, this, std::placeholders::_1, std::placeholders::_2, SharedSelf()));
}void CSession::Send(std::string msg, short msgid) {std::lock_guard<std::mutex> lock(_send_lock);int send_que_size = _send_que.size();if (send_que_size > MAX_SENDQUE) {std::cout << "session: " << _uuid << " send que fulled, size is " << MAX_SENDQUE << endl;return;}_send_que.push(make_shared<SendNode>(msg.c_str(), msg.length(), msgid));if (send_que_size > 0) {return;}auto& msgnode = _send_que.front();boost::asio::async_write(_socket, boost::asio::buffer(msgnode->_data, msgnode->_total_len),std::bind(&CSession::HandleWrite, this, std::placeholders::_1, SharedSelf()));
}void CSession::Send(char* msg, short max_length, short msgid) {std::lock_guard<std::mutex> lock(_send_lock);int send_que_size = _send_que.size();if (send_que_size > MAX_SENDQUE) {std::cout << "session: " << _uuid << " send que fulled, size is " << MAX_SENDQUE << endl;return;}_send_que.push(make_shared<SendNode>(msg, max_length, msgid));if (send_que_size>0) {return;}auto& msgnode = _send_que.front();boost::asio::async_write(_socket, boost::asio::buffer(msgnode->_data, msgnode->_total_len), std::bind(&CSession::HandleWrite, this, std::placeholders::_1, SharedSelf()));
}void CSession::Close() {_socket.close();_b_close = true;
}std::shared_ptr<CSession>CSession::SharedSelf() {return shared_from_this();
}void CSession::HandleWrite(const boost::system::error_code& error, std::shared_ptr<CSession> shared_self) {//增加异常处理try {if (!error) {std::lock_guard<std::mutex> lock(_send_lock);//cout << "send data " << _send_que.front()->_data+HEAD_LENGTH << endl;_send_que.pop();if (!_send_que.empty()) {auto& msgnode = _send_que.front();boost::asio::async_write(_socket, boost::asio::buffer(msgnode->_data, msgnode->_total_len),std::bind(&CSession::HandleWrite, this, std::placeholders::_1, shared_self));}}else {std::cout << "handle write failed, error is " << error.what() << endl;Close();_server->ClearSession(_uuid);}}catch (std::exception& e) {std::cerr << "Exception code : " << e.what() << endl;}}void CSession::HandleRead(const boost::system::error_code& error, size_t  bytes_transferred, std::shared_ptr<CSession> shared_self){try {if (!error) {//已经移动的字符数int copy_len = 0;while (bytes_transferred > 0) {if (!_b_head_parse) {//收到的数据不足头部大小if (bytes_transferred + _recv_head_node->_cur_len < HEAD_TOTAL_LEN) {memcpy(_recv_head_node->_data + _recv_head_node->_cur_len, _data + copy_len, bytes_transferred);_recv_head_node->_cur_len += bytes_transferred;::memset(_data, 0, MAX_LENGTH);_socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH),std::bind(&CSession::HandleRead, this, std::placeholders::_1, std::placeholders::_2, shared_self));return;}//收到的数据比头部多//头部剩余未复制的长度int head_remain = HEAD_TOTAL_LEN - _recv_head_node->_cur_len;memcpy(_recv_head_node->_data + _recv_head_node->_cur_len, _data + copy_len, head_remain);//更新已处理的data长度和剩余未处理的长度copy_len += head_remain;bytes_transferred -= head_remain;//获取头部MSGID数据short msg_id = 0;memcpy(&msg_id, _recv_head_node->_data, HEAD_ID_LEN);//网络字节序转化为本地字节序msg_id = boost::asio::detail::socket_ops::network_to_host_short(msg_id);std::cout << "msg_id is " << msg_id << endl;//id非法if (msg_id > MAX_LENGTH) {std::cout << "invalid msg_id is " << msg_id << endl;_server->ClearSession(_uuid);return;}short msg_len = 0;memcpy(&msg_len, _recv_head_node->_data+HEAD_ID_LEN, HEAD_DATA_LEN);//网络字节序转化为本地字节序msg_len = boost::asio::detail::socket_ops::network_to_host_short(msg_len);std::cout << "msg_len is " << msg_len << endl;//id非法if (msg_len > MAX_LENGTH) {std::cout << "invalid data length is " << msg_len << endl;_server->ClearSession(_uuid);return;}_recv_msg_node = make_shared<RecvNode>(msg_len, msg_id);//消息的长度小于头部规定的长度,说明数据未收全,则先将部分消息放到接收节点里if (bytes_transferred < msg_len) {memcpy(_recv_msg_node->_data + _recv_msg_node->_cur_len, _data + copy_len, bytes_transferred);_recv_msg_node->_cur_len += bytes_transferred;::memset(_data, 0, MAX_LENGTH);_socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH),std::bind(&CSession::HandleRead, this, std::placeholders::_1, std::placeholders::_2, shared_self));//头部处理完成_b_head_parse = true;return;}memcpy(_recv_msg_node->_data + _recv_msg_node->_cur_len, _data + copy_len, msg_len);_recv_msg_node->_cur_len += msg_len;copy_len += msg_len;bytes_transferred -= msg_len;_recv_msg_node->_data[_recv_msg_node->_total_len] = '\0';//cout << "receive data is " << _recv_msg_node->_data << endl;//此处将消息投递到逻辑队列中LogicSystem::GetInstance()->PostMsgToQue(make_shared<LogicNode>(shared_from_this(), _recv_msg_node));//继续轮询剩余未处理数据_b_head_parse = false;_recv_head_node->Clear();if (bytes_transferred <= 0) {::memset(_data, 0, MAX_LENGTH);_socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH),std::bind(&CSession::HandleRead, this, std::placeholders::_1, std::placeholders::_2, shared_self));return;}continue;}//已经处理完头部,处理上次未接受完的消息数据//接收的数据仍不足剩余未处理的int remain_msg = _recv_msg_node->_total_len - _recv_msg_node->_cur_len;if (bytes_transferred < remain_msg) {memcpy(_recv_msg_node->_data + _recv_msg_node->_cur_len, _data + copy_len, bytes_transferred);_recv_msg_node->_cur_len += bytes_transferred;::memset(_data, 0, MAX_LENGTH);_socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH),std::bind(&CSession::HandleRead, this, std::placeholders::_1, std::placeholders::_2, shared_self));return;}memcpy(_recv_msg_node->_data + _recv_msg_node->_cur_len, _data + copy_len, remain_msg);_recv_msg_node->_cur_len += remain_msg;bytes_transferred -= remain_msg;copy_len += remain_msg;_recv_msg_node->_data[_recv_msg_node->_total_len] = '\0';//cout << "receive data is " << _recv_msg_node->_data << endl;//此处将消息投递到逻辑队列中LogicSystem::GetInstance()->PostMsgToQue(make_shared<LogicNode>(shared_from_this(), _recv_msg_node));//继续轮询剩余未处理数据_b_head_parse = false;_recv_head_node->Clear();if (bytes_transferred <= 0) {::memset(_data, 0, MAX_LENGTH);_socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH),std::bind(&CSession::HandleRead, this, std::placeholders::_1, std::placeholders::_2, shared_self));return;}continue;}}else {std::cout << "handle read failed, error is " << error.what() << endl;Close();_server->ClearSession(_uuid);}}catch (std::exception& e) {std::cout << "Exception code is " << e.what() << endl;}
}LogicNode::LogicNode(shared_ptr<CSession>  session, shared_ptr<RecvNode> recvnode):_session(session),_recvnode(recvnode) {}

         Session也就是会话层的主要功能是处理IO。这个比较长,我们分开梳理。

       构造函数 

        

CSession::CSession(boost::asio::io_context& io_context, CServer* server):_socket(io_context), _server(server), _b_close(false),_b_head_parse(false){boost::uuids::uuid  a_uuid = boost::uuids::random_generator()();_uuid = boost::uuids::to_string(a_uuid);_recv_head_node = make_shared<MsgNode>(HEAD_TOTAL_LEN);
}

        我们可以看到他主要就是用io_context生成了socket,同时也把socket和io_context绑定了,详情可以看前面写的socket.然后用雪花算法生成一个uuid,在server层会把他取出来,放进map里面管理。

        最后,把接收头消息node给初始化了。一般情况下,我们为了粘包问题,以及数据完整性,投递任务队列等问题。会把数据分为3部分,第一部分消息id,也就是逻辑层先注册函数,然后放进map,消息id也就对应着处理的函数,第二个是长度,看看有没有收发完整,第三个就是负载数据了。为什么只初始化头部的node?这是因为一般来说消息是分开收的,先收头部,根据头部解析出后面的消息后再收后面的消息。

start

        

void CSession::Start(){::memset(_data, 0, MAX_LENGTH);_socket.async_read_some(boost::asio::buffer
(_data, MAX_LENGTH), std::bind(&CSession::HandleRead, this, std::placeholders::_1,std::placeholders::_2, SharedSelf()));
}

        在server接到新连接之后会把socket挂到树上然后触发回调,在回调中就调用了Start函数。我们来看一下他干了什么,首先刚触发,肯定需要把缓冲区_data清空一下,然后调用socket的async_read_some,asio接受的缓冲区都是buffer,它包含缓冲区首地址和长度两个信息,后面的Bind就是老方子,伪闭包。这里需要说明一下读写有async_read_some,async_read,async_write,async_write_some四种api.它们的区别主要体现在 行为模式数据量控制 上.

        async_read_some是尽可能多读,它是一个“非阻塞”读取操作,会立即返回并等待数据到达。不会等到缓冲区完全填满,只要有数据到达,就会尽量多地读取并触发回调,适合需要尽快处理部分数据的场景,例如流式传输。我们也能看到这种很像水平触发不是边缘触发。

        async_read,功能是,尝试完全读取指定长度的数据,只有在数据完全到达时才会调用回调函数。如果指定的数据尚未全部到达,操作会挂起,直到完成或发生错误。

特点:更高级别,提供“完整性保证”,确保读取到指定长度的数据。

        写也是如此,尽量多写和全部发送。一般来说多线程读用async_read_some,因为可以切包,保证数据的完整。并且由于是非阻塞,他的性能更高。发送书一般用async_write发送完整数据。

HandleRead

        由于这个函数太大,他主要是切包,直接放不好看,分几部分梳理。

头部处理1
if (!error) {int copy_len = 0;while (bytes_transferred > 0) {if (!_b_head_parse) {//收到的数据不足头部大小if (bytes_transferred + _recv_head_node->_cur_len < HEAD_TOTAL_LEN) {memcpy(_recv_head_node->_data + _recv_head_node->_cur_len, _data + copy_len, bytes_transferred);_recv_head_node->_cur_len += bytes_transferred;::memset(_data, 0, MAX_LENGTH);_socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH),std::bind(&CSession::HandleRead, this, std::placeholders::_1, std::placeholders::_2, shared_self));return;}//收到的数据比头部多//头部剩余未复制的长度int head_remain = HEAD_TOTAL_LEN - _recv_head_node->_cur_len;memcpy(_recv_head_node->_data + _recv_head_node->_cur_len, _data + copy_len, head_remain);//更新已处理的data长度和剩余未处理的长度copy_len += head_remain;bytes_transferred -= head_remain;//获取头部MSGID数据short msg_id = 0;memcpy(&msg_id, _recv_head_node->_data, HEAD_ID_LEN);//网络字节序转化为本地字节序msg_id = boost::asio::detail::socket_ops::network_to_host_short(msg_id);std::cout << "msg_id is " << msg_id << endl;

        这框有点小,async_read_some一般会传递给回调函数ec以及数据长度bytetransfer,那么我们就需要自定义一个int copy_len 代表已经处理了的数据长度。因为是局部变量,每次回调他都会初始化为0.接下来就是切包了,当数据长度大于0,_b_head_parse代表头部是否处理了,初始化的时候设定为false.这里分两种情况,一种数据总大小小于头部大小,这时候就直接收完,然后把数据节点的cur_len设置一下返回就行了,然后再挂一个读。基本都是这么设计,读完继续挂回调读,写完继续挂回调监听写,接受新连接继续完了继续挂async_accept。别忘了读清空_data.

        如果比头部大,就先接完头部的消息,然后获取msg_id,别忘了把msg_id从大端序转成小端序。

头部处理2
//id非法if (msg_id > MAX_LENGTH) {std::cout << "invalid msg_id is " << msg_id << endl;_server->ClearSession(_uuid);return;}short msg_len = 0;memcpy(&msg_len, _recv_head_node->_data+HEAD_ID_LEN, HEAD_DATA_LEN);//网络字节序转化为本地字节序msg_len = boost::asio::detail::socket_ops::network_to_host_short(msg_len);std::cout << "msg_len is " << msg_len << endl;//id非法if (msg_len > MAX_LENGTH) {std::cout << "invalid data length is " << msg_len << endl;_server->ClearSession(_uuid);return;}_recv_msg_node = make_shared<RecvNode>(msg_len, msg_id);//消息的长度小于头部规定的长度,说明数据未收全,则先将部分消息放到接收节点里if (bytes_transferred < msg_len) {memcpy(_recv_msg_node->_data + _recv_msg_node->_cur_len, _data + copy_len, bytes_transferred);_recv_msg_node->_cur_len += bytes_transferred;::memset(_data, 0, MAX_LENGTH);_socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH),std::bind(&CSession::HandleRead, this, std::placeholders::_1, std::placeholders::_2, shared_self));//头部处理完成_b_head_parse = true;return;}

        后面是长度去出来,如果ID或者长度非法则把本节点从_server里面移除,不要这个连接了。我们来看一下移除了之后会发生什么,首先先关闭连接然后调用了server_把他从map里面移除了,这时候不会立马析构回收,由于我们有伪闭包,他会把回调处理完之后return出了作用域之后再触发析构。接下来有两种选择。

        1.接受消息长度小于头部消息里的小心长度,那么就把他放到_recv_msg_node,设置_b_head_parse为true返回即可挂一个读上去,下次就不用走头部处理了。

close函数如下:

void CSession::Close() {_socket.close();_b_close = true;
}
主要数据处理
				int remain_msg = _recv_msg_node->_total_len - _recv_msg_node->_cur_len;if (bytes_transferred < remain_msg) {memcpy(_recv_msg_node->_data + _recv_msg_node->_cur_len, _data + copy_len, bytes_transferred);_recv_msg_node->_cur_len += bytes_transferred;::memset(_data, 0, MAX_LENGTH);_socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH),std::bind(&CSession::HandleRead, this, std::placeholders::_1, std::placeholders::_2, shared_self));return;}memcpy(_recv_msg_node->_data + _recv_msg_node->_cur_len, _data + copy_len, remain_msg);_recv_msg_node->_cur_len += remain_msg;bytes_transferred -= remain_msg;copy_len += remain_msg;_recv_msg_node->_data[_recv_msg_node->_total_len] = '\0';//cout << "receive data is " << _recv_msg_node->_data << endl;//此处将消息投递到逻辑队列中LogicSystem::GetInstance()->PostMsgToQue(make_shared<LogicNode>(shared_from_this(), _recv_msg_node));//继续轮询剩余未处理数据_b_head_parse = false;_recv_head_node->Clear();if (bytes_transferred <= 0) {::memset(_data, 0, MAX_LENGTH);_socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH),std::bind(&CSession::HandleRead, this, std::placeholders::_1, std::placeholders::_2, shared_self));return;}continue;}

他这个也有两种情况,一种是还是不够,那么老操作之后返回就完事了。还有一种就是够了,我们封装完最后一个字节为'\0'.然后通过逻辑系统把他投递到逻辑队列里就行了。后面我们看到逻辑层的时候我们再细说。往后就是设置    _b_head_parse = false;_recv_head_node->Clear();如果收完了就老操作,没收完就continue下一次轮回。

        后面的函数我们也不讲了,先将逻辑层,这样连贯一点。

LogicSystem/Node

#pragma once
#include "Singleton.h"
#include <queue>
#include <thread>
#include "CSession.h"
#include <queue>
#include <map>
#include <functional>
#include "const.h"
#include <json/json.h>
#include <json/value.h>
#include <json/reader.h>typedef  function<void(shared_ptr<CSession>, const short &msg_id, const string &msg_data)> FunCallBack;
class LogicSystem:public Singleton<LogicSystem>
{friend class Singleton<LogicSystem>;
public:~LogicSystem();void PostMsgToQue(shared_ptr < LogicNode> msg);
private:LogicSystem();void DealMsg();void RegisterCallBacks();void HelloWordCallBack(shared_ptr<CSession>, const short &msg_id, const string &msg_data);std::thread _worker_thread;std::queue<shared_ptr<LogicNode>> _msg_que;std::mutex _mutex;std::condition_variable _consume;bool _b_stop;std::map<short, FunCallBack> _fun_callbacks;
};
#include "LogicSystem.h"using namespace std;LogicSystem::LogicSystem():_b_stop(false){RegisterCallBacks();_worker_thread = std::thread (&LogicSystem::DealMsg, this);
}LogicSystem::~LogicSystem(){_b_stop = true;_consume.notify_one();_worker_thread.join();
}void LogicSystem::PostMsgToQue(shared_ptr < LogicNode> msg) {std::unique_lock<std::mutex> unique_lk(_mutex);_msg_que.push(msg);//由0变为1则发送通知信号if (_msg_que.size() == 1) {unique_lk.unlock();_consume.notify_one();}
}void LogicSystem::DealMsg() {for (;;) {std::unique_lock<std::mutex> unique_lk(_mutex);//判断队列为空则用条件变量阻塞等待,并释放锁while (_msg_que.empty() && !_b_stop) {_consume.wait(unique_lk);}//判断是否为关闭状态,把所有逻辑执行完后则退出循环if (_b_stop ) {while (!_msg_que.empty()) {auto msg_node = _msg_que.front();cout << "recv_msg id  is " << msg_node->_recvnode->_msg_id << endl;auto call_back_iter = _fun_callbacks.find(msg_node->_recvnode->_msg_id);if (call_back_iter == _fun_callbacks.end()) {_msg_que.pop();continue;}call_back_iter->second(msg_node->_session, msg_node->_recvnode->_msg_id,std::string(msg_node->_recvnode->_data, msg_node->_recvnode->_cur_len));_msg_que.pop();}break;}//如果没有停服,且说明队列中有数据auto msg_node = _msg_que.front();cout << "recv_msg id  is " << msg_node->_recvnode->_msg_id << endl;auto call_back_iter = _fun_callbacks.find(msg_node->_recvnode->_msg_id);if (call_back_iter == _fun_callbacks.end()) {_msg_que.pop();continue;}call_back_iter->second(msg_node->_session, msg_node->_recvnode->_msg_id, std::string(msg_node->_recvnode->_data, msg_node->_recvnode->_cur_len));_msg_que.pop();}
}void LogicSystem::RegisterCallBacks() {_fun_callbacks[MSG_HELLO_WORD] = std::bind(&LogicSystem::HelloWordCallBack, this,placeholders::_1, placeholders::_2, placeholders::_3);
}void LogicSystem::HelloWordCallBack(shared_ptr<CSession> session, const short &msg_id, const string &msg_data) {Json::Reader reader;Json::Value root;reader.parse(msg_data, root);std::cout << "recevie msg id  is " << root["id"].asInt() << " msg data is "<< root["data"].asString() << endl;root["data"] = "server has received msg, msg data is " + root["data"].asString();std::string return_str = root.toStyledString();session->Send(return_str, root["id"].asInt());
}

我们先来看看构造函数

构造函数

LogicSystem::LogicSystem():_b_stop(false){RegisterCallBacks();_worker_thread = std::thread (&LogicSystem::DealMsg, this);
}

他首先会调用注册函数,也就是我们前面说的,根据要处理的消息id来调用对应的函数。这个应该算csp模式吧,所有任务共用一个缓冲区。然后单独开一个线程,去处理数据。

RegisterCallBacks

       

typedef  function<void(shared_ptr<CSession>
, const short &msg_id, const string &msg_data)> FunCallBack;std::map<short, FunCallBack> _fun_callbacks;void LogicSystem::RegisterCallBacks() {_fun_callbacks[MSG_HELLO_WORD] = std::bind(&LogicSystem::HelloWordCallBack, this,placeholders::_1, placeholders::_2, placeholders::_3);
}

为了方便观看,把前面两个参数也写了一下,之前线程池commit是用完美转发+task_package+future来实现不同函数的多线程并发。但是设置回调函数不用这么麻烦,直接用std::function就行了,同样他也是接收三个参数无返回值,不对那个commit是用lambda封装过的,不用参数。这个自己逻辑系统内回调可以不用那么写。这就相当于注册回调函数了。

HelloWorldCallback

void LogicSystem::HelloWordCallBack(shared_ptr<CSession> session, const short &msg_id, const string &msg_data) {Json::Reader reader;Json::Value root;reader.parse(msg_data, root);std::cout << "recevie msg id  is " << root["id"].asInt() << " msg data is "<< root["data"].asString() << endl;root["data"] = "server has received msg, msg data is " + root["data"].asString();std::string return_str = root.toStyledString();session->Send(return_str, root["id"].asInt());
}

这里面就是具体的处理逻辑。我们先介绍一下大概的函数然后连起来梳理一下这个过程。

DealMsg

void LogicSystem::DealMsg() {for (;;) {std::unique_lock<std::mutex> unique_lk(_mutex);//判断队列为空则用条件变量阻塞等待,并释放锁while (_msg_que.empty() && !_b_stop) {_consume.wait(unique_lk);}//判断是否为关闭状态,把所有逻辑执行完后则退出循环if (_b_stop ) {while (!_msg_que.empty()) {auto msg_node = _msg_que.front();cout << "recv_msg id  is " << msg_node->_recvnode->_msg_id << endl;auto call_back_iter = _fun_callbacks.find(msg_node->_recvnode->_msg_id);if (call_back_iter == _fun_callbacks.end()) {_msg_que.pop();continue;}call_back_iter->second(msg_node->_session, msg_node->_recvnode->_msg_id,std::string(msg_node->_recvnode->_data, msg_node->_recvnode->_cur_len));_msg_que.pop();}break;}//如果没有停服,且说明队列中有数据auto msg_node = _msg_que.front();cout << "recv_msg id  is " << msg_node->_recvnode->_msg_id << endl;auto call_back_iter = _fun_callbacks.find(msg_node->_recvnode->_msg_id);if (call_back_iter == _fun_callbacks.end()) {_msg_que.pop();continue;}call_back_iter->second(msg_node->_session, msg_node->_recvnode->_msg_id, std::string(msg_node->_recvnode->_data, msg_node->_recvnode->_cur_len));_msg_que.pop();}
}

这个就是一直处理任务的函数,他有两种情况,一种是要关了,俺么他就会把所有的任务处理完然后break.没有就一直处理。没有数据就挂起等待。首先他会从消息队列里面拿一个logicnode

我们先看一下Logicnode的结构

class LogicNode {friend class LogicSystem;
public:LogicNode(shared_ptr<CSession>, shared_ptr<RecvNode>);
private:shared_ptr<CSession> _session;shared_ptr<RecvNode> _recvnode;
};

它里面包含一个_session一个消息,RecvNode包含数据长度 id 以及_data数据。拿到id之后就通过注册函数调用,再把msg_node(logicnode)弹出就好了。

数据经过逻辑层到IO的过程

        之前我们看到了,数据被投递到了msg队列中。然后由dealmsg处理,处理的时候会使用传递对应session与数据给对应的注册函数,注册函数处理完数据之后。就会调用对应session的send

Session/write

        这里主要讲的是session的发送io

send

void CSession::Send(std::string msg, short msgid) {std::lock_guard<std::mutex> lock(_send_lock);int send_que_size = _send_que.size();if (send_que_size > MAX_SENDQUE) {std::cout << "session: " << _uuid << " send que fulled, size is " << MAX_SENDQUE << endl;return;}_send_que.push(make_shared<SendNode>(msg.c_str(), msg.length(), msgid));if (send_que_size > 0) {return;}auto& msgnode = _send_que.front();boost::asio::async_write(_socket, boost::asio::buffer(msgnode->_data, msgnode->_total_len),std::bind(&CSession::HandleWrite, this, std::placeholders::_1, SharedSelf()));
}

他这个的意思是,最开始获取一个长度,如果长度大于max就说明队列满了,数据不要了直接丢了。要是没满就把他放到发送队列里。他这个if判断的其实是队列里面有刚进来的时候没有数据,没有数据就说明async_write停了或者说第一次进入就需要重新挂一个写,不然就让他自己一直回调慢慢处理数据就好了。

HandleWrite

void CSession::HandleWrite(const boost::system::error_code& error, std::shared_ptr<CSession> shared_self) {//增加异常处理try {if (!error) {std::lock_guard<std::mutex> lock(_send_lock);//cout << "send data " << _send_que.front()->_data+HEAD_LENGTH << endl;_send_que.pop();if (!_send_que.empty()) {auto& msgnode = _send_que.front();boost::asio::async_write(_socket, boost::asio::buffer(msgnode->_data, msgnode->_total_len),std::bind(&CSession::HandleWrite, this, std::placeholders::_1, shared_self));}}else {std::cout << "handle write failed, error is " << error.what() << endl;Close();_server->ClearSession(_uuid);}}catch (std::exception& e) {std::cerr << "Exception code : " << e.what() << endl;}}

我们看这个回调也确实是这么做的,有消息就一直发。

结语及总结

疑问

        在本篇博文中,主要梳理了asio多线程网络编程其中一种方案的实现及理论。在写的时候发现了一点问题,就是说server管理了session节点,但是我看析构的时候,并没有将所有节点删除。而是在main函数捕获信号,然后通过io_context池子来stop了所有的io_context,让io_context.run()立即返回,并且回收了所有的线程.但是io_context.run()返回了也就是内部的epoll不在epolll_wait了是否就可可以使管理的session是否不用移除,回头再看一下代码吧。

流程总结

        最后我们来梳理一下他的大致流程。首先main函数中创建了一个上下文io_context,然后把他传递给了server.server主要用来管理session(会话)节点,当新连接来的时候并且接受新的连接把他挂到对应的io_context上,接着触发回调运行session的start函数并且把这个session放到map里面管理起来延长生命周期,并且挂一个accept上去。start之后就会走async_read_some,然后是切包,把切好的包通过逻辑系统放到队列里。由逻辑系统里的注册函数对他处理之后,使用session的send,把他发出去。send有两个部分组成,首先如果消息队列不满,那么直接投递到队列里面去,如果满了,就扔了。如果没有消息就挂一个async_write让他自己一直发直到消息队列没消息了。

结语

        总而言之asio虽然在Linux上走的会是reactor而不是preactor性能会差一些,但是给你造了轮子总比自己设计accept,channel来的好。总要方便一些,以后分享一下reactor不用asio手搓是怎么手搓的吧。不过可能要一些时间了,下一步的计划是把dds,someip,以及车载网络协议更新完。每次手写博客都是对自己知识的梳理,感谢各位看到这里,内容可能有错,批判的看待即可。

相关文章:

linux asio网络编程理论及实现

最近在B站看了恋恋风辰大佬的asio网络编程&#xff0c;质量非常高。在本章中将对ASIO异步网络编程的整体及一些实现细节进行完整的梳理&#xff0c;用于复习与分享。大佬的博客&#xff1a;恋恋风辰官方博客 Preactor/Reactor模式 在网络编程中&#xff0c;通常根据事件处理的触…...

第一篇:数据库基础与概念

第一篇&#xff1a;数据库基础与概念 目标读者&#xff1a; 没有接触过数据库的初学者。 内容概述&#xff1a; 在本篇文章中&#xff0c;我们将从零开始&#xff0c;详细介绍数据库的基本概念、常见的数据库管理系统&#xff08;DBMS&#xff09;以及数据库设计的基础知识…...

从理论到实践:Django 业务日志配置与优化指南

在现代 Web 开发中,日志记录是确保系统可维护性和可观测性的重要手段。通过合理的日志配置,我们可以快速定位问题、分析系统性能,并进行安全审计。本文将围绕 Django 框架,详细介绍如何配置和优化业务日志,确保开发环境和生产环境都能高效地记录和管理日志。 © ivwdc…...

基于Python的药物相互作用预测模型AI构建与优化(上.文字部分)

一、引言 1.1 研究背景与意义 在临床用药过程中,药物相互作用(Drug - Drug Interaction, DDI)是一个不可忽视的重要问题。当患者同时服用两种或两种以上药物时,药物之间可能会发生相互作用,从而改变药物的疗效、增加不良反应的发生风险,甚至危及患者的生命安全。例如,…...

常用的 ASCII 码表字符

ASCII&#xff08;美国信息交换标准代码&#xff0c;American Standard Code for Information Interchange&#xff09;是一种字符编码标准&#xff0c;用于表示英文字符、数字、标点符号以及一些控制字符。ASCII 码表包含 128 个字符&#xff0c;每个字符用 7 位二进制数表示&…...

AI大模型开发原理篇-8:Transformer模型

近几年人工智能之所以能迅猛发展&#xff0c;主要是靠2个核心思想&#xff1a;注意力机制Attention Mechanism 和 Transformer模型。本次来浅谈下Transformer模型。 重要性 Transformer模型在自然语言处理领域具有极其重要的地位&#xff0c;为NLP带来了革命性的突破‌。可以…...

Golang 并发机制-2:Golang Goroutine 和竞争条件

在今天的软件开发中&#xff0c;我们正在使用并发的概念&#xff0c;它允许一次执行多个任务。在Go编程中&#xff0c;理解Go例程是至关重要的。本文试图详细解释什么是例程&#xff0c;它们有多轻&#xff0c;通过简单地使用“go”关键字创建它们&#xff0c;以及可能出现的竞…...

NVLink 拓扑、DGX 硬件渲染图

文章目录 一个 server 固定 8 个GPUP100&#xff08;4个NVL&#xff09;V100&#xff08;6个NVL&#xff09;A100&#xff08;12个NVL&#xff09;H100&#xff08;18个NVL&#xff09;【DGX-2 &#xff1a;2018年发布NVSwitch&#xff0c;实现full-mesh】【NVLink 拓扑&#x…...

Python XML 解析

Python XML 解析 引言 XML(可扩展标记语言)是一种用于存储和传输数据的标记语言。Python 作为一种功能强大的编程语言,拥有多种解析 XML 的库,如 xml.etree.ElementTree 和 lxml。本文将详细介绍 Python 中 XML 解析的方法、技巧和注意事项,帮助您更好地掌握 XML 数据的…...

java求职学习day22

MySQL 基础 &SQL 入门 1. 数据库的基本概念 1.1 什么是数据库 1. 数据库 (DataBase) 就是 存储 和 管理 数据的仓库 2. 其本质是一个文件系统, 还是以文件的方式,将数据保存在电脑上 1.2 为什么使用数据库 数据存储方式的比较 通过上面的比较 , 我们可以看出 , 使…...

stm32教程:EXTI外部中断应用

早上好啊大佬们&#xff0c;上一期我们讲了EXTI外部中断的原理以及基础代码的书写&#xff0c;这一期就来尝试一下用它来写一些有实际效能的工程吧。 这一期里&#xff0c;我用两个案例代码来让大家感受一下外部中断的作用和使用价值。 旋转编码器计数 整体思路讲解 这里&…...

OVS-DPDK

dpdk介绍及应用 DPDK介绍 DPDK&#xff08;Data Plane Development Kit&#xff09;是一组快速处理数据包的开发平台及接口。有intel主导开发&#xff0c;主要基于Linux系统&#xff0c;用于快速数据包处理的函 数库与驱动集合&#xff0c;可以极大提高数据处理性能和吞吐量&…...

快速分析LabVIEW主要特征进行判断

在LabVIEW中&#xff0c;快速分析程序特征进行判断是提升开发效率和减少调试时间的重要技巧。本文将介绍如何高效地识别和分析程序的关键特征&#xff0c;从而帮助开发者在编写和优化程序时做出及时的判断&#xff0c;避免不必要的错误。 ​ 数据流和并行性分析 LabVIEW的图形…...

MySQL数据库(二)

一 DDL (一 数据库操作 1 查询-数据库&#xff08;所有/当前&#xff09; 1 所有数据库&#xff1a; show databases; 2 查询当前数据库&#xff1a; select database(); 2 创建-数据库 可以定义数据库的编码方式 create database if not exists ax1; create database ax2…...

Python 梯度下降法(五):Adam Optimize

文章目录 Python 梯度下降法&#xff08;五&#xff09;&#xff1a;Adam Optimize一、数学原理1.1 介绍1.2 符号说明1.3 实现流程 二、代码实现2.1 函数代码2.2 总代码2.3 遇到的问题2.4 算法优化 三、优缺点3.1 优点3.2 缺点 Python 梯度下降法&#xff08;五&#xff09;&am…...

表格结构标签

<!-- thead表示表格的头部 tbody表示表格的主体 --> <thead></thead> <tbody></tbody> <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8"><meta name"viewport" content&q…...

gcc和g++的区别以及明明函数有定义为何链接找不到

初级代码游戏的专栏介绍与文章目录-CSDN博客 我的github&#xff1a;codetoys&#xff0c;所有代码都将会位于ctfc库中。已经放入库中我会指出在库中的位置。 这些代码大部分以Linux为目标但部分代码是纯C的&#xff0c;可以在任何平台上使用。 源码指引&#xff1a;github源…...

Git进阶之旅:tag 标签 IDEA 整合 Git

第一章&#xff1a;tag 标签远程管理 git 标签 tag 管理&#xff1a; 标签有两种&#xff1a; 轻量级标签(lightweight)带有附注标签(annotated) git tag 标签名&#xff1a;创建一个标签git tag 标签名 -m 附注内容 &#xff1a;创建一个附注标签git tag -d 标签名…...

计算机网络一点事(24)

TCP可靠传输&#xff0c;流量控制 可靠传输&#xff1a;每字节对应一个序号 累计确认&#xff1a;收到ack则正确接收 返回ack推迟确认&#xff08;不超过0.5s&#xff09; 两种ack&#xff1a;专门确认&#xff08;只有首部无数据&#xff09; 捎带确认&#xff08;带数据…...

集合的奇妙世界:Python集合的经典、避坑与实战

集合的奇妙世界&#xff1a;Python集合的经典、避坑与实战 内容简介 本系列文章是为 Python3 学习者精心设计的一套全面、实用的学习指南&#xff0c;旨在帮助读者从基础入门到项目实战&#xff0c;全面提升编程能力。文章结构由 5 个版块组成&#xff0c;内容层层递进&#x…...

ubuntu20.04.6下运行VLC-Qt例子simple-player

下载examples-master.zip&#xff08;https://github.com/vlc-qt/examples&#xff09;&#xff0c;编译运行simple-player 参考链接&#xff1a; https://blog.csdn.net/szn1316159505/article/details/143743735 本文运行环境 Qt 5.15.2 Qt creator 5.0.2 主要步骤&#xf…...

Node.js MySQL:深度解析与最佳实践

Node.js MySQL:深度解析与最佳实践 引言 Node.js作为一种流行的JavaScript运行时环境,以其轻量级、高性能和事件驱动模型受到开发者的青睐。MySQL则是一款功能强大的关系型数据库管理系统,广泛应用于各种规模的应用程序中。本文将深入探讨Node.js与MySQL的集成,分析其优势…...

Linux网络 | 网络层IP报文解析、认识网段划分与IP地址

前言&#xff1a;本节内容为网络层。 主要讲解IP协议报文字段以及分离有效载荷。 另外&#xff0c; 本节也会带领友友认识一下IP地址的划分。 那么现在废话不多说&#xff0c; 开始我们的学习吧&#xff01;&#xff01; ps&#xff1a;本节正式进入网络层喽&#xff0c; 友友们…...

项目测试之Postman

文章目录 基础实战进行批量测试并输出报告 基础 实战 进行批量测试并输出报告 参考&#xff1a; https://blog.csdn.net/tyh_keephunger/article/details/109205191 概述 Newman是什么&#xff1f;Newman是Postman的命令行工具&#xff0c;用于执行接口测试集合。操作过程…...

C++——list的了解和使用

目录 引言 forward_list与list 标准库中的list 一、list的常用接口 1.list的迭代器 2.list的初始化 3.list的容量操作 4.list的访问操作 5.list的修改操作 6.list的其他操作 二、list与vector的对比 结束语 引言 本篇博客要介绍的是STL中的list。 求点赞收藏评论…...

MySQL基本架构SQL语句在数据库框架中的执行流程数据库的三范式

MySQL基本架构图&#xff1a; MySQL主要分为Server层和存储引擎层 Server层&#xff1a; 连接器&#xff1a;连接客户端&#xff0c;获取权限&#xff0c;管理连接 查询缓存&#xff08;可选&#xff09;&#xff1a;在执行查询语句之前会先到查询缓存中查看是否执行过这条语…...

(leetcode 213 打家劫舍ii)

代码随想录&#xff1a; 将一个线性数组换成两个线性数组&#xff08;去掉头&#xff0c;去掉尾&#xff09; 分别求两个线性数组的最大值 最后求这两个数组的最大值 代码随想录视频 #include<iostream> #include<vector> #include<algorithm> //nums:2,…...

如何用KushoAI提升API自动化测试效率:AI驱动的革命

在现代软件开发中,API测试已经成为确保系统稳定性和可靠性的关键。然而,传统的API测试往往依赖手动编写测试用例,每次修改API后都需要重新进行测试,这不仅耗时费力,还容易因人为疏忽而出现问题。想象一下,你是否曾因API在生产环境中出现微小错误而彻夜未眠?每次修改API后…...

docker安装nacos2.2.4详解(含:nacos容器启动参数、环境变量、常见问题整理)

一、镜像下载 1、在线下载 在一台能连外网的linux上执行docker镜像拉取命令 docker pull nacos:2.2.4 2、离线包下载 两种方式&#xff1a; 方式一&#xff1a; -&#xff09;在一台能连外网的linux上安装docker执行第一步的命令下载镜像 -&#xff09;导出 # 导出镜像到…...

DBeaver连接MySQL提示Access denied for user ‘‘@‘ip‘ (using password: YES)的解决方法

在使用DBeaver连接MySQL数据库时&#xff0c;如果遇到“Access denied for user ip (using password: YES)”的错误提示&#xff0c;说明用户认证失败。此问题通常与数据库用户权限、配置错误或网络设置有关。本文将详细介绍解决此问题的步骤。 一、检查用户名和密码 首先&am…...

VirtualBox:跨磁盘导入已存的vdi磁盘文件顺便测试冷迁移

目录 1.背景 2.目的 3.步骤 3.1 安装在移动硬盘上 3.2.接管现有主机磁盘上的虚拟机 3.3接管迁移到移动硬盘的虚拟机 4. 结论 1.背景 电脑重新做了系统&#xff0c;然后找不到virtualbox的启动程序了&#xff0c;另外电脑磁盘由于存储了其他文件已经爆红&#xff0c;无法…...

蓝桥杯思维训练营(一)

文章目录 题目总览题目详解翻之一起做很甜的梦 蓝桥杯的前几题用到的算法较少&#xff0c;大部分考察的都是思维能力&#xff0c;方法比较巧妙&#xff0c;所以我们要积累对应的题目&#xff0c;多训练 题目总览 翻之 一起做很甜的梦 题目详解 翻之 思维分析&#xff1a;一开…...

EchoMimicV2的部署使用

最近有一个录课的需要&#xff0c;我不想浪费人力&#xff0c;只想用技术解决。需求很简单&#xff0c;就是用别人现成的录课视频中的形象和声线&#xff0c;再结合我提供的讲稿去生成一个新的录课视频。我觉得应该有现成的技术了&#xff0c;我想要免费大批量生产。最近看到这…...

JVM深入学习(一)

目录 一.JVM概述 1.1 为什么要学jvm&#xff1f; 1.2 jvm的作用 1.3 jvm内部构造 二.JVM类加载 2.1类加载过程 2.2类加载器 2.3类加载器的分类 2.4双亲委派机制 三.运行时数据区 堆空间区域划分&#xff08;堆&#xff09; 为什么分区(代)&#xff1f;&#xff08…...

线段树(Segment Tree)和树状数组

线段树&#xff08;Segment Tree&#xff09;和树状数组 线段树的实现链式&#xff1a;数组实现 解题思路树状数组 线段树是 二叉树结构 的衍生&#xff0c;用于高效解决区间查询和动态修改的问题&#xff0c;其中区间查询的时间复杂度为 O(logN)&#xff0c;动态修改单个元素的…...

Teleporters( Educational Codeforces Round 126 (Rated for Div. 2) )

Teleporters&#xff08; Educational Codeforces Round 126 (Rated for Div. 2) &#xff09; There are n 1 n1 n1 teleporters on a straight line, located in points 0 0 0, a 1 a_1 a1​, a 2 a_2 a2​, a 3 a_3 a3​, …, a n a_n an​. It’s possible to tele…...

JavaScript 注释

JavaScript 注释 引言 JavaScript 注释是编写代码过程中不可或缺的一部分。它们不仅可以提高代码的可读性和可维护性,还能帮助其他开发者(或未来的自己)更好地理解代码的意图。本文将深入探讨 JavaScript 注释的多种类型、使用方法和最佳实践。 一、注释的分类 JavaScri…...

消息队列篇--原理篇--常见消息队列总结(RabbitMQ,Kafka,ActiveMQ,RocketMQ,Pulsar)

1、RabbitMQ 特点&#xff1a; AMQP协议&#xff1a;RabbitMQ是基于AMQP&#xff08;高级消息队列协议&#xff09;构建的&#xff0c;支持多种消息传递模式&#xff0c;如发布/订阅、路由、RPC等。多语言支持&#xff1a;支持多种编程语言的客户端库&#xff0c;包括Java、P…...

AVL搜索树

一、介绍 高度平衡的搜索二叉树&#xff0c;保证每个节点的左右子树高度差不超过1&#xff0c;降低搜索树的高度以提高搜索效率。 通过平衡因子和旋转来保证左右子树高度差不超过1 二、插入节点 1、插入规则 &#xff08;1&#xff09;搜按索树规则插入节点 &#xff08;…...

ELK模块封装starter

文章目录 1.combinations-elk-starter1.目录结构2.log4j2-spring.xml 从环境变量读取host和port3.ELKProperties.java 两个属性4.ELKAutoConfiguration.java 启用配置类5.ELKEnvironmentPreparedListener.java 监听器从application.yml中获取属性值6.spring.factories 注册监听…...

C# 与.NET 日志变革:JSON 让程序“开口说清话”

一、引言&#xff1a;日志新时代的开启 在软件开发的漫长旅程中&#xff0c;日志一直是我们不可或缺的伙伴。它就像是应用程序的 “黑匣子”&#xff0c;默默地记录着程序运行过程中的点点滴滴&#xff0c;为我们在调试、排查问题以及性能优化时提供关键线索。在早期&#xff…...

Ubuntu 系统,如何使用双Titan V跑AI

要在Ubuntu系统中使用双NVIDIA Titan V GPU来运行人工智能任务&#xff0c;你需要确保几个关键组件正确安装和配置。以下是基本步骤&#xff1a; 安装Ubuntu操作系统&#xff1a; 下载最新版本的Ubuntu服务器或桌面版ISO文件。使用工具如Rufus&#xff08;Windows&#xff09;或…...

CSDN的历史

CSDN(中国开发者网络,China Software Developer Network)是中国最具影响力的IT技术社区之一,其历史可追溯至1999年。以下是其发展历程和关键节点: --- **一、创立背景(1999年)** - **创始人**:蒋涛(国内知名技术人,曾参与金山软件早期开发)。 - **初衷**:为国内程…...

使用Pygame制作“贪吃蛇”游戏

贪吃蛇 是一款经典的休闲小游戏&#xff1a;玩家通过操控一条会不断变长的“蛇”在屏幕中移动&#xff0c;去吃随机出现的食物&#xff0c;同时要避免撞到墙壁或自己身体的其他部分。由于其逻辑相对简单&#xff0c;但可玩性和扩展性都不错&#xff0c;非常适合作为新手练习游戏…...

【详细教程】如何在Mac部署Deepseek R1?

DeepSeek是目前最火的国产大模型&#xff0c;官方App用户太多服务经常出现卡顿&#xff0c;部署一个本地DeepSeek R1可以方便使用。 1.系统最低要求 macOS 11 Big Sur 或更新 2.下载ollama https://ollama.com/ 3.安装DeepSeek R1 打开终端 运行命令 ollama run deepseek-…...

Java中的getInterfaces()方法:使用与原理详解

在Java中&#xff0c;反射&#xff08;Reflection&#xff09;是一个强大的工具&#xff0c;它允许程序在运行时动态地获取类的信息并操作类的属性和方法。getInterfaces()方法是Java反射API中的一个重要方法&#xff0c;用于获取类或接口直接实现的接口。本文将深入探讨getInt…...

PT站点自动签到

在站点下载一些视频电影资源&#xff0c;站点需要长期维护&#xff0c;每天自动签到。 两种方式&#xff1a; 一、保持浏览器登录状态&#xff0c;打开默认用户文件&#xff0c; 模拟点击签到&#xff08;点击按钮自行设置&#xff1a;根据href名称&#xff09; log日志 首次…...

计算机网络一点事(23)

传输层 端口作用&#xff1a;标识主机特定进程&#xff0c;TCP&#xff0c;UDP协议 端口号分类&#xff1a;服务器&#xff1a;0-1023&#xff0c;熟知 1024-49151 登记 客户端&#xff1a;49152-65535 功能&#xff1a;实现端到端&#xff0c;进程到进程的通信&#xff0c…...

vim操作简要记录

操作容易忘记&#xff0c;记录一下基本使用的 :wq保存退出 :w :q :q! :wq! i I a A 方向键 h左 j下 k上 l右 dd删除方行&#xff08;这其实是剪切行操作&#xff0c;不过一般用作删除&#xff0c;长按可删除&#xff0c;不过按.执行上一次操作删除更快&#xff09; .执行上…...

DeepSeek大模型技术深度解析:揭开Transformer架构的神秘面纱

摘要 DeepSeek大模型由北京深度求索人工智能基础技术研究有限公司开发&#xff0c;基于Transformer架构&#xff0c;具备卓越的自然语言理解和生成能力。该模型能够高效处理智能对话、文本生成和语义理解等复杂任务&#xff0c;标志着人工智能在自然语言处理领域的重大进展。 关…...