C++负载均衡远程调用学习之Reactor事件触发机制
目录
1.LARV0.2-REACTOR_BUF实现
2.LARV0.2-outpu_buf实现
3.LARV0.2-reactor继承内存管理
4.LARV0.2流程总结
5.LARV0.3-多路IO事件的分析
6.LARV0.3_io_event和event_loop定义
7.LARV0.3_event_loop添加一个事件
8.LARV0.3_event_loop的epoll_wait封装
9.LARV0.3-event_loop删除事件
10.LARV0.3_event_loop添加listenfd读事件
11.LARV0.3-event_loop添加写事件-基于v0.3开发
12.LARV0.3总结
1.LARV0.2-REACTOR_BUF实现
# 三、Lars-Reactor服务框架开发
## 1) 框架结构

## 2) Lars Reactor V0.1开发
我们首先先完成一个最基本的服务器开发模型,封装一个`tcp_server`类。
> lars_reactor/include/tcp_server.h
```cpp
#pragma once
#include <netinet/in.h>
class tcp_server
{
public:
//server的构造函数
tcp_server(const char *ip, uint16_t port);
//开始提供创建链接服务
void do_accept();
//链接对象释放的析构
~tcp_server();
private:
int _sockfd; //套接字
struct sockaddr_in _connaddr; //客户端链接地址
socklen_t _addrlen; //客户端链接地址长度
};
```
在tcp_server.cpp中完成基本的功能实现,我们在构造函数里将基本的socket创建服务器编程写完,然后提供一个阻塞的do_accept()方法。
> lars_reactor/src/tcp_server.cpp
```cpp
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <strings.h>
#include <unistd.h>
#include <signal.h>
#include <sys/types.h> /* See NOTES */
#include <sys/socket.h>
#include <arpa/inet.h>
#include <errno.h>
#include "tcp_server.h"
//server的构造函数
tcp_server::tcp_server(const char *ip, uint16_t port)
{
bzero(&_connaddr, sizeof(_connaddr));
//忽略一些信号 SIGHUP, SIGPIPE
//SIGPIPE:如果客户端关闭,服务端再次write就会产生
//SIGHUP:如果terminal关闭,会给当前进程发送该信号
if (signal(SIGHUP, SIG_IGN) == SIG_ERR) {
fprintf(stderr, "signal ignore SIGHUP\n");
}
if (signal(SIGPIPE, SIG_IGN) == SIG_ERR) {
fprintf(stderr, "signal ignore SIGPIPE\n");
}
//1. 创建socket
_sockfd = socket(AF_INET, SOCK_STREAM /*| SOCK_NONBLOCK*/ | SOCK_CLOEXEC, IPPROTO_TCP);
if (_sockfd == -1) {
fprintf(stderr, "tcp_server::socket()\n");
exit(1);
}
//2 初始化地址
struct sockaddr_in server_addr;
bzero(&server_addr, sizeof(server_addr));
server_addr.sin_family = AF_INET;
inet_aton(ip, &server_addr.sin_addr);
server_addr.sin_port = htons(port);
//2-1可以多次监听,设置REUSE属性
int op = 1;
if (setsockopt(_sockfd, SOL_SOCKET, SO_REUSEADDR, &op, sizeof(op)) < 0) {
fprintf(stderr, "setsocketopt SO_REUSEADDR\n");
}
2.LARV0.2-outpu_buf实现
//3 绑定端口
if (bind(_sockfd, (const struct sockaddr*)&server_addr, sizeof(server_addr)) < 0) {
fprintf(stderr, "bind error\n");
exit(1);
}
//4 监听ip端口
if (listen(_sockfd, 500) == -1) {
fprintf(stderr, "listen error\n");
exit(1);
}
}
//开始提供创建链接服务
void tcp_server::do_accept()
{
int connfd;
while(true) {
//accept与客户端创建链接
printf("begin accept\n");
connfd = accept(_sockfd, (struct sockaddr*)&_connaddr, &_addrlen);
if (connfd == -1) {
if (errno == EINTR) {
fprintf(stderr, "accept errno=EINTR\n");
continue;
}
else if (errno == EMFILE) {
//建立链接过多,资源不够
fprintf(stderr, "accept errno=EMFILE\n");
}
else if (errno == EAGAIN) {
fprintf(stderr, "accept errno=EAGAIN\n");
break;
}
else {
fprintf(stderr, "accept error");
exit(1);
}
}
else {
//accept succ!
//TODO 添加心跳机制
//TODO 消息队列机制
int writed;
char *data = "hello Lars\n";
do {
writed = write(connfd, data, strlen(data)+1);
} while (writed == -1 && errno == EINTR);
if (writed > 0) {
//succ
printf("write succ!\n");
}
if (writed == -1 && errno == EAGAIN) {
writed = 0; //不是错误,仅返回0表示此时不可继续写
}
}
}
}
//链接对象释放的析构
tcp_server::~tcp_server()
{
close(_sockfd);
}
```
好了,现在回到`lars_reactor`目录下进行编译。
3.LARV0.2-reactor继承内存管理
```bash
$~/Lars/lars_reactor/
$make
```
在`lib`下,得到了库文件。
接下来,做一下测试,写一个简单的服务器应用.
```bash
$cd ~/Lars/lars_reactor/example
$mkdir lars_reactor_0.1
$cd lars_reactor_0.1
```
> lars_reactor/example/lars_reactor_0.1/Makefile
```makefile
CXX=g++
CFLAGS=-g -O2 -Wall -fPIC -Wno-deprecated
INC=-I../../include
LIB=-L../../lib -llreactor
OBJS = $(addsuffix .o, $(basename $(wildcard *.cc)))
all:
$(CXX) -o lars_reactor $(CFLAGS) lars_reactor.cpp $(INC) $(LIB)
clean:
-rm -f *.o lars_reactor
```
>lars_reactor/example/lars_reactor_0.1/lars_reactor.cpp
```cpp
#include "tcp_server.h"
int main() {
tcp_server server("127.0.0.1", 7777);
server.do_accept();
return 0;
}
```
接下来,我们make进行编译,编译的时候会指定链接我们刚才生成的liblreactor.a库。
服务端:
```bash
$ ./lars_reactor
begin accept
```
客户端:
```bash
$nc 127.0.0.1 7777
hello Lars
```
得到了服务器返回的结果,那么我们最开始的0.1版本就已经搭建完了,但是实际上这并不是一个并发服务器,万里长征才刚刚开始而已。
4.LARV0.2流程总结
## 3) 内存管理与buffer封装
在完成网络框架之前,我们先把必须的内存管理和buffer的封装完成。
这里我们先创建一个`io_buf`类,主要用来封装基本的buffer结构。然后用一个`buf_pool`来管理全部的buffer集合。
### 3.1 io_buf 内存块
> lars_reactor/include/io_buf.h
```h
#pragma once
/*
定义一个 buffer存放数据的结构
* */
class io_buf {
public:
//构造,创建一个io_buf对象
io_buf(int size);
//清空数据
void clear();
//将已经处理过的数据,清空,将未处理的数据提前至数据首地址
void adjust();
//将其他io_buf对象数据考本到自己中
void copy(const io_buf *other);
//处理长度为len的数据,移动head和修正length
void pop(int len);
//如果存在多个buffer,是采用链表的形式链接起来
io_buf *next;
//当前buffer的缓存容量大小
int capacity;
//当前buffer有效数据长度
int length;
//未处理数据的头部位置索引
int head;
//当前io_buf所保存的数据地址
char *data;
};
```
对应的`io_buf`实现的文件,如下
> lars_reactor/src/io_buf.cpp
```cpp
#include <stdio.h>
#include <assert.h>
#include <string.h>
#include "io_buf.h"
//构造,创建一个io_buf对象
io_buf::io_buf(int size):
capacity(size),
length(0),
head(0),
next(NULL)
{
data = new char[size];
assert(data);
}
//清空数据
void io_buf::clear() {
length = head = 0;
}
//将已经处理过的数据,清空,将未处理的数据提前至数据首地址
void io_buf::adjust() {
if (head != 0) {
if (length != 0) {
memmove(data, data+head, length);
}
head = 0;
}
}
5.LARV0.3-多路IO事件的分析
//将其他io_buf对象数据考本到自己中
void io_buf::copy(const io_buf *other) {
memcpy(data, other->data + other->head, other->length);
head = 0;
length = other->length;
}
//处理长度为len的数据,移动head和修正length
void io_buf::pop(int len) {
length -= len;
head += len;
}
```
这里主要要注意io_buf的两个索引值length和head,一个是当前buffer的有效内存长度,haed则为可用的有效长度首数据位置。 capacity是io_buf的总容量空间大小。
所以每次`pop()`则是弹出已经处理了多少,那么buffer剩下的内存就接下来需要处理的。
然而`adjust()`则是从新重置io_buf,将所有数据都重新变成未处理状态。
`clear()`则是将length和head清0,这里没有提供`delete`真是删除物理内存的方法,因为这里的buffer设计是不需要清理的,接下来是用一个`buf_pool`来管理全部未被使用的`io_buf`集合。而且`buf_pool`的管理的内存是程序开始预开辟的,不会做清理工作.
### 3.2 buf_pool 内存池
接下来我们看看内存池的设计.
> lars_reactor/include/buf_pool.h
```h
#pragma once
#include <ext/hash_map>
#include "io_buf.h"
typedef __gnu_cxx::hash_map<int, io_buf*> pool_t;
enum MEM_CAP {
m4K = 4096,
m16K = 16384,
m64K = 65536,
m256K = 262144,
m1M = 1048576,
m4M = 4194304,
m8M = 8388608
};
//总内存池最大限制 单位是Kb 所以目前限制是 5GB
#define EXTRA_MEM_LIMIT (5U *1024 *1024)
/*
* 定义buf内存池
* 设计为单例
* */
class buf_pool
{
public:
//初始化单例对象
static void init() {
//创建单例
_instance = new buf_pool();
}
//获取单例方法
static buf_pool *instance() {
//保证init方法在这个进程执行中 只被执行一次
pthread_once(&_once, init);
return _instance;
}
//开辟一个io_buf
io_buf *alloc_buf(int N);
io_buf *alloc_buf() { return alloc_buf(m4K); }
//重置一个io_buf
void revert(io_buf *buffer);
private:
buf_pool();
//拷贝构造私有化
buf_pool(const buf_pool&);
const buf_pool& operator=(const buf_pool&);
//所有buffer的一个map集合句柄
pool_t _pool;
//总buffer池的内存大小 单位为KB
uint64_t _total_mem;
//单例对象
static buf_pool *_instance;
//用于保证创建单例的init方法只执行一次的锁
static pthread_once_t _once;
//用户保护内存池链表修改的互斥锁
static pthread_mutex_t _mutex;
};
```
6.LARV0.3_io_event和event_loop定义
首先`buf_pool`采用单例的方式进行设计。因为系统希望仅有一个内存池管理模块。这里内存池用一个`__gnu_cxx::hash_map<int, io_buf*>`的map类型进行管理,其中key是每个组内存的空间容量,参考
```cpp
enum MEM_CAP {
m4K = 4096,
m16K = 16384,
m64K = 65536,
m256K = 262144,
m1M = 1048576,
m4M = 4194304,
m8M = 8388608
};
```
其中每个key下面挂在一个`io_buf`链表。而且`buf_pool`预先会给map下的每个key的内存组开辟好一定数量的内存块。然后上层用户在使用的时候每次取出一个内存块,就会将该内存块从该内存组摘掉。当然使用完就放回来。如果不够使用会额外开辟,也有最大的内存限制,在宏`EXTRA_MEM_LIMIT`中。
具体的`buf_pool`实现如下:
> lars_reactor/src/buf_pool.cpp
```cpp
#include "buf_pool.h"
#include <assert.h>
//单例对象
buf_pool * buf_pool::_instance = NULL;
//用于保证创建单例的init方法只执行一次的锁
pthread_once_t buf_pool::_once = PTHREAD_ONCE_INIT;
//用户保护内存池链表修改的互斥锁
pthread_mutex_t buf_pool::_mutex = PTHREAD_MUTEX_INITIALIZER;
//构造函数 主要是预先开辟一定量的空间
//这里buf_pool是一个hash,每个key都是不同空间容量
//对应的value是一个io_buf集合的链表
//buf_pool --> [m4K] -- io_buf-io_buf-io_buf-io_buf...
// [m16K] -- io_buf-io_buf-io_buf-io_buf...
// [m64K] -- io_buf-io_buf-io_buf-io_buf...
// [m256K] -- io_buf-io_buf-io_buf-io_buf...
// [m1M] -- io_buf-io_buf-io_buf-io_buf...
// [m4M] -- io_buf-io_buf-io_buf-io_buf...
// [m8M] -- io_buf-io_buf-io_buf-io_buf...
buf_pool::buf_pool():_total_mem(0)
{
io_buf *prev;
//----> 开辟4K buf 内存池
_pool[m4K] = new io_buf(m4K);
if (_pool[m4K] == NULL) {
fprintf(stderr, "new io_buf m4K error");
exit(1);
}
prev = _pool[m4K];
//4K的io_buf 预先开辟5000个,约20MB供开发者使用
for (int i = 1; i < 5000; i ++) {
prev->next = new io_buf(m4K);
if (prev->next == NULL) {
fprintf(stderr, "new io_buf m4K error");
exit(1);
}
prev = prev->next;
}
_total_mem += 4 * 5000;
//----> 开辟16K buf 内存池
_pool[m16K] = new io_buf(m16K);
if (_pool[m16K] == NULL) {
fprintf(stderr, "new io_buf m16K error");
exit(1);
}
prev = _pool[m16K];
//16K的io_buf 预先开辟1000个,约16MB供开发者使用
for (int i = 1; i < 1000; i ++) {
prev->next = new io_buf(m16K);
if (prev->next == NULL) {
fprintf(stderr, "new io_buf m16K error");
exit(1);
}
prev = prev->next;
}
_total_mem += 16 * 1000;
7.LARV0.3_event_loop添加一个事件
//----> 开辟64K buf 内存池
_pool[m64K] = new io_buf(m64K);
if (_pool[m64K] == NULL) {
fprintf(stderr, "new io_buf m64K error");
exit(1);
}
prev = _pool[m64K];
//64K的io_buf 预先开辟500个,约32MB供开发者使用
for (int i = 1; i < 500; i ++) {
prev->next = new io_buf(m64K);
if (prev->next == NULL) {
fprintf(stderr, "new io_buf m64K error");
exit(1);
}
prev = prev->next;
}
_total_mem += 64 * 500;
//----> 开辟256K buf 内存池
_pool[m256K] = new io_buf(m256K);
if (_pool[m256K] == NULL) {
fprintf(stderr, "new io_buf m256K error");
exit(1);
}
prev = _pool[m256K];
//256K的io_buf 预先开辟200个,约50MB供开发者使用
for (int i = 1; i < 200; i ++) {
prev->next = new io_buf(m256K);
if (prev->next == NULL) {
fprintf(stderr, "new io_buf m256K error");
exit(1);
}
prev = prev->next;
}
_total_mem += 256 * 200;
//----> 开辟1M buf 内存池
_pool[m1M] = new io_buf(m1M);
if (_pool[m1M] == NULL) {
fprintf(stderr, "new io_buf m1M error");
exit(1);
}
prev = _pool[m1M];
//1M的io_buf 预先开辟50个,约50MB供开发者使用
for (int i = 1; i < 50; i ++) {
prev->next = new io_buf(m1M);
if (prev->next == NULL) {
fprintf(stderr, "new io_buf m1M error");
exit(1);
}
prev = prev->next;
}
_total_mem += 1024 * 50;
//----> 开辟4M buf 内存池
_pool[m4M] = new io_buf(m4M);
if (_pool[m4M] == NULL) {
fprintf(stderr, "new io_buf m4M error");
exit(1);
}
prev = _pool[m4M];
//4M的io_buf 预先开辟20个,约80MB供开发者使用
for (int i = 1; i < 20; i ++) {
prev->next = new io_buf(m4M);
if (prev->next == NULL) {
fprintf(stderr, "new io_buf m4M error");
exit(1);
}
prev = prev->next;
}
_total_mem += 4096 * 20;
//----> 开辟8M buf 内存池
_pool[m8M] = new io_buf(m8M);
if (_pool[m8M] == NULL) {
fprintf(stderr, "new io_buf m8M error");
exit(1);
}
prev = _pool[m8M];
//8M的io_buf 预先开辟10个,约80MB供开发者使用
for (int i = 1; i < 10; i ++) {
prev->next = new io_buf(m8M);
if (prev->next == NULL) {
fprintf(stderr, "new io_buf m8M error");
exit(1);
}
prev = prev->next;
}
_total_mem += 8192 * 10;
}
8.LARV0.3_event_loop的epoll_wait封装
//开辟一个io_buf
//1 如果上层需要N个字节的大小的空间,找到与N最接近的buf hash组,取出,
//2 如果该组已经没有节点使用,可以额外申请
//3 总申请长度不能够超过最大的限制大小 EXTRA_MEM_LIMIT
//4 如果有该节点需要的内存块,直接取出,并且将该内存块从pool摘除
io_buf *buf_pool::alloc_buf(int N)
{
//1 找到N最接近哪hash 组
int index;
if (N <= m4K) {
index = m4K;
}
else if (N <= m16K) {
index = m16K;
}
else if (N <= m64K) {
index = m64K;
}
else if (N <= m256K) {
index = m256K;
}
else if (N <= m1M) {
index = m1M;
}
else if (N <= m4M) {
index = m4M;
}
else if (N <= m8M) {
index = m8M;
}
else {
return NULL;
}
//2 如果该组已经没有,需要额外申请,那么需要加锁保护
pthread_mutex_lock(&_mutex);
if (_pool[index] == NULL) {
if (_total_mem + index/1024 >= EXTRA_MEM_LIMIT) {
//当前的开辟的空间已经超过最大限制
fprintf(stderr, "already use too many memory!\n");
exit(1);
}
io_buf *new_buf = new io_buf(index);
if (new_buf == NULL) {
fprintf(stderr, "new io_buf error\n");
exit(1);
}
_total_mem += index/1024;
pthread_mutex_unlock(&_mutex);
return new_buf;
}
//3 从pool中摘除该内存块
io_buf *target = _pool[index];
_pool[index] = target->next;
pthread_mutex_unlock(&_mutex);
target->next = NULL;
return target;
}
//重置一个io_buf,将一个buf 上层不再使用,或者使用完成之后,需要将该buf放回pool中
void buf_pool::revert(io_buf *buffer)
{
//每个buf的容量都是固定的 在hash的key中取值
int index = buffer->capacity;
//重置io_buf中的内置位置指针
buffer->length = 0;
buffer->head = 0;
pthread_mutex_lock(&_mutex);
//找到对应的hash组 buf首届点地址
assert(_pool.find(index) != _pool.end());
//将buffer插回链表头部
buffer->next = _pool[index];
_pool[index] = buffer;
pthread_mutex_unlock(&_mutex);
}
```
9.LARV0.3-event_loop删除事件
其中,`buf_pool`构造函数中实现了内存池的hash预开辟内存工作,具体的数据结构如下
```cpp
//buf_pool --> [m4K] --> io_buf-io_buf-io_buf-io_buf...
// [m16K] --> io_buf-io_buf-io_buf-io_buf...
// [m64K] --> io_buf-io_buf-io_buf-io_buf...
// [m256K] --> io_buf-io_buf-io_buf-io_buf...
// [m1M] --> io_buf-io_buf-io_buf-io_buf...
// [m4M] --> io_buf-io_buf-io_buf-io_buf...
// [m8M] --> io_buf-io_buf-io_buf-io_buf...
```
`alloc_buf()`方法,是调用者从内存池中取出一块内存,如果最匹配的内存块存在,则返回,并将该块内存从buf_pool中摘除掉,如果没有则开辟一个内存出来。 `revert()`方法则是将已经使用完的`io_buf`重新放回`buf_pool`中。
### 3.3 读写buffer机制
那么接下来我们就需要实现一个专门用来读(输入)数据的`input_buf`和专门用来写(输出)数据的`output_buf`类了。由于这两个人都应该拥有一些`io_buf`的特性,所以我们先定义一个基础的父类`reactor_buf`。
#### A. reactor_buf类
> lars_reactor/include/reactor_buf.h
```h
#pragma once
#include "io_buf.h"
#include "buf_pool.h"
#include <assert.h>
#include <unistd.h>
/*
* 给业务层提供的最后tcp_buffer结构
* */
class reactor_buf {
public:
reactor_buf();
~reactor_buf();
const int length() const;
void pop(int len);
void clear();
protected:
io_buf *_buf;
};
```
这个的作用就是将io_buf作为自己的一个成员,然后做了一些包装。具体方法实现如下。
> lars_reactor/src/reactor.cpp
```cpp
#include "reactor_buf.h"
#include <sys/ioctl.h>
#include <string.h>
reactor_buf::reactor_buf()
{
_buf = NULL;
}
reactor_buf::~reactor_buf()
{
clear();
}
const int reactor_buf::length() const
{
return _buf != NULL? _buf->length : 0;
}
void reactor_buf::pop(int len)
{
assert(_buf != NULL && len <= _buf->length);
_buf->pop(len);
//当此时_buf的可用长度已经为0
if(_buf->length == 0) {
//将_buf重新放回buf_pool中
buf_pool::instance()->revert(_buf);
_buf = NULL;
}
}
void reactor_buf::clear()
{
if (_buf != NULL) {
//将_buf重新放回buf_pool中
buf_pool::instance()->revert(_buf);
_buf = NULL;
}
}
```
10.LARV0.3_event_loop添加listenfd读事件
#### B. input_buf类
接下来就可以集成`reactor_buf`类实现`input_buf`类的设计了。
> lars_reactor/include/reactor_buf.h
```h
//读(输入) 缓存buffer
class input_buf : public reactor_buf
{
public:
//从一个fd中读取数据到reactor_buf中
int read_data(int fd);
//取出读到的数据
const char *data() const;
//重置缓冲区
void adjust();
};
```
其中data()方法即取出已经读取的数据,adjust()含义和`io_buf`含义一致。主要是`read_data()`方法。具体实现如下。
> lars_reactor/src/reactor.cpp
```cpp
//从一个fd中读取数据到reactor_buf中
int input_buf::read_data(int fd)
{
int need_read;//硬件有多少数据可以读
//一次性读出所有的数据
//需要给fd设置FIONREAD,
//得到read缓冲中有多少数据是可以读取的
if (ioctl(fd, FIONREAD, &need_read) == -1) {
fprintf(stderr, "ioctl FIONREAD\n");
return -1;
}
if (_buf == NULL) {
//如果io_buf为空,从内存池申请
_buf = buf_pool::instance()->alloc_buf(need_read);
if (_buf == NULL) {
fprintf(stderr, "no idle buf for alloc\n");
return -1;
}
}
else {
//如果io_buf可用,判断是否够存
assert(_buf->head == 0);
if (_buf->capacity - _buf->length < (int)need_read) {
//不够存,冲内存池申请
io_buf *new_buf = buf_pool::instance()->alloc_buf(need_read+_buf->length);
if (new_buf == NULL) {
fprintf(stderr, "no ilde buf for alloc\n");
return -1;
}
//将之前的_buf的数据考到新申请的buf中
new_buf->copy(_buf);
//将之前的_buf放回内存池中
buf_pool::instance()->revert(_buf);
//新申请的buf成为当前io_buf
_buf = new_buf;
}
}
//读取数据
int already_read = 0;
do {
//读取的数据拼接到之前的数据之后
if(need_read == 0) {
//可能是read阻塞读数据的模式,对方未写数据
already_read = read(fd, _buf->data + _buf->length, m4K);
} else {
already_read = read(fd, _buf->data + _buf->length, need_read);
}
} while (already_read == -1 && errno == EINTR); //systemCall引起的中断 继续读取
if (already_read > 0) {
if (need_read != 0) {
assert(already_read == need_read);
}
_buf->length += already_read;
}
return already_read;
}
11.LARV0.3-event_loop添加写事件-基于v0.3开发
//取出读到的数据
const char *input_buf::data() const
{
return _buf != NULL ? _buf->data + _buf->head : NULL;
}
//重置缓冲区
void input_buf::adjust()
{
if (_buf != NULL) {
_buf->adjust();
}
}
```
#### C. output_buf类
接下来就可以集成`reactor_buf`类实现`output_buf`类的设计了。
> lars_reactor/include/reactor_buf.h
```h
//写(输出) 缓存buffer
class output_buf : public reactor_buf
{
public:
//将一段数据 写到一个reactor_buf中
int send_data(const char *data, int datalen);
//将reactor_buf中的数据写到一个fd中
int write2fd(int fd);
};
```
`send_data()`方法主要是将数据写到`io_buf`中,实际上并没有做真正的写操作。而是当调用`write2fd`方法时,才会将`io_buf`的数据写到对应的fd中。send_data是做一些buf内存块的申请等工作。具体实现如下
> lars_reactor/src/reactor.cpp
```cpp
//将一段数据 写到一个reactor_buf中
int output_buf::send_data(const char *data, int datalen)
{
if (_buf == NULL) {
//如果io_buf为空,从内存池申请
_buf = buf_pool::instance()->alloc_buf(datalen);
if (_buf == NULL) {
fprintf(stderr, "no idle buf for alloc\n");
return -1;
}
}
else {
//如果io_buf可用,判断是否够存
assert(_buf->head == 0);
if (_buf->capacity - _buf->length < datalen) {
//不够存,冲内存池申请
io_buf *new_buf = buf_pool::instance()->alloc_buf(datalen+_buf->length);
if (new_buf == NULL) {
fprintf(stderr, "no ilde buf for alloc\n");
return -1;
}
//将之前的_buf的数据考到新申请的buf中
new_buf->copy(_buf);
//将之前的_buf放回内存池中
buf_pool::instance()->revert(_buf);
//新申请的buf成为当前io_buf
_buf = new_buf;
}
}
//将data数据拷贝到io_buf中,拼接到后面
memcpy(_buf->data + _buf->length, data, datalen);
_buf->length += datalen;
return 0;
}
//将reactor_buf中的数据写到一个fd中
int output_buf::write2fd(int fd)
{
assert(_buf != NULL && _buf->head == 0);
int already_write = 0;
do {
already_write = write(fd, _buf->data, _buf->length);
} while (already_write == -1 && errno == EINTR); //systemCall引起的中断,继续写
if (already_write > 0) {
//已经处理的数据清空
_buf->pop(already_write);
//未处理数据前置,覆盖老数据
_buf->adjust();
}
//如果fd非阻塞,可能会得到EAGAIN错误
if (already_write == -1 && errno == EAGAIN) {
already_write = 0;//不是错误,仅仅返回0,表示目前是不可以继续写的
}
return already_write;
}
```
12.LARV0.3总结
现在我们已经完成了内存管理及读写buf机制的实现,接下来就要简单的测试一下,用我们之前的V0.1版本的reactor server来测试。
### 3.4 完成Lars Reactor V0.2开发
#### A. 修改tcp_server
主要修正do_accept()方法,加上reactor_buf机制.
> lars_reactor/src/tcp_server.cpp
```cpp
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <strings.h>
#include <unistd.h>
#include <signal.h>
#include <sys/types.h> /* See NOTES */
#include <sys/socket.h>
#include <arpa/inet.h>
#include <errno.h>
#include "tcp_server.h"
#include "reactor_buf.h"
//server的构造函数
tcp_server::tcp_server(const char *ip, uint16_t port)
{
//...
}
//开始提供创建链接服务
void tcp_server::do_accept()
{
int connfd;
while(true) {
//accept与客户端创建链接
printf("begin accept\n");
connfd = accept(_sockfd, (struct sockaddr*)&_connaddr, &_addrlen);
if (connfd == -1) {
if (errno == EINTR) {
fprintf(stderr, "accept errno=EINTR\n");
continue;
}
else if (errno == EMFILE) {
//建立链接过多,资源不够
fprintf(stderr, "accept errno=EMFILE\n");
}
else if (errno == EAGAIN) {
fprintf(stderr, "accept errno=EAGAIN\n");
break;
}
else {
fprintf(stderr, "accept error");
exit(1);
}
}
else {
//accept succ!
int ret = 0;
input_buf ibuf;
output_buf obuf;
char *msg = NULL;
int msg_len = 0;
do {
ret = ibuf.read_data(connfd);
if (ret == -1) {
fprintf(stderr, "ibuf read_data error\n");
break;
}
printf("ibuf.length() = %d\n", ibuf.length());
//将读到的数据放在msg中
msg_len = ibuf.length();
msg = (char*)malloc(msg_len);
bzero(msg, msg_len);
memcpy(msg, ibuf.data(), msg_len);
ibuf.pop(msg_len);
ibuf.adjust();
printf("recv data = %s\n", msg);
//回显数据
obuf.send_data(msg, msg_len);
while(obuf.length()) {
int write_ret = obuf.write2fd(connfd);
if (write_ret == -1) {
fprintf(stderr, "write connfd error\n");
return;
}
else if(write_ret == 0) {
//不是错误,表示此时不可写
break;
}
}
free(msg);
} while (ret != 0);
//Peer is closed
close(connfd);
}
}
}
```
相关文章:
C++负载均衡远程调用学习之Reactor事件触发机制
目录 1.LARV0.2-REACTOR_BUF实现 2.LARV0.2-outpu_buf实现 3.LARV0.2-reactor继承内存管理 4.LARV0.2流程总结 5.LARV0.3-多路IO事件的分析 6.LARV0.3_io_event和event_loop定义 7.LARV0.3_event_loop添加一个事件 8.LARV0.3_event_loop的epoll_wait封装 9.LARV0.3-eve…...
将uni-app前端项目发布到微信小程序体验版
1、修改后端接口调用地址 const REQUEST_CONST {BASE_URL:https://11.22.33.44:9090, } export default REQUEST_CONST 2、登录微信小程序平台,获取AppID 3、配置微信小程序AppID 在项目根目录下找到manifest.json文件,配置微信小程序相关的参数 4、…...
深入理解CSS显示模式与盒子模型
一、CSS显示模式:元素的“性格”决定布局 1. 显示模式基础 CSS显示模式(display属性)决定了元素在页面中的排列方式和尺寸表现。常见的显示模式有三大类型: 2. 块级元素(Block) 特点:独占一…...
突破SQL注入字符转义的实战指南:绕过技巧与防御策略
在渗透测试中,SQL注入始终是Web安全的重点攻击手段。然而,当开发者对用户输入的特殊字符(如单引号、反斜杠)进行转义时,传统的注入方式往往会失效。本文将深入探讨如何绕过字符转义限制,并给出防御建议。 目…...
java网络原理5
一、网络地址转换(NAT) 1. 原理 - NAT 用于解决 IP 地址不够用的问题 ,将 IP 地址分为外网 IP(公网 IP)和内网 IP(私网 IP)。内网 IP 如 10.、172.16 - 172.31.、192.168.* 等,家用…...
一种基于光源评估并加权平均的自动白平衡方法(一)
在之前的博文如何在白平衡标定种构建不同类型的白平衡色温坐标系作为实例说明的白平衡色温坐标系的构建中,利用的如下映射矩阵构建色温坐标系: 按照上述论文的说明,是不能直接把Raw域中的每块的RGB带入公式...
基于Docker Compose的Prometheus监控系统一键部署方案
前言 在当今的云原生时代,系统监控已经成为保障业务稳定运行的重要基石。本文旨在提供一个完整的解决方案,帮助您快速搭建一个功能强大的监控系统。通过Docker Compose实现一键部署,结合Prometheus、Grafana、cAdvisor和node-exporter等优秀开源工具,构建一个完整的监控体…...
服务器丢包率测试保姆级教程:从Ping到网络打流仪实战
测试服务器丢包率是网络性能诊断的重要环节,丢包通常由网络拥塞、硬件故障、配置错误或线路质量差导致。以下是多种测试方法的详细步骤和工具说明: 一、基础工具测试(无需专业设备) 1. 使用 ping 命令 命令示例: bash…...
家庭服务器IPV6搭建无限邮箱系统指南
qq邮箱操作 // 邮箱配置信息 // 注意:使用QQ邮箱需要先开启IMAP服务并获取授权码 // 设置方法:登录QQ邮箱 -> 设置 -> 账户 -> 开启IMAP/SMTP服务 -> 生成授权码 服务器操作 fetchmail 同步QQ邮箱 nginx搭建web显示本地同步过来的邮箱 ssh…...
Ubuntu ZLMediakit的标准配置文件(rtsp->rtmp->hls)
最近在工作中遇到不生成hls资源的问题,后面发现是配置文件有误,特此记录正确的config.ini配置文件,方便查阅。 最终解决方案,通过下面这种格式可以访问到flv视频,具体为什么不太清楚,rtmp格式:rtmp://39.113.48.113:8089/live/1744168516937396175 记录最终解决方案:ht…...
Android 移动开发:ProgressBar(转圈进度条)
目录 Android 移动开发:ProgressBar(转圈进度条)控件实战介绍 📂 文件说明 🧾 activity_main.xml(布局文件,XML) 🧾 MainActivity.java(逻辑代码…...
CSS:选择器-复合选择器
文章目录 1、交集选择器 1、交集选择器 <style>/* 选中类名为rich的元素*/.rich {color: gold;}/* 选中类名为beauty的元素*/.beauty {color: red;}/* 选中类名为beauty的p元素,这种形式(元素配合类选择器)以后用的很多!&am…...
Kafka-可视化工具-Offset Explorer
安装: 下载地址:Offset Explorer 安装好后如图: 1、下载安装完毕,进行新增连接,启动offsetexplorer.exe,在Add Cluster窗口Properties 选项下填写Cluster name 和 kafka Cluster Version Cluster name (集…...
在pycharm中创建Django项目并启动
Django介绍 Django 是一个基于 Python 的开源 Web 应用框架,采用了 MTV(Model - Template - View)软件设计模式 ,由许多功能强大的组件组成,能够帮助开发者快速、高效地创建复杂的数据库驱动的 Web 应用程序。它具有以…...
私有知识库 Coco AI 实战(六):打造 ES Mapping 小助手
开发同学可能经常和字段类型打交道,数据类型本来就不少,新版本可能还有新的数据类型。更重要的是新的字段类型可能会提升某个场景的性能,不知道的话可就亏大发了。所以我们继续打造一个 ES Mapping 小助手。 克隆小助手 我们进入 Coco Serv…...
JavaScript性能优化实战之代码层面性能优化
在前端开发中,JavaScript 的性能直接影响到网站的加载速度、用户体验和交互流畅度。针对代码层面的优化,我们可以从多个方面入手,确保每一行代码都能最大化地发挥效能。接下来,我们将细化并解释每一个优化点。 1️⃣ 避免全局变量污染 全局变量会被整个 JavaScript 代码所…...
基于C++的IOT网关和平台2:github项目ctGateway技术说明书
初级代码游戏的专栏介绍与文章目录-CSDN博客 我的github:codetoys,所有代码都将会位于ctfc库中。已经放入库中我会指出在库中的位置。 这些代码大部分以Linux为目标但部分代码是纯C++的,可以在任何平台上使用。 源码指引:github源码指引_初级代码游戏的博客-CSDN博客 …...
前端基础之《Vue(13)—重要API》
重要的API 一、nextTick() 1、写法 Vue.$nextTick()或者this.$nextTick() 原因: set操作代码是同步的,但是代码背后的行为是异步的。set操作修改声明式变量,触发re-render生成新的虚拟DOM,进一步执行diff运算,找到…...
Python爬虫实战:获取彼岸网高清素材图片
一、引言 在数字化时代,图片素材的需求持续增长。彼岸网提供了丰富的高质量图片资源,其中 4K 风景图片备受用户青睐。借助 Python 爬虫技术,可自动化地从彼岸网获取这些图片,为用户提供便捷的图片素材服务。然而,爬取过程中会遭遇登录验证、反爬机制等问题,需采用相应技…...
拥抱 Kotlin Flow
1. 引言 Kotlin Flow 是 Kotlin 协程生态中处理异步数据流的核心工具,它提供了一种声明式、轻量级且与协程深度集成的响应式编程模型。与传统的 RxJava 相比,Flow 更简洁、更易于维护,尤其在 Android 开发中已成为主流选择。本文将从基础概念…...
winget使用
Get-Command winget winget search qq winget install Tencent.QQ.NT...
C++从入门到实战(十一)详细讲解C/C++语言中内存分布与C与C++内存管理对比
C从入门到实战(十一)详细讲解C/C语言中内存分布与C与C内存管理对比 前言一、C/C语言中内存分布1.内核空间2.栈3.堆4.数据段5.代码段 二、例题带练巩固C/C语言中内存分布的知识题目讲解题目答案 三、C语言动态内存分配(知识回顾)3.…...
flutter 专题 一百零四 Flutter环境搭建
Flutter简介 Flutter 是Google开发的一个移动跨平台(Android 和 iOS)的开发框架,使用的是 Dart 语言。和 React Native 不同的是,Flutter 框架并不是一个严格意义上的原生应用开发框架。Flutter 的目标是用来创建高性能、高稳定性…...
傅里叶与相位偏移
一、简介 大三的《离散数学》。。。。。 傅里叶变换是数学与工程领域的一项革命性工具,其核心思想是将复杂信号分解为简单正弦波的叠加,实现从时域(时间维度)到频域(频率维度)的转换。通过这种变换&#x…...
Godot笔记:入门索引
文章目录 前言游戏引擎软件界面关键概念GDScript导出成品创建非游戏应用后记 前言 最近对游戏引擎这块感兴趣,特别是因为游戏引擎自带的很多工具,作为图形化软件的开发应该也不错。 Godot 是一款这几年比较流行的开源游戏引擎。这里记录下入门学习使用 …...
OpenCV实战教程 第一部分:基础入门
第一部分:基础入门 1. OpenCV简介 什么是OpenCV及其应用领域 OpenCV(Open Source Computer Vision Library)是一个开源的计算机视觉和机器学习软件库,于1999年由Intel公司发起,现在由非营利组织OpenCV.org维护。Ope…...
OpenCV 图像处理核心技术 (第二部分)
欢迎来到 OpenCV 图像处理的第二部分!在第一部分,我们学习了如何加载、显示、保存图像以及访问像素等基础知识。现在,我们将深入探索如何利用 OpenCV 提供的强大工具来修改和分析图像。 图像处理是计算机视觉领域的基石。通过对图像进行各种…...
Git从入门到精通-第二章-工具配置
目录 命令行 安装Git 初次运行Git前的配置 git config基本概念 常用命令 配置用户信息 配置文本编辑器 查看配置 配置别名(简化命令) 高级配置 换行符处理(方便跨平台协作) 忽略文件权限变更(常用于团队协…...
树状结构转换工具类
项目中使用了很多树状结构,为了方便使用开发一个通用的工具类。 使用工具类的时候写一个类基础BaseNode,如果有个性化字段添加到类里面,然后就可以套用工具类。 工具类会将id和pid做关联返回一个树状结构的集合。 使用了hutool的工具包判空…...
C#基础简述
C#基础详解 一、C#语言概述 C#(读作"C Sharp")是微软开发的面向对象的编程语言,运行在.NET平台上。它结合了C的强大功能和Visual Basic的简单性,具有以下特点: 面向对象:支持封装、继…...
AI赋能烟草工艺革命:虫情监测步入智能化时代
在当今竞争激烈且品质至上的烟草行业中,生产流程的每一个细微环节都关乎着企业的生死存亡与品牌的兴衰荣辱。烟草工艺部门与制丝、卷包车间作为生产链条的核心驱动,犹如精密仪器中的关键齿轮,彼此紧密咬合、协同运转,任何一处的小…...
小刚说C语言刷题—1462小明的游泳时间
1.题目描述 伦敦奥运会要到了,小明在拼命练习游泳准备参加游泳比赛。 这一天,小明给自己的游泳时间做了精确的计时(本题中的计时都按 24 小时制计算),它发现自己从 a 时 b 分一直游泳到当天的 c 时 d 分。 请你帮小…...
StarRocks Lakehouse 如何重构大数据架构?
随着数据分析需求的不断演进,企业对数据处理架构的期望也在不断提升。在这一背景下,StarRocks 凭借其高性能的实时分析能力,正引领数据分析进入湖仓一体的新时代。 4 月 18 日,镜舟科技高级技术专家单菁茹做客开源中国直播栏目《…...
用TCP实现服务器与客户端的交互
引言: 这篇文章主要是用TCP构造的回显服务器,也就是客户端发什么,就返回什么。用实现这个过程方式来学会TCP套接字的使用。 一、TCP的特点 TCP是可靠的:这个需要去了解TCP的机制,这是一个大工程,博主后面写…...
用于实时辐射场渲染的3D高斯溅射——3D Gaussian Splatting for Real-Time Radiance Field Rendering
用于实时辐射场渲染的3D高斯溅射——3D Gaussian Splatting for Real-Time Radiance Field Rendering 文章目录 用于实时辐射场渲染的3D高斯溅射——3D Gaussian Splatting for Real-Time Radiance Field Rendering摘要Abstract1. 预备知识1.1 三维的几何表示1.2 计算机中的集合…...
Vue3 Echarts 3D立方体柱状图实现教程
文章目录 前言一、实现原理二、series ——type: "pictorialBar" 简介2.1 常用属性 三、代码实战3.1 封装一个echarts通用组件 echarts.vue3.2 实现一个立方体柱状图(1)首先实现一个基础柱状图(2)添加立方体棱线&#x…...
Soildworks怎样在装配体中建立局部剖视图
1思路:建立拉伸切除 2步骤 1-打开点线面显示按钮 2-在装配体中依据某个基准面(例如前视基准面)建立一个待切除的草图 3-点击顶部工具栏的装配体--->装嫩配体特征---->拉伸切除---Ok 3具体图示 1-点击,使其变成灰色 即…...
基于C++的IOT网关和平台5:github项目ctGateway开发指南
初级代码游戏的专栏介绍与文章目录-CSDN博客 我的github:codetoys,所有代码都将会位于ctfc库中。已经放入库中我会指出在库中的位置。 这些代码大部分以Linux为目标但部分代码是纯C++的,可以在任何平台上使用。 源码指引:github源码指引_初级代码游戏的博客-CSDN博客 …...
虚拟机centos7安装docker
虚拟机CentOS 7上安装 Docker流程 1. 更新系统软件包 需要确保系统软件包是最新的 sudo yum -y update sudo:以超级用户权限执行命令。 yum:CentOS的包管理器工具。 -y:自动确认所有提示,直接执行。 2. 安装 Docker 依赖 在安装 …...
11.Spring Boot 3.1.5 中使用 SpringDoc OpenAPI(替代 Swagger)生成 API 文档
Spring Boot 3.1.5 中使用 SpringDoc OpenAPI(替代 Swagger)生成 API 文档 1. 项目结构 假设项目名为 springboot-openapi-demo,以下是项目的基本结构: springboot-openapi-demo/ ├── src/ │ ├── main/ │ │ ├─…...
pytorch对应gpu版本是否可用判断逻辑
# gpu_is_ok.py import torchdef check_torch_gpu():# 打印PyTorch版本print(f"PyTorch version: {torch.__version__}")# 检查CUDA是否可用cuda_available torch.cuda.is_available()print(f"CUDA available: {cuda_available}")if cuda_available:# 打印…...
Kubernetes 集群概念详解
Kubernetes 集群概念详解 Kubernetes 集群是由多个计算节点组成的容器编排系统,用于自动化部署、扩展和管理容器化应用。以下是 Kubernetes 集群的核心概念和架构解析: 一、集群基础架构 1. 集群组成要素 graph TBMaster[控制平面] --> Node1[工作…...
BT137-ASEMI机器人功率器件专用BT137
编辑:LL BT137-ASEMI机器人功率器件专用BT137 型号:BT137 品牌:ASEMI 封装:TO-220F 批号:最新 引脚数量:3 封装尺寸:如图 特性:双向可控硅 工作结温:-40℃~150℃…...
ArcGIS+GPT:多领域地理分析与决策新方案
技术点目录 AI大模型应用ArcGIS工作流程及功能prompt的使用技巧AI助力工作流程AI助力数据读取AI助力数据编辑与处理AI助力空间分析AI助力遥感分析AI助力二次开发AI助力科研绘图ArcGISAI综合应用了解更多 ——————————————————————————————————…...
鸿蒙文件上传-从前端到后端详解,对比jq请求和鸿蒙arkts请求区别,对比new FormData()和鸿蒙arktsrequest.uploadFile
需要权限:ohos.permission.INTERNET 1.nodejs自定义书写上传后端接口 传输过来的数据放在files?.image下 router.post(/upload,(req, res) > {var form new multiparty.Form();form.uploadDirpublic/images/uploads; //上传图片保存的地址(目录必须存在)fo…...
【DBeaver】如何连接MongoDB
MongoDB驱动 在 DBeaver 社区版是没有的,得自己下载 一、下载mongo-jdbc-standalone.jar 二、在工具栏找到数据库,选择驱动管理器 三、在驱动管理器点击新建 四、选择库,添加mongo-jdbc-standalone.jar;然后点击找到类 五、选择设置&#x…...
Unity 粒子同步,FishNet
Github的工程 同步画面 使用FishNet插件同步,可使用这个选项来克隆第二个项目进行测试...
自然语言处理之命名实体识别:Bi-LSTM-CRF模型的评估与性能分析
命名实体识别(Named Entity Recognition, NER)是自然语言处理(NLP)的核心任务之一,旨在从文本中识别出具有特定意义的实体(如人名、地名、机构名等),并为其分类。随着深度学习的发展,**Bi-LSTM-CRF**(双向长短期记忆网络结合条件随机场)模型因其强大的序列建模能力成…...
【SpringBoot】基于mybatisPlus的博客系统
1.实现用户登录 在之前的项目登录中,我使用的是Session传递用户信息实现校验登录 现在学习了Jwt令牌技术后我尝试用Jwt来完成校验工作 Jwt令牌 令牌一词在网络编程一节我就有所耳闻,现在又拾了起来。 这里讲应用:令牌也就用于身份标识&a…...
[Android]任务列表中有两个相机图标
现象: 修改AndroidManifest.xml <activityandroid:name"com.android.camera.PermissionsActivity"android:label"string/app_name"android:launchMode"singleTop"android:configChanges"orientation|screenSize|keyboardH…...