【Linux】利用多路转接epoll机制、ET模式,基于Reactor设计模式实现
📚 博主的专栏
🐧 Linux | 🖥️ C++ | 📊 数据结构 | 💡C++ 算法 | 🅒 C 语言 | 🌐 计算机网络
上篇文章:多路转接epoll,实现echoserver
至此,Linux与计算机网络系列专题已圆满收官。后续我们将持续推出项目实战、MySQL数据库、Qt开发以及算法精讲等系列文章,敬请期待!
目录
Connection.hpp
先完成Listen套接字的事件处理:
Common.hpp
Epoller.hpp
Listener.hpp
TcpServer.hpp
Main.cc
Reactor(反应堆)设计模式
处理listen套接字的新连接:如何处理Listen类中的Accepter?
在Connection类中定义一个Reactor指针,并实现Connection和Reactor之间的互相引用
因此我们需要将所有的文件描述符都设置为非阻塞。
解决办法:fcntl
Listener类Listener.hpp:
将新创建的套接字加入Reactor的连接池
1.在Connection类中设置Reactor、以及Connection类型
2.将当前对象,设置进入所有的conn对象中
添加HandlerConnection模块
3.修正AddConnection
更新AddConnection
更新入口函数:Main.cc
继续写HandlerConnection类:读事件处理
4.处理读事件,引入协议进行报文解析
5.协议处理+报文解析模块PackageParse.hpp+Protocol.hpp
读事件处理逻辑图:
6.添加业务处理模块:
7.关于写入的话题:
写事件处理
写事件处理流程
PackageParse.hpp
数据发送HandlerSender函数
更新Epoller.hpp
更新Reactor.hpp
DelConnection函数:
正式认识Reactor反应堆模式
Reactor 模式的核心思想
Reactor 模式的主要组件
Reactor(底层)+ 线程池(业务处理)
编辑Reactor + 线程池的架构
引发竞争条件、数据一致性问题处理
转变思路:One thread One Loop
设计思路
本篇文章主要讲解耦合度超低EchoServer编写,然后结合之前所讲过的网络版计算器搭建业务层
摘要:本文详细探讨如何利用Reactor模式构建低耦合、高并发的网络服务器。通过封装Connection类管理套接字及缓冲区,结合epoll多路复用技术实现事件驱动模型,核心模块包括Listener(处理新连接)、HandlerConnection(处理IO事件)和Reactor(事件派发)。文章深入解析非阻塞IO与边缘触发(ET)模式的高效实现,并通过线程池优化业务处理逻辑,解决多线程环境下的竞争条件与数据一致性问题。此外,引入eventfd实现线程间唤醒机制,验证errno的线程安全性。最终通过模块化设计实现服务器的高扩展性,为开发高性能网络服务提供完整实践方案,适用于HTTP服务器、即时通信等场景。
Request(请求) && Response(应答) Protocol(协议处理)PackageParse(报文解析)
Listener(处理新连接) HandlerConnection(处理普通套接字IO事件以及异常)
Connection(封装套接字以及缓冲区)
Reactor(最底层)
epoller(反应给Reactor)
使用到的代码在之前的博客都有详细讲解,可在我的gitee中自取:点击链接
本篇文章的代码:2.epollserver_v2
本篇文章需要准备以下文件:接下来将详细讲解每个模块的实现以及实现思路
.
├── Common.hpp
├── Connection.hpp
├── Epoller.hpp
├── HandlerConnection.hpp
├── InetAddr.hpp
├── Listener.hpp
├── LockGuard.hpp
├── Log.hpp
├── Main.cc
├── Makefile
├── NetCal.hpp
├── PackageParse.hpp
├── Protocol.hpp
├── Reactor.hpp
└── Socket.hpp0 directories, 15 files
Connection.hpp
封装文件描述符:每个文件描述符(如套接字)被抽象为
Connection
对象,包含:
接收缓冲区(
_inbuffer
)和发送缓冲区(_outbuffer
)。事件处理函数(读、写、异常),通过
RegisterHandler
方法注册。管理文件描述符(
_sockfd
)及其关注的事件(_events
)。
封装文件描述符
1.保证每一个文件描述符都有一个自己的接收缓冲区。
在epoll机制中,文件描述符的概念被抽象化,包括套接字在内的所有通信端点都被统一封装为连接(connection)对象。
#pragma once #include <iostream> #include <string> #include <functional>// 封装文件描述符 // 1.给每个文件描述符做一个设计,保证每一个文件描述符都有一个自己的接收缓冲区, // 之后,在epoll当中,就没有文件描述符的概念,包括套接字,一起被封装成为连接:connectionclass Connection; using handler_t = std::function<void(Connection *conn)>;// 未来我们的服务器,一切都是Connection, 对我们来讲:listensockfd也是一样 class Connection {public:Connection(int sockfd) : _sockfd(sockfd){}// 注册对应的事件处理方法void RegisterHandler(handler_t recver, handler_t sender, handler_t excepter){_handler_recver = recver;_handler_sender = sender;_handler_excepter = excepter;}void SetEvents(uint32_t events){_events = events;}uint32_t Events(){return _events;}int Sockfd(){return _sockfd;}~Connection() {}private:int _sockfd;uint32_t _events; // 所关心的事件std::string _inbuffer; // 使用string来充当缓冲区只能满足此代码的需求(在网络通信中传二进制,就不行)std::string _outbuffer;handler_t _handler_recver; // 处理读取handler_t _handler_sender; // 处理写入handler_t _handler_excepter; // 处理异常 };
TcpServer.hpp
暂时称做TcpServer
使用TcpServer将所有的connections管理起来
key: sockfd
value: Connection*
优势:能够很快的通过文件描述符,来找到connection对象,在未来高效执行connection对应的处理事件的方法
std::unordered_map<int, Connection *> _connections;
先完成Listen套接字的事件处理:
Listener.hpp
用于统一管理listen套接字,以及获取新连接
#pragma once #include <iostream>#include "Socket.hpp"using namespace socket_ns;// 用来统一管理Listen套接字 class Listener { public:Listener(uint16_t port) : _port(port), _listensock(std::make_unique<TcpSocket>()){_listensock->BuildListenSocket(_port);}int ListenSockfd(){return _listensock->Sockfd();}~Listener() {}private:std::unique_ptr<Socket> _listensock;uint16_t _port; };
Main.cc
在TCP通信中,系统仅需传递一个文件描述符及其对应的事件,即可将其转换为connection对象。服务器通过哈希表对这些connection对象进行统一管理,并交由epoll进行事件监控。我们通过封装connection对象,仅对外暴露必要的功能接口,确保系统的安全性和易用性。
#include <memory>
#include <sys/epoll.h>
#include "Listener.hpp"
#include "Log.hpp"
#include "TcpServer.hpp"using namespace log_ns;// ./tcpserver 8888
int main(int argc, char *argv[])
{if (argc != 2){std::cerr << "Usage: " << argv[0] << " local-port" << std::endl;exit(0);}uint16_t port = std::stoi(argv[1]);EnableScreen();Listener listener(port);std::unique_ptr<TcpServer> tsvr = std::make_unique<TcpServer>();// 对于一个tcp来讲,只需要传一个文件描述符,以及该文件描述符对应的事件,转换成一个connection对象// 在服务器中以管理哈希表的方式管理,再让epoll来管理// 封装connection对象,仅对外暴露必要的功能接口tsvr->AddConnection(listener.ListenSockfd(), EPOLLIN);return 0;
}
编写服务器中的AddConnection函数:
- 建立连接
- 将连接文件描述符和事件注册到操作系统
- 将连接托管给epoll进行事件监听(封装Epoll-->Epoller.hpp),在服务器这里只有文件描述符和事件的概念
未来可以:
// 计划实现 Select、Poll、Epoll 三种版本
// 设计一个基类 Poller,提供统一的公共接口,包括 control、wait 和 delete 方法
#pragma once// 未来还可以Select、poll、epoll三个版本都实现
// 做一个基类poller,提供公共的接口,暴露出control、wait、delete。
class Multiplex
{
public:
private:
};class Epoller : public Multiplex
{
};class Poller : public Multiplex
{
};class Selector : public Multiplex
{
};
本篇文章仅实现Epoller版本。
Common.hpp
将原来封装在各文件中的枚举常量提取出来(错误码等级),放在一个Common.hpp当中
#pragma onceenum
{SOCKET_ERROR = 1,BIND_ERROR,LISTEN_ERROR,EPOLL_CREATE_ERROR
};
Epoller.hpp
基于epoll的事件管理:
AddEvent
将文件描述符及其事件注册到epoll实例。
Wait
方法调用epoll_wait
获取就绪事件。设计支持未来扩展
select
和poll
(继承自Multiplex
基类)。
#pragma once
#include <iostream>
#include <stdlib.h>
#include <sys/epoll.h>
#include "Log.hpp"
#include "Common.hpp"
// 未来还可以Select、poll、epoll三个版本都实现
// 做一个基类poller,提供公共的接口,暴露出control、wait、delete。
using namespace log_ns;
static const int gsize = 128;class Multiplex
{
public:virtual ~Multiplex() = default; // 添加虚析构函数virtual bool AddEvent(int fd, u_int32_t events) = 0;virtual int Wait(struct epoll_event revs[], int num, int timeout) = 0; // 临时方案private:
};class Epoller : public Multiplex
{
public:Epoller(){_epfd = ::epoll_create(gsize);if (_epfd < 0){LOG(FATAL, "epoll create error\n");exit(EPOLL_CREATE_ERROR);}LOG(INFO, "epoll create success, epfd: %d", _epfd);}std::string EventsToString(u_int32_t events){std::string eventsstr;if (events & EPOLLIN)eventsstr = "EPOLLIN ";if (events & EPOLLOUT)eventsstr += "| EPOLLOUT";if (events & EPOLLET)eventsstr += "| EPOLLET";return eventsstr;}bool AddEvent(int fd, u_int32_t events) override{struct epoll_event ev;ev.events = events;ev.data.fd = fd;int n = ::epoll_ctl(_epfd, EPOLL_CTL_ADD, fd, &ev);if (n < 0){LOG(ERROR, "epoll_ctl add %d events %s is fail...\n", fd, EventsToString(events).c_str());return false;}LOG(INFO, "epoll_ctl add %d events %s is success...\n", fd, EventsToString(events).c_str());return true;}int Wait(struct epoll_event revs[], int num, int timeout) override{return ::epoll_wait(_epfd, revs, num, timeout);}~Epoller(){}private:int _epfd;
};// class Poller : public Multiplex
// {
// };// class Selector : public Multiplex
// {
// };
Listener.hpp
监听套接字管理:创建并绑定监听套接字,接受新连接(
Accepter
方法)。与TcpServer集成:将监听套接字注册到
TcpServer
中,处理EPOLLIN
事件(新连接到达)。
#pragma once
#include <iostream>#include "Socket.hpp"
#include "Common.hpp"
#include "Connection.hpp"
using namespace socket_ns;// 用来统一管理Listen套接字
class Listener
{
public:Listener(uint16_t port) : _port(port), _listensock(std::make_unique<TcpSocket>()){_listensock->BuildListenSocket(_port);}int ListenSockfd(){return _listensock->Sockfd();}void Accepter(Connection *conn){LOG(DEBUG, "%d socker ready\n", conn->Sockfd());}~Listener() {}private:std::unique_ptr<Socket> _listensock;uint16_t _port;
};
TcpServer.hpp
统一管理连接:通过哈希表
std::unordered_map<int, Connection*>
快速根据文件描述符查找Connection
对象。事件驱动模型:依赖
Epoller
类实现事件监听,通过epoll_wait
获取就绪事件,并根据事件类型(读/写)派发到对应的Connection
处理函数。服务器启动:通过
Start
方法进入事件循环,处理客户端请求。
#pragma once
#include <iostream>
#include <string>
#include <unordered_map>
#include <memory>
#include "Connection.hpp"
#include "Epoller.hpp"
using namespace log_ns;// 底层是使用epoll,暂时称做TcpServer
class TcpServer
{static const int gnum = 64;public:TcpServer() : _epoller(std::make_unique<Epoller>()), _isrunning(false){}void AddConnection(int fd, uint32_t events, handler_t recver, handler_t sender, handler_t excepter) // 稍后再调整{// 1.构建一个connectionConnection *conn = new Connection(fd);conn->SetEvents(events);// TODO, 设置对connection的上层处理, 即, 如果该connection就绪被激活,该如何处理?conn->RegisterHandler(recver, sender, excepter);// 2.fd和events写透到OS中,托管给epollif (!_epoller->AddEvent(conn->Sockfd(), conn->Events()))return;// 3.托管给_connection_connections.insert(std::make_pair(fd, conn));}// 对于服务器来讲,只知道自己在管理connectionvoid Start() // TODO{int timeout = -1; // 非阻塞_isrunning = true;while (true){// 从epoll获取就绪事件int n = _epoller->Wait(revs, gnum, timeout);// 只有n > 0才会处理,所以不用再判断nfor (int i = 0; i < n; i++){int sockfd = revs[i].data.fd;uint32_t revents = revs[i].events;// 将epoll发生的错误或挂断,让这些事件全部放在读写事件就绪中延迟处理if (revents & EPOLLERR)revents |= (EPOLLIN | EPOLLOUT);if (revents & EPOLLHUP)revents |= (EPOLLIN | EPOLLOUT);if (revents & EPOLLIN) //{if (IsConnectionExist(sockfd) && _connections[sockfd]->_handler_recver){// 读事件就绪,派发给对应的connection_connections[sockfd]->_handler_recver(_connections[sockfd]);}}if (revents & EPOLLOUT){if (IsConnectionExist(sockfd) && _connections[sockfd]->_handler_sender){// 写事件就绪,派发给对应的connection_connections[sockfd]->_handler_sender(_connections[sockfd]);}}}}_isrunning = false;}bool IsConnectionExist(int sockfd){// 判断一个sockfd是否存在于connections(哈希表)当中return _connections.find(sockfd) != _connections.end();}~TcpServer() {}private:// 使用TcpServer将所有的connections管理起来// key: sockfd// value: Connection*// 优势:能够很快的通过文件描述符,来找到connection对象,在未来高效执行connection对应的处理事件的方法std::unordered_map<int, Connection *> _connections;std::unique_ptr<Multiplex> _epoller;// 判断服务器是否开启bool _isrunning;struct epoll_event revs[gnum];
};
Main.cc
初始化服务器:解析命令行参数(端口号),创建
Listener
和TcpServer
实例。注册监听套接字:将监听套接字的事件(
EPOLLIN
)和处理函数(Accepter
)绑定到TcpServer
。启动事件循环:调用
tsvr->Start()
进入epoll事件监听状态。
#include <memory>
#include <sys/epoll.h>
#include "Listener.hpp"
#include "Log.hpp"
#include "TcpServer.hpp"using namespace log_ns;// ./tcpserver 8888
int main(int argc, char *argv[])
{if (argc != 2){std::cerr << "Usage: " << argv[0] << " local-port" << std::endl;exit(0);}uint16_t port = std::stoi(argv[1]);EnableScreen();Listener listener(port);std::unique_ptr<TcpServer> tsvr = std::make_unique<TcpServer>();// 对于一个tcp来讲,只需要传一个文件描述符,以及该文件描述符对应的事件,转换成一个connection对象// 在服务器中以管理哈希表的方式管理,再让epoll来管理// 封装connection对象,仅对外暴露必要的功能接口// 处理listen套接字的接口tsvr->AddConnection(listener.ListenSockfd(),EPOLLIN | EPOLLET,std::bind(&Listener::Accepter, &listener, std::placeholders::_1),nullptr,nullptr);// 开启服务器tsvr->Start();return 0;
}
运行结果:
以上代码:
.
├── Common.hpp
├── Connection.hpp
├── Epoller.hpp
├── InetAddr.hpp
├── Listener.hpp
├── LockGuard.hpp
├── Log.hpp
├── Main.cc
├── Makefile
├── Socket.hpp
└── TcpServer.hpp0 directories, 11 files
实现了一个基于epoll的事件驱动TCP服务器,核心功能包括连接管理、事件监听、新连接接受及读写事件处理。通过面向对象设计和模块化封装,提供了高效、可扩展的服务器框架,但需进一步完善异常处理、资源管理和二进制数据传输支持。
我们将TcpServer类更名为Reactor,因此需要将代码中所有TcpServer的引用统一替换为Reactor。
Reactor(反应堆)设计模式
Reactor 是一种用于处理异步事件流的设计模式,广泛应用于现代编程框架和库中,特别是在需要处理高并发、非阻塞 I/O 的场景中。它的核心思想是将事件驱动的编程模型与事件分发机制相结合,通过事件循环(Event Loop)来监听和分发事件,从而实现高效的任务处理。
在Reactor设计模式当中,开启服务器的Start函数更喜欢叫做:事件派发:Dispatcher
void Start()----> Dispatcher() // 改为事件派发
在我们的代码中:Reactor类似一个connection的容器,核心工作就是
1. 管理connection和对应的内核事件
2. 事件派发
优化代码结构:
LoopOnce(int timeout)
函数的作用是 单次处理已就绪的 epoll 事件
// 用于检测获取一次已经就绪的事件void LoopOnce(int timeout){// 从epoll获取就绪事件int n = _epoller->Wait(revs, gnum, timeout);// 只有n > 0才会处理,所以不用再判断nfor (int i = 0; i < n; i++){int sockfd = revs[i].data.fd;uint32_t revents = revs[i].events;// 将epoll发生的错误或挂断,让这些事件全部放在读写事件就绪中延迟处理if (revents & EPOLLERR)revents |= (EPOLLIN | EPOLLOUT);if (revents & EPOLLHUP)revents |= (EPOLLIN | EPOLLOUT);if (revents & EPOLLIN) //{if (IsConnectionExist(sockfd) && _connections[sockfd]->_handler_recver){// 读事件就绪,派发给对应的connection_connections[sockfd]->_handler_recver(_connections[sockfd]);}}if (revents & EPOLLOUT){if (IsConnectionExist(sockfd) && _connections[sockfd]->_handler_sender){// 写事件就绪,派发给对应的connection_connections[sockfd]->_handler_sender(_connections[sockfd]);}}}}// 对于服务器来讲,只知道自己在管理connectionvoid Dispatcher() // 改为事件派发{int timeout = -1; // 非阻塞_isrunning = true;while (true){LoopOnce(timeout);}_isrunning = false;}
处理listen套接字的新连接:如何处理Listen类中的Accepter?
在Connection类中定义一个Reactor指针,并实现Connection和Reactor之间的互相引用
这种设计模式在事件驱动架构中非常常见,网络编程中的Reactor模式。Connection对象代表一个网络连接,而Reactor对象负责监听和处理事件。通过这种互相引用的方式,Connection可以通知Reactor处理事件,而Reactor也可以管理多个Connection的生命周期。
由于采用ET模式,系统不会主动通知新连接的到来。为了确保在非阻塞状态下能够一次性获取所有连接,我们需要通过循环方式持续获取,直到底层不再有新的连接为止。
然而,在最初封装的Socket代码中,连接获取的设计的就是默认的阻塞模式:
int Accepter(InetAddr *clientaddr) override{struct sockaddr_in client;socklen_t len = sizeof(client);// 4.获取新链接// 从监听套接字获取新的套接字、获取客户端信息int sockfd = ::accept(_sockfd, (struct sockaddr *)&client, &len);// 获取连接失败、继续获取if (sockfd < 0){LOG(WARNING, "accept error\n");return -1;}*clientaddr = InetAddr(client);// 获客成功,提供服务return sockfd;}
如果只有一个连接,将该连接获取后,由于我们是循环获取,调Accepter不就阻塞在accept了 ?
因此我们需要将所有的文件描述符都设置为非阻塞。
解决办法:fcntl
在之前的学习中,我们了解到Linux系统提供了一个强大的接口:
fcntl
。这个接口不仅可以获取和设置文件描述符的属性,还能将文件描述符显式设置为非阻塞模式。将该方法,放入Common.hpp中
#include <iostream> #include <unistd.h> #include <fcntl.h> void SetNonBlock(int fd) {int fl = ::fcntl(fd, F_GETFL);if (fl < 0){std::cout << "fcntl error" << std::endl;return;}::fcntl(fd, F_SETFL, fl | O_NONBLOCK); }
保证所有的套接字都是非阻塞:
需要注意的是,将文件描述符设置为非阻塞模式后,在进行I/O操作时可能会立即返回
EAGAIN
或EWOULDBLOCK
错误,这表示当前没有数据可读或写缓冲区已满。在这种情况下,程序应该使用select
、poll
或epoll
等I/O多路复用技术来监控文件描述符的状态,并在适当的时候重试I/O操作。
因此我们这里就直接使用epoll来监控,但是我们必须要获得错误码,因此还需要改造Accepter,在将套接字改为fd之后,获取一下errno:
int Accepter(InetAddr *clientaddr, int *code) override{struct sockaddr_in client;socklen_t len = sizeof(client);// 4.获取新链接// 从监听套接字获取新的套接字、获取客户端信息int sockfd = ::accept(_sockfd, (struct sockaddr *)&client, &len);*code = errno; // 得到错误码SetNonBlock(sockfd);// 获取连接失败、继续获取if (sockfd < 0){// LOG(WARNING, "accept error\n");return -1;}*clientaddr = InetAddr(client);// 获客成功,提供服务return sockfd;}
Listener类Listener.hpp:
#pragma once #include <iostream>#include "Socket.hpp" #include "Common.hpp" #include "Reactor.hpp" #include "Connection.hpp" using namespace socket_ns;// 用来统一管理Listen套接字 class Listener { public:Listener(uint16_t port) : _port(port), _listensock(std::make_unique<TcpSocket>()){_listensock->BuildListenSocket(_port);}int ListenSockfd(){return _listensock->Sockfd();}void Accepter(Connection *conn){// 由于我们采用的是ET模式,循环获取底层连接(一次性获取完),非阻塞// 但是这里有可能只有一个listen套接字while (true){// 每次循环时,都将errno清0errno = 0;InetAddr addr;int code = 0;int sockfd = _listensock->Accepter(&addr, &code);if (sockfd > 0){// 获取连接成功LOG(INFO, "获取连接成功, 客户端信息:%s:%d, new_sockfd: %d\n", addr.Ip().c_str(), addr.Port(), sockfd);}else{// 失败if (code == EWOULDBLOCK){LOG(INFO, "底层连接全部获取完毕\n");break;}else if (code == EINTR){continue;}else{LOG(ERROR, "获取连接失败!\n");break;}}}}~Listener() {}private:std::unique_ptr<Socket> _listensock;uint16_t _port; };
运行结果:这里我分别使用telnet、浏览器来连接(浏览器会有两个事件就绪,一个是连接就绪也就是读事件就绪,并且浏览器会向服务器发送HTTP请求数据-->TCP三次握手建立连接,随后立即发送HTTP请求头和数据,读事件就绪与Telnet的简单连接行为形成鲜明对比)
以上代码,只处理了listen套接字以及他的读事件(获取连接成功)
我们获得了新的套接字描述符后,为了有效管理这些描述符并确保后续能够以非阻塞方式处理其IO事件,需要先将它们添加至Reactor管理的连接池中。
接下来,我们将编写把新创建的套接字加入Reactor的连接池进行统一管理。
将新创建的套接字加入Reactor的连接池
1.在Connection类中设置Reactor、以及Connection类型
Connection可以通知Reactor处理事件,而Reactor也可以管理多个Connection的生命周期。
void SetReactor(Reactor *R){_R = R;}
并且设置Connection类型:
#define ListenConnection 0
#define NormalConnection 1
2.将当前对象,设置进入所有的conn对象中
在Listen类中:
添加HandlerConnection模块
#pragma once#include <iostream>
#include "Connection.hpp"class HandlerConnection
{
public:void HandlerRecver(Connection *conn){}void HandlerSender(Connection *conn){}void HandlerExcepter(Connection *conn){}
};
3.修正AddConnection
优化Reactor类的AddConnection接口,在Reactor中制作方法集:
// Reactor中添加处理sockfd的方法集// 1.处理新连接到来handler_t _OnConnect;// 2.处理普通sockfd,主要是IO处理handler_t _OnRecver;handler_t _OnSender;handler_t _OnExcept;
这意味着在Reactor中可以初始化整个方法集
void SetOnConnect(handler_t OnConnect){_OnConnect = OnConnect;}void SetOnNormalHandler(handler_t recver, handler_t sender, handler_t excepter){_OnRecver = recver;_OnSender = sender;_OnExcepter = excepter;}
在Connection类中添加:客户端套接字信息
InetAddr _addr; //客户端的套接字信息
设置,并添加获取接口:
void SetAddr(const InetAddr &addr){_addr = addr;}
更新AddConnection
void AddConnection(int fd, uint32_t events, const InetAddr &addr, int type) // 稍后再调整{// 1.构建一个connectionConnection *conn = new Connection(fd);conn->SetEvents(events);conn->SetConnectionType(type);conn->SetAddr(addr);// 将当前对象,设置进入所有的conn对象中conn->SetReactor(this);// TODO, 设置对connection的上层处理, 即, 如果该connection就绪被激活,该如何处理?if (conn->Type() == ListenConnection){conn->RegisterHandler(_OnConnect, nullptr, nullptr);}else{conn->RegisterHandler(_OnRecver, _OnSender, _OnExcepter);}// 2.fd和events写透到OS中,托管给epollif (!_epoller->AddEvent(conn->Sockfd(), conn->Events()))return;// 3.托管给_connection_connections.insert(std::make_pair(fd, conn));}
更新InetAddr类的构造函数:添加一个初始化ip和port的重载
InetAddr(const std::string &ip, uint16_t port){_ip = ip;_port = port;_addr.sin_family = AF_INET;_addr.sin_port = htons(_port);_addr.sin_addr.s_addr = inet_addr(ip.c_str());}
更新入口函数:Main.cc
#include <memory> #include <sys/epoll.h> #include "Listener.hpp" #include "HandlerConnection.hpp" #include "Log.hpp" #include "Reactor.hpp"using namespace log_ns;// ./tcpserver 8888 int main(int argc, char *argv[]) {if (argc != 2){std::cerr << "Usage: " << argv[0] << " local-port" << std::endl;exit(0);}uint16_t port = std::stoi(argv[1]);EnableScreen();InetAddr localaddr("0.0.0.0", port);// 专门用于处理新连接的模块Listener listener(port); // 连接管理器// 专门处理普通sockfd的模块HandlerConnection handlers; // IO处理器// 主模块,事件派发std::unique_ptr<Reactor> R = std::make_unique<Reactor>(); // 事件派发器// 模块之间,产生关联R->SetOnConnect(std::bind(&Listener::Accepter, &listener, std::placeholders::_1));R->SetOnNormalHandler(std::bind(&HandlerConnection::HandlerRecver, &handlers, std::placeholders::_1),std::bind(&HandlerConnection::HandlerSender, &handlers, std::placeholders::_1),std::bind(&HandlerConnection::HandlerExcepter, &handlers, std::placeholders::_1));R->AddConnection(listener.ListenSockfd(), EPOLLIN | EPOLLET, localaddr, ListenConnection);// 做事件派发R->Dispatcher();return 0; }
更新Listener类当中的Accepter:
conn->_R->AddConnection(sockfd, EPOLLIN | EPOLLET, addr, NormalConnection); // 需要添加对普通套接字进行IO事件处理的模块
添加日志信息,便于调试代码:
在Reactor当中添加 PrintDebug()用于打印epoll所管理的fd列表
void PrintDebug(){std::string fdlist;for (auto &conn : _connections){fdlist += std::to_string(conn.second->Sockfd()) + " ";}LOG(DEBUG, "epoll管理的fd列表: %s\n", fdlist.c_str());}
并在每一次Loop后打印:
获取客户端连接信息,测试调用HandlerRecver成功
继续写HandlerConnection类:读事件处理
首先先处理接收缓冲区,也就是读事件:
由于不能保证一次性能将客户端发的所有内容读取完毕,因此我们将每一次读取上来的数据块,都拼接到该文件描述符所对应的接收缓冲区inbuffer当中:
在Connection类中添加数据追加接口:
void AppendInbuffer(const std::string &in){_inbuffer += in;}std::string &Inbuffer(){return _inbuffer;}
4.处理读事件,引入协议进行报文解析
HandlerRecver函数
注意:这里保证了,本轮的数据都读取完毕。所有的出错以及异常情况,在后面统一处理,接下来我们先对读取到的数据进行处理
void HandlerRecver(Connection *conn){LOG(DEBUG, "client 给我发了消息: %d\n", conn->Sockfd());while (true){char buffer[buffersize];int n = ::recv(conn->Sockfd(), buffer, sizeof(buffer) - 1, 0);if (n > 0) // 说明获取成功{buffer[n] = 0; // 数据块// 将每一次从同一个fd中读取到的buffer拼接起来,临时放在inbuffer里conn->AppendInbuffer(buffer);}else // 出错{// 本轮数据读取完毕if (errno == EWOULDBLOCK){break;}else if (errno == EINTR){continue;}else{// 最后统一进行异常处理conn->_handler_excepter(conn);return;}}}// 一定读取完毕,处理数据std::cout << "Inbuffer content: " << conn->Inbuffer() << std::endl;}
验证追加成功:
处理数据: 让上层处理报文
现阶段我们的程序架构:
5.协议处理+报文解析模块PackageParse.hpp+Protocol.hpp
Protocol(协议处理)PackageParse(报文解析)
Listener(处理新连接) HandlerConnection(处理普通套接字IO事件以及异常)
Connection(封装套接字以及缓冲区)
Reactor(最底层)
为了保证读取上来的报文是完整的我们需要自定义协议,这篇文章详细讲解了HTTP服务器实现,如何处理报文(序列化、反序列化)在实现一个HTTP服务器时,确保读取的报文完整是至关重要的。由于HTTP协议本身是基于TCP的,而TCP是一个流式协议,它并不保证数据包的边界,因此我们需要自定义协议来处理报文的完整性。
未来将收到的inbuffer全部交给上层协议去处理:
这里需要用到之前写网络计算器时用到的代码:
协议处理:Protocol.hpp、IOService.hpp
更新Service.hpp的名字为:PackageParse.hpp(报文解析模块),并且对该代码需要做一些调整,专门处理conn数据,注意,数据已经读取到我们的服务器了,现在仅需将数据传入,进行解析就可以.
读事件处理逻辑图:
Execute函数是成员函数,有隐藏的this,handler函数类型只有一个参数,所以在bind的时候要显示绑定。如果不显式绑定
this
指针,编译器将无法确定Execute
函数应该操作哪个对象,从而导致错误。因此,在绑定成员函数时,显式传递this
指针是必要的。这里注意一下:有一个小错误:
// 非静态成员函数指针需要使用&类名::成员函数名的形式。
HandlerConnection handlers(std::bind(&PackageParse::Execute, &parse, std::placeholders::_1)); // IO处理器
6.添加业务处理模块:
Request(请求) && Response(应答) Protocol(协议处理)PackageParse(报文解析)
Listener(处理新连接) HandlerConnection(处理普通套接字IO事件以及异常)
Connection(封装套接字以及缓冲区)
Reactor(最底层)
这里需要添加业务处理模块的代码:NetCal.hpp,这是之前所讲解过的网络计算机的编写
这就是我们要处理的业务:
注意:Main.cc更新
继续编写PackageParse,将解析好的报文发回客户端
7.关于写入的话题:
1.多进程多线程中,write更简单
2.多路转接如何正确的write?
a.当我们获取一个全新的sockfd的时候,输入和输出缓冲区默认都是空的。
b.读事件就绪本质:输入缓冲区中有了数据,或者底层有新连接
c.写事件就绪:不是关心发送缓冲区是否有数据,而是关心发送缓冲区中是否还有空间,有空间,发送条件满足,写事件就绪,否则,不就绪
d.把一个sockfd托管给select、poll、epoll,原因是sockfd上的事件没有就绪,还是事件就绪了?没有就绪
结论:
- 默认情况下,读事件处于未就绪状态,因为输入缓冲区暂时没有数据。因此,我们需要将读事件默认添加到epoll中进行监控(EPOLLIN)。
- 对于写事件,默认是就绪的,因为发送缓冲区有可用空间。我们可以直接进行写操作,只有当写条件不满足时,才需要开启对sockfd的EPOLLOUT事件监控。
- 当持续写入导致发送缓冲区已满且仍有数据未发送时,就需要开启EPOLLOUT事件监控。
- 后续处理写事件时,我们会先尝试直接发送数据。如果发送条件不满足,则开启写事件监控,epoll会自动处理剩余数据的发送。
写事件处理
与读事件不同,写事件在默认情况下是就绪的,因为socket的发送缓冲区通常有可用空间。这意味着我们可以直接调用write()或send()等函数进行数据发送,而不需要立即将写事件添加到epoll中。
然而,在网络传输过程中,可能会遇到发送缓冲区已满的情况。这通常发生在以下场景中:
- 网络拥塞导致数据发送速度变慢
- 应用程序发送数据的速度超过了网络传输能力
- 对端接收缓冲区已满,导致本端无法继续发送数据
当发生这种情况时,write()或send()函数会返回EAGAIN或EWOULDBLOCK错误,表示当前无法发送更多数据。此时,我们需要将写事件(EPOLLOUT)添加到epoll中进行监控。epoll会在发送缓冲区有可用空间时通知应用程序,使得应用程序可以继续发送剩余的数据。
写事件处理流程
- 尝试直接发送数据
- 如果发送成功,继续处理后续数据
- 如果发送失败且errno为EAGAIN/EWOULDBLOCK: a. 将EPOLLOUT事件添加到epoll监控中 b. 保存未发送的数据
- 当epoll通知EPOLLOUT事件就绪时: a. 尝试发送之前未发送的数据 b. 如果发送成功,移除EPOLLOUT事件监控 c. 如果再次失败,继续等待下一次通知
将解析好的报文追加到发送缓冲区内:在Connection类中添加
void AppendOutbuffer(const std::string &in){_outbuffer += in;}std::string &Outbuffer(){return _outbuffer;}
在解析报文:
// 7.发回conn->AppendOutbuffer(respjson);//将结果追加到发送缓冲区内
并且:没有报文,直接结束Execute
出循环后:
PackageParse.hpp
#pragma once
#include <iostream>
#include "InetAddr.hpp"
#include "Log.hpp"
#include "Protocol.hpp"
#include "Connection.hpp"
#include "NetCal.hpp"using namespace log_ns;class PackageParse
{
public:void Execute(Connection *conn){// 长服务while (true){// 2.报文解析,提取报文和有效载荷,报文就在inbuffer里std::string package = Decode(conn->Inbuffer());// 空字符串代表报文不完整,直接breakif (package.empty())return; // 如果没有报文,直接结束// 反序列化做处理auto req = Factory::BuildRequestDefault();std::cout << "package: \n"<< package << std::endl;// 3.做反序列化req->Deserialize(package);// 4.业务处理auto resp = cal.Calculator(req);// 5.构建序列化应答std::string respjson;// 先序列化//resp->Serialize(&respjson);std::cout << "respjson: \n"<< respjson << std::endl;// 6.添加len长度报头respjson = Encode(respjson);std::cout << "respjson add header done: \n"<< respjson << std::endl;// 7.发回conn->AppendOutbuffer(respjson); // 将结果追加到发送缓冲区内}// 我们已经至少处理了一个请求,同时至少会有一个应答conn->_handler_sender(conn); // 方法1:直接发送数据}private:NetCal cal;
};
数据发送HandlerSender函数
请注意:当outbuffer中无数据,即所有数据被发送完,我们这里的处理方式是直接退出。之后还要修改
void HandlerSender(Connection *conn){// 1.直接写while (true){ssize_t n = ::send(conn->Sockfd(), conn->Outbuffer().c_str(), conn->Outbuffer().size(), 0);if (n > 0){// 将有效内容发送成功,清理发送缓冲区已发送的数据大小conn->DiscardOutbuffer(n);// 数据发送完毕if (conn->Outbuffer().empty())return; // 暂时return}else if (n == 0){// 在我们今天的代码不存在,outbuffer为空,写事件没必要处理return;}else{// <0不一定是出错了, 有可能是发送缓冲区写满了if (errno == EWOULDBLOCK){break;}else if (errno == EINTR){continue;}else{// 发送失败,不能继续发送,进入异常处理逻辑conn->_handler_excepter(conn);}}// 2.代码运行到这里,只能是发送条件不满足,outbuffer已经被写满;但是有可能数据没发送完,因此要开启EPOLLOUTif (!conn->Outbuffer().empty()) // 缓冲区还有数据{// 开启关心写事件, 因此需要再Reactor中添加修改文件描述符的事件的关心}}}
如何开启EPOLLOUT:在Reactor中添加修改文件描述符的事件的关心:
这段代码为指定套接字设置epoll监听事件,根据参数控制读/写事件的监听状态,强制添加写事件(无论参数如何都包含EPOLLET)。
因此在多路转接方案中,除了有新增事件接口AddEvent还要有修改事件的接口ModEvent
更新Epoller.hpp
private:bool ModEventHelper(int fd, u_int32_t events, int oper){struct epoll_event ev;ev.events = events;ev.data.fd = fd;int n = ::epoll_ctl(_epfd, oper, fd, &ev);if (n < 0){LOG(ERROR, "epoll_ctl %d events %s is fail...\n", fd, EventsToString(events).c_str());return false;}LOG(INFO, "epoll_ctl %d events %s is success...\n", fd, EventsToString(events).c_str());return true;}public:bool AddEvent(int fd, u_int32_t events) override{return ModEventHelper(fd, events, EPOLL_CTL_ADD);}bool ModEvent(int fd, uint32_t events) override{return (fd, events, EPOLL_CTL_MOD);}
更新Reactor.hpp
// 开启连接对于读写事件的关心void EnableConnectionReadWrite(int sockfd, bool readable, bool writable){if (!IsConnectionExist(sockfd)){return;}uint32_t events = (readable ? EPOLLIN : 0) | (writable ? EPOLLOUT : 0) | EPOLLET;_connections[sockfd]->SetEvents(events);// 将关心的事件写透内核里,因此使用epoller_epoller->ModEvent(_connections[sockfd]->Sockfd(), _connections[sockfd]->Events());}
在事件驱动编程模型中,当开启写事件关心后,系统会持续监控写操作的状态。如果在写数据的过程中,数据未能一次性全部写入,系统会自动保持写事件的活跃状态,继续尝试写入剩余的数据,直到所有数据都成功写入为止。这一机制确保了数据的完整性和连续性,避免了数据丢失或中断的情况。一旦所有数据都成功写入,系统会关闭写事件,表示当前写操作已经完成。
当数据发送完毕后,缓冲区一定是空的,因此需要取消关心写事件:
以上就完成了发送的处理,即写事件的处理
现在我们就可以来做一次包括网络计算器业务处理的实验,需要用到ClientMain.cc客户端的代码
可以发现,服务器响应不成功,分析问题,是由于,错误码没有重新更新的原因,因为整个代码,都是根据错误码来判定读取和写入是否是成功的。还有个原因是,Excute中的序列化被注释,取消注释,以及当报文为空字符串时应该break,到发送报文出,而不是return直接返回:但是有可能,对方没有一个完整的请求,因此还需要在Execute做判断。
运行结果:
可以发现,在客户端退出时,没有客户退出的信息:也就是各种异常处理,接下来我们就对各种异常进行处理:
由于epoller的对key值的处理一定是sockfd有效的情况下,因此我们需要先删除连接:
void HandlerExcepter(Connection *conn){// 整个代码的所有的异常处理,全在这里处理读写错误,客户端关闭连接// 删除连接conn->_R->DelConnection(conn->Sockfd());}
DelConnection函数:
0. 安全检查
1.在内核中移除对sockfd的关心,epoll当中去掉事件关心
2.关闭sockfd
3.在Reactor中移除对Connection的关心
1.需要在内核中移除对sockfd的关心也就需要将在epoller类当中处理,添加删除fd的接口
bool DelEvent(int fd) override{return 0 == ::epoll_ctl(_epfd, EPOLL_CTL_DEL, fd, nullptr);}
2.关闭套接字描述符:由于套接字描述符是被封装在Connection中的,因此在connection提供接口:
void Close(){if(_sockfd >= 0)::close(_sockfd);}
由于Reactor做事件派发的时候,是读事件,写事件都有可能触发,异常后,读写事件都被设置了,如果读写事件都被触发,在循环处理时间的时候,读写事件都有可能被同时执行。读写事件的时候,将一个文件描述符都触发异常(可能性较低,但是为了鲁棒性,我们在DelConnection做安全检查)
DelConnection
void DelConnection(int sockfd){// 0. 安全检查if (!IsConnectionExist(sockfd)){return;}LOG(INFO, "sockfd quit, 服务器释放所有资源\n", sockfd);// 1.在内核中移除对sockfd的关心(epoll当中去掉事件关心EnableConnectionReadWrite(sockfd, false, false);// 1.1再从epoll中删除掉这个fd_epoller->DelEvent(sockfd);// 2.关闭sockfd_connections[sockfd]->Close();// 3.在Reactor中移除对Connection的关心delete _connections[sockfd];_connections.erase(sockfd);}
运行结果:
正式认识Reactor反应堆模式
未来在工作的时候,Reactor管理的可能不是connection连接,而是Event(也是对fd,IO缓冲区做封装)
Request(请求) && Response(应答) Protocol(协议处理)PackageParse(报文解析) Listener(处理新连接) HandlerConnection(处理普通套接字IO事件以及异常) Connection(封装套接字以及缓冲区) Reactor(最底层) epoller(反应给Reactor)
Reactor就像一个能量池一样,哪一个事件节点就绪了,就激活connection的哪一个节点,触发节点IO操作,一旦操作完成,就交给上层去进行报文解析以及业务处理
Reactor 模式的核心思想
(类似于打地鼠游戏)
游戏机台 = 事件循环(Event Loop)
- 持续扫描所有洞口的状态
- 不需要主动检查每个洞口,而是等待"有地鼠冒出"的事件通知
地鼠冒出 = 事件触发(Event Trigger)
- 每个洞口都是独立的事件源
- 冒出动作会生成待处理事件
锤子击打 = 事件处理(Event Handler)
- 每个处理动作对应特定事件类型
- 处理完成后立即复位准备下次响应
基于事件驱动的网络服务器设计的主流模式(libevent、mudou...库)
Reactor 模式的核心思想是将事件的处理分为两个主要部分:事件分离(Demultiplexing)和事件处理(Handling)。事件分离负责监听多个输入源,并在事件发生时将其分发到对应的事件处理器;事件处理器则负责具体的事件处理逻辑。
Reactor 模式的主要组件
Reactor 模式通常由以下几个组件构成:
- Reactor:负责事件的监听和分发。它通过事件分离器(如
select
、epoll
或kqueue
)监听多个输入源,并在事件发生时将其分发给对应的事件处理器。- 事件处理器(EventHandler):负责处理具体的事件。每个事件处理器通常与一个输入源关联,处理该输入源产生的事件。
- 事件分离器(Demultiplexer):负责监听多个输入源,并在事件发生时通知 Reactor。常见的事件分离器包括
select
、poll
、epoll
等。
是否可以使用Reactor(底层)+ 线程池(业务处理)
Reactor(底层)+ 线程池(业务处理)
对于我们的代码的改造: 将报文解析(业务处理)的控制权交给线程池去处理
Reactor + 线程池的架构
Reactor 和线程池的结合是异步编程框架中的经典设计,其工作流程如下:
- 事件监听:Reactor 监听 I/O 事件(如网络请求、文件读写等)。
- 事件分发:当事件发生时,Reactor 将其分发给对应的处理器。
- 任务提交:处理器将具体的业务逻辑任务提交给线程池。
- 任务执行:线程池中的线程执行任务,并将结果返回给 Reactor。
- 结果处理:Reactor 将结果发送给客户端或进行后续处理。
引发竞争条件、数据一致性问题处理
在多线程环境中,当多个线程同时操作同一个文件描述符(fd)时,可能会引发竞争条件和数据一致性问题。以下是不同线程操作同一个fd的详细场景分析:
Reactor模式中的消息发送:
Reactor模式通常用于事件驱动的编程模型。在Reactor中,一个主线程(或事件循环)负责监听文件描述符上的事件(如可读、可写等),并将事件分发给相应的处理器(Handler)。如果Reactor线程在发送消息时,其他线程也在操作同一个fd,可能会导致数据混乱或丢失。例如,Reactor线程正在向fd写入数据,而另一个线程同时关闭了该fd,可能会导致写入失败或程序崩溃。线程池中的消息发送:
线程池通常用于处理并发任务。当线程池中的多个线程尝试向同一个fd发送消息时,可能会出现竞态条件。例如,线程A正在向fd写入消息,而线程B也尝试向同一个fd写入消息,这可能导致消息内容交错或数据损坏。为了避免这种情况,可以使用锁(如互斥锁)来确保同一时间只有一个线程操作fd。其他线程的IO事件处理:
除了Reactor和线程池,其他线程也可能直接操作fd进行IO操作。例如,一个线程可能正在从fd读取数据,而另一个线程同时关闭了该fd,这会导致读取操作失败。此外,如果多个线程同时调用close(fd)
,可能会导致未定义行为,因为fd可能已经被释放。
在我们的代码中的解决办法:只进行激活对写事件的关心
我只进行激活对写事件的关心,未来所有的IO全由Reactor自动处理,多线程只要负责安全处理请求和应答 :
if (!conn->Outbuffer().empty()){// 方法2:我只进行激活对写事件的关心,未来所有的IO全由Reactor自动处理,多线程只要负责安全处理请求和应答conn->_R->EnableConnectionReadWrite(conn->Sockfd(), true, true);}
在这里只专注于激活写事件(Write Event)的注册与处理,而将所有的I/O操作(包括读、写、连接、关闭等)交由Reactor组件自动管理。
Reactor:检测事件就绪、IO处理----------------------》半同步,半异步模式、linux服务器最常见的模式多线程:业务处理
具体来说,Reactor会通过事件循环(Event Loop)持续监听文件描述符(File Descriptor)上的事件就绪,当检测到可读、可写等I/O事件时,Reactor会调用预先注册的回调函数进行处理。对于写事件,我只需在需要发送数据时,通过Reactor的API注册写事件,Reactor会在合适的时机(如缓冲区可写时)触发回调函数,完成数据的发送。
在多线程环境下,线程的主要职责是安全地处理请求和生成应答(业务处理),而无需直接参与I/O操作。每个线程可以通过线程安全的队列或其他同步机制,将待发送的数据传递给Reactor,由Reactor统一处理。这种设计不仅简化了线程的职责,还避免了多线程直接操作I/O资源可能导致的竞争条件和性能瓶颈。
但是代码会十分不优雅。
转变思路:One thread One Loop
一个Reactor做所有的事,再有一个单独Reactor用于连接事件或新的sockfd的派发任务派发给Reactor(多进程or多线程),未来可能有40000个fd分别派发给Reactor(多进程or多线程),来并发处理。将事件处理任务分散到多个Reactor中,每个Reactor运行在独立的线程或进程中,从而实现更高的并发处理能力。
设计思路
主Reactor:负责监听和接受新的连接请求。当有新的连接建立时,主Reactor会将新连接的socket文件描述符(sockfd)分配给一个子Reactor。主Reactor通常运行在单独的线程中,专注于处理连接事件,确保连接的快速响应。
子Reactor:负责处理已建立连接的I/O事件。每个子Reactor运行在独立的线程或进程中,拥有自己的事件循环。子Reactor的数量可以根据系统的CPU核心数或预期的并发连接数进行动态调整。例如,如果系统有8个CPU核心,可以创建8个子Reactor,每个Reactor处理大约5000个连接,从而充分利用多核处理器的计算能力。
任务派发:主Reactor在接收到新的连接后,会根据一定的策略(如轮询、哈希等)将sockfd分配给一个子Reactor。子Reactor在接收到新的sockfd后,会将其注册到自己的事件循环中,并开始监听该连接的读写事件。
并发处理:每个子Reactor独立处理其负责的连接,互不干扰。这种设计可以显著提高系统的并发处理能力,尤其是在面对大量并发连接时。例如,当系统需要处理40000个并发连接时,可以将这些连接均匀地分配给多个子Reactor,每个Reactor处理一部分连接,从而避免单个事件循环的过载。
多进程:
多线程:
version1(不好)
version(2)
version(3)
eventfd
这段代码展示了Linux系统中使用eventfd实现的线程间唤醒机制。
#include <iostream>
#include <string>
#include <unistd.h>
#include <pthread.h>
#include <sys/eventfd.h>int evfd = -1;// 唤醒机制是基于文件描述符的.
void *wait(void *args)
{std::string name = (const char *)args;while (true){uint64_t flag = 0; // 必须是8字节的数据ssize_t n = ::read(evfd, &flag, sizeof(flag));std::cout << name << " 被唤醒..., flag: %d: " << flag << ", errno is : " << errno << std::endl;}
}void *wakeup(void *args)
{std::string name = (const char *)args;while (true){errno++; // 修改errnosleep(1);std::cout << "wake up one thread, errno : " << errno << std::endl;uint64_t flag = 1;::write(evfd, &flag, sizeof(flag));}
}int main()
{evfd = ::eventfd(0, 0);pthread_t tid1, tid2;pthread_create(&tid1, nullptr, wait, (void *)"thread-1");pthread_create(&tid2, nullptr, wakeup, (void *)"thread-2");pthread_join(tid1, nullptr);pthread_join(tid2, nullptr);::close(evfd);return 0;
}
errno特性:
- 每个线程维护独立的errno副本
- 系统调用成功时不会修改errno
- 示例中errno++仅影响当前线程的errno值
相关文章:
【Linux】利用多路转接epoll机制、ET模式,基于Reactor设计模式实现
📚 博主的专栏 🐧 Linux | 🖥️ C | 📊 数据结构 | 💡C 算法 | 🅒 C 语言 | 🌐 计算机网络 上篇文章:多路转接epoll,实现echoserver 至此,Linux与…...
c/c++的findcontours崩溃解决方案
解决 Windows 平台 OpenCV findContours 崩溃:一种更稳定的方法 许多在 Windows 平台上使用 OpenCV 的开发者可能会在使用 findContours 函数时,遇到令人头疼的程序崩溃问题。尽管网络上流传着多种解决方案,但它们并非总能根治此问题。 当时…...
机器学习 Day18 Support Vector Machine ——最优美的机器学习算法
1.问题导入: 2.SVM定义和一些最优化理论 2.1SVM中的定义 2.1.1 定义 SVM 定义:SVM(Support Vector Machine,支持向量机)核心是寻找超平面将样本分成两类且间隔最大 。它功能多样,可用于线性或非线性分类…...
npm与pnpm--为什么推荐pnpm
包管理器中 npm是最经典的,但大家都任意忽略一个更优质的管理器:pnpm 1. 核心区别 特性npmpnpm依赖存储方式扁平化结构(可能重复依赖)硬链接 符号链接(共享依赖,节省空间)安装速度较慢&#…...
ollama调用千问2.5-vl视频图片UI界面小程序分享
1、问题描述: ollama调用千问2.5-vl视频图片内容,通常用命令行工具不方便,于是做了一个python UI界面与大家分享。需要提前安装ollama,并下载千问qwen2.5vl:7b 模型,在ollama官网即可下载。 (8G-6G 显卡可…...
济南国网数字化培训班学习笔记-第三组-1-电力通信传输网认知
电力通信传输网认知 电力通信基本情况 传输介质 传输介质类型(导引与非导引) 导引传输介质,如电缆、光纤; 非导引传输介质,如无线电波; 传输介质的选择影响信号传输质量 信号传输模式(单工…...
Kubernetes控制平面组件:Kubelet详解(六):pod sandbox(pause)容器
云原生学习路线导航页(持续更新中) kubernetes学习系列快捷链接 Kubernetes架构原则和对象设计(一)Kubernetes架构原则和对象设计(二)Kubernetes架构原则和对象设计(三)Kubernetes控…...
51单片机,两路倒计时,LCD1602 ,Proteus仿真
初始上电 默认2路都是0分钟的倒计时 8个按键 4个一组 一组控制一路倒计时 4个 按键:加 减 开始或者暂停 复位到0分钟相当于停止 针对第一路倒计时 4个 按键2:加 减 开始或者暂停 复位到0分钟相当于停止 针对第2路倒计时 哪一路到了0后蜂鸣器响 对应LED点亮 main.c 文件实现了…...
MySQL之储存引擎和视图
一、储存引擎 基本介绍: 1、MySQL的表类型由储存引擎(Storage Engines)决定,主要包括MyISAM、innoDB、Memory等。 2、MySQL数据表主要支持六种类型,分别是:CSV、Memory、ARCHIVE、MRG_MYISAN、MYISAM、InnoBDB。 3、这六种又分…...
写spark程序数据计算( 数据库的计算,求和,汇总之类的)连接mysql数据库,写入计算结果
1. 添加依赖 在项目的 pom.xml(Maven)中添加以下依赖: xml <!-- Spark SQL --> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.12</artifactId> <version>3.3.0…...
一:操作系统之系统调用
系统调用:用户程序与操作系统交互的桥梁 在计算机的世界里,应用程序是我们日常接触最多的部分,比如浏览器、文本编辑器、游戏等等。然而,这些应用程序并不能直接控制硬件资源,比如读写硬盘、创建新进程、发送网络数据…...
【ROS2】 核心概念6——通信接口语法(Interfaces)
古月21讲/2.6_通信接口 官方文档:Interfaces — ROS 2 Documentation: Humble documentation 官方接口代码实战:https://docs.ros.org/en/humble/Tutorials/Beginner-Client-Libraries/Single-Package-Define-And-Use-Interface.html ROS 2使用简化的描…...
SmartETL函数式组件的设计与应用
SmartETL框架主要采用了面向对象的设计思想,将ETL过程中的处理逻辑抽象为Loader和Processor(对应loader模块和iterator模块),所有流程组件需要继承或实现DataProvider(iter方法)或JsonIterator(…...
Spring Security与SaToken的对比与优缺点分析
Spring Security与SaToken对比分析 一、框架定位 Spring Security 企业级安全解决方案,深度集成Spring生态提供完整的安全控制链(认证、授权、会话管理、攻击防护)适合中大型分布式系统 SaToken 轻量级权限认证框架,专注Token会…...
|从零开始的Pyside2界面编程| 环境搭建以及第一个ui界面
🐑 |从零开始的Pyside2界面编程| 环境搭建以及第一个ui界面🐑 文章目录 🐑 |从零开始的Pyside2界面编程| 环境搭建以及第一个ui界面🐑♈前言♈♈Pyside2环境搭建♈♈做个简单的UI界面♈♒代码实现♒♒QTdesigner设计UI界面♒ ♒总…...
【爬虫】DrissionPage-7
官方文档: https://www.drissionpage.cn/browser_control/get_page_info/ 1. 页面信息 📌 html 描述:返回当前页面的 HTML 文本。注意:不包含 <iframe> 元素的内容。返回类型:str 示例: html_co…...
系统架构设计(十二):统一过程模型(RUP)
简介 RUP 是由 IBM Rational 公司提出的一种 面向对象的软件工程过程模型,以 UML 为建模语言,是一种 以用例为驱动、以架构为中心、迭代式、增量开发的过程模型。 三大特征 特征说明以用例为驱动(Use Case Driven)需求分析和测…...
深入解析Java事件监听机制与应用
Java事件监听机制详解 一、事件监听模型组成 事件源(Event Source) 产生事件的对象(如按钮、文本框等组件) 事件对象(Event Object) 封装事件信息的对象(如ActionEvent包含事件源信息…...
QT聊天项目DAY11
1. 验证码服务 1.1 用npm安装redis npm install redis 1.2 修改config.json配置文件 1.3 新建redis.js const config_module require(./config) const Redis require("ioredis");// 创建Redis客户端实例 const RedisCli new Redis({host: config_module.redis_…...
Python训练营---Day29
知识点回顾 类的装饰器装饰器思想的进一步理解:外部修改、动态类方法的定义:内部定义和外部定义 作业:复习类和函数的知识点,写下自己过去29天的学习心得,如对函数和类的理解,对python这门工具的理解等&…...
Flask-SQLAlchemy_数据库配置
1、基本概念(SQLAlchemy与Flask-SQLAlchemy) SQLAlchemy 是 Python 生态中最具影响力的 ORM(对象关系映射)库,其设计理念强调 “框架无关性”,支持在各类 Python 项目中独立使用,包括 Flask、D…...
世界银行数字经济指标(1990-2022年)-社科数据
世界银行数字经济指标(1990-2022年)-社科数据https://download.csdn.net/download/paofuluolijiang/90623839 https://download.csdn.net/download/paofuluolijiang/90623839 此数据集涵盖了1990年至2022年间全球各国的数字经济核心指标,数据…...
Redis进阶知识
Redis 1.事务2. 主从复制2.1 如何启动多个Redis服务器2.2 监控主从节点的状态2.3 断开主从复制关系2.4 额外注意2.5拓扑结构2.6 复制过程2.6.1 数据同步 3.哨兵选举原理注意事项 4.集群4.1 数据分片算法4.2 故障检测 5. 缓存5.1 缓存问题 6. 分布式锁 1.事务 Redis的事务只能保…...
NY337NY340美光固态颗粒NC010NC012
NY337NY340美光固态颗粒NC010NC012 在存储技术的浩瀚星空中,美光的NY337、NY340、NC010、NC012等固态颗粒宛如璀璨星辰,闪耀着独特的光芒。它们承载着先进技术与无限潜力,正深刻影响着存储行业的格局与发展。 一、技术架构与核心优势 美光…...
DAY26 函数定义与参数
浙大疏锦行-CSDN博客 知识点回顾: 1.函数的定义 2.变量作用域:局部变量和全局变量 3.函数的参数类型:位置参数、默认参数、不定参数 4.传递参数的手段:关键词参数 5.传递参数的顺序:同时出现三种参数类型时 函数的定义…...
系统安全及应用
目录 一、账号安全控制 1.基本安全措施 (1)系统账号清理 (2)密码安全控制 (3)历史命令,自动注销 2.用户提权和切换命令 2.1 su命令用法 2.2 sudo命令提权 2.3通过是sudo执行特权命令 二、系统引导和登录控制…...
微信小程序 地图 使用 射线法 判断目标点是否在多边形内部(可用于判断当前位置是否在某个区域内部)
目录 射线法原理简要逻辑代码 小程序代码调试基础库小程序配置地图数据地图多边形点与多边形关系 射线法 原理 使用射线法来判断,目标点是否在多边形内部 这里简单说下,具体细节可以看这篇文章 平面几何:判断点是否在多边形内(…...
第三十七节:视频处理-视频读取与处理
引言:解码视觉世界的动态密码 在数字化浪潮席卷全球的今天,视频已成为信息传递的主要载体。从短视频平台的爆火到自动驾驶的视觉感知,视频处理技术正在重塑人类与数字世界的交互方式。本指南将深入探讨视频处理的核心技术,通过Python与OpenCV的实战演示,为您揭开动态影像…...
什么是 Flink Pattern
在 Apache Flink 中,Pattern 是 Flink CEP(Complex Event Processing)模块 的核心概念之一。它用于定义你希望从数据流中检测出的 事件序列模式(Event Sequence Pattern)。 🎯 一、什么是 Flink Pattern&am…...
ADB基本操作和命令
1.ADB的含义 adb 命令是 Android 官方提供,调试 Android 系统的工具。 adb 全称为 Android Debug Bridge(Android 调试桥),是 Android SDK 中提供的用于管理 Android 模拟器或真机的工具。 adb 是一种功能强大的命令行工具&#x…...
NSString的三种实现方式
oc里的NSString有三种实现方式,为_ _NSCFConstantString、__NSCFString、NSTaggedPointerString 1._ _NSCFConstantString(字面量字符串) 从字面意思上可以看出,_ _NSCFContantString可以理解为常量字符串,这种类型的字符串在编译期就确定了…...
2025年PMP 学习二十 第13章 项目相关方管理
第13章 项目相关方管理 序号过程过程组过程组1识别相关方启动2规划相关方管理规划3管理相关方参与与执行4监控相关方参与与监控 相关方管理,针对于团队之外的相关方的,核心目标是让对方为了支持项目,以达到项目目标。 文章目录 第13章 项目相…...
学习黑客Kerberos深入浅出:安全王国的门票系统
Kerberos深入浅出:安全王国的门票系统 🎫 作者: 海尔辛 | 发布时间: 2025-05-18 🔑 理解Kerberos:为什么它如此重要? Kerberos是现代网络环境中最广泛使用的身份验证协议之一,尤其在Windows Active Dire…...
蓝桥杯19681 01背包
问题描述 有 N 件物品和一个体积为 M 的背包。第 i 个物品的体积为 vi,价值为 wi。每件物品只能使用一次。 请问可以通过什么样的方式选择物品,使得物品总体积不超过 M 的情况下总价值最大,输出这个最大价值即可。 输入格式 第一行输…...
使用 Auto-Keras 进行自动化机器学习
使用 Auto-Keras 进行自动化机器学习 了解自动化机器学习以及如何使用 auto-keras 完成它。如今,机器学习并不是一个非常罕见的术语,因为像 DataCamp、Coursera、Udacity 等组织一直在努力提高他们的效率和灵活性,以便将机器学习的教育带给普…...
算法刷题Day9 5.18:leetcode定长滑动窗口3道题,结束定长滑动窗口,用时1h
12. 1852.每个子数组的数字种类数 1852. 每个子数组的数字种类数 - 力扣(LeetCode) 思想 找到nums 所有 长度为 k 的子数组中 不同 元素的数量。 返回一个数组 ans,其中 ans[i] 是对于每个索引 0 < i < n - k,nums[i..(i …...
Protect Your Digital Privacy: Obfuscate, Don’t Hide
Protect Your Digital Privacy: Obfuscate, Don’t Hide In today’s digital world, hiding completely online is nearly impossible. But you can protect yourself by deliberately obfuscating your personal information — making it harder for others to track, pro…...
Spark 的运行模式(--master) 和 部署方式(--deploy-mode)
Spark 的 运行模式(--master) 和 部署方式(--deploy-mode),两者的核心区别在于 资源调度范围 和 Driver 进程的位置。 一、核心概念对比 维度--master(运行模式)--deploy-mode(部署…...
从零开始实现大语言模型(十五):并行计算与分布式机器学习
1. 前言 并行计算与分布式机器学习是一种使用多机多卡加速大规模深度神经网络训练过程,以减少训练时间的方法。在工业界的训练大语言模型实践中,通常会使用并行计算与分布式机器学习方法来减少训练大语言模型所需的钟表时间。 本文介绍PyTorch中的一种…...
生产模式下react项目报错minified react error #130的问题
这天,线上突然出现了一个bug,某个页面打开空白,看控制台报错minified react error #130,在本地看却是正常的,百思不得其解。 后来发现是由于线上项目它的包更新过了,而我本地的包没有更新,所以我…...
本地无损放大软件-realesrgan-gui
—————【下 载 地 址】——————— 【本章下载一】:https://drive.uc.cn/s/84516041df174 【本章下载二】:https://pan.xunlei.com/s/VOQDybD4ruF0-m8UJrCF-HtLA1?pwdxz9e# 【百款黑科技】:https://ucnygalh6wle.feishu.cn/wiki/…...
Java面试深度解析:微服务与云原生技术应用场景详解
Java面试深度解析:微服务与云原生技术应用场景详解 面试场景 面试官:我们今天的面试会围绕微服务与云原生技术展开,结合一个在线教育平台的业务场景进行提问。希望你放松心态,正常发挥。 码农明哥:好的好的…...
短剧小程序系统开发源码上架,短剧项目市场分析
引言 随着短视频内容消费的爆发式增长,短剧小程序凭借其碎片化、强互动、低成本的特点,成为内容创业与资本布局的新风口。2024年以来,行业规模突破500亿元,预计2027年将超千亿17。本文将深度解析短剧小程序系统开发的技术优势、市…...
常见的请求头(Request Header)参数
1. Accept 作用:告知服务器客户端支持的响应数据格式(如 JSON、XML、HTML)。示例:Accept: application/json(优先接收 JSON 格式数据)。 2. Content-Type 作用:说明请求体的数据格式(…...
渗透测试核心技术:内网渗透与横向移动
内网渗透是红队行动的关键阶段,攻击者通过突破边界进入内网后,需快速定位域控、横向移动并维持权限。本节从内网环境搭建、信息收集、横向移动技巧到权限维持工具,系统讲解如何在内网中隐蔽行动并扩大战果。 1. 内网环境搭建与基础配置 目标: 模拟真实企业网络,构建包含…...
2025/5/18
继续研究一下大佬的RAG项目。开始我的碎碎念。 RAG可以分成两部分:一个是问答,一个是数据处理。 问答是人提问,然后查数据库,把查的东西用大模型组织成人话,回答人的提问。 数据处理是把当下知识库里的东西…...
使用国内源加速Qt在线安装
简介: 在线安装Qt时,会发现下载非常缓慢,可以用过使用国内镜像源来加速安装过程。 在线安装包的下载过程: 1,打开下载页面 https://www.qt.io/download-open-source 2,点击 Download the Qt online ins…...
【图像生成大模型】HunyuanVideo:大规模视频生成模型的系统性框架
HunyuanVideo:大规模视频生成模型的系统性框架 引言HunyuanVideo 项目概述核心技术1. 统一的图像和视频生成架构2. 多模态大语言模型(MLLM)文本编码器3. 3D VAE4. 提示重写(Prompt Rewrite) 项目运行方式与执行步骤1. …...
Java IO流(超详细!!!)
Java IO流 文章目录 Java IO流1.文件相关基础普及1.1 常用文件操作1.3 目录的操作和文件删除 2.IO流原理及流的分类2.1 字节流2.1.1 InputStream:字节输入流2.1.2 OutputStream 2.2 字符流2.2.1 Reader2.2.1 Writer 2.3 节点流和处理流2.3.1节点流2.3.2 处理流2.3.2…...
规则联动引擎GoRules初探
背景说明 嵌入式设备随着物联网在生活和生产中不断渗透而渐渐多起来,数据的采集、处理、分析在设备侧的自定义配置越来越重要。一个可通过图形化配置的数据处理过程,对于加速嵌入式设备的功能开发愈发重要。作为一个嵌入式软件从业者,笔者一…...