仿 RabbitMQ 实现的简易消息队列
文章目录
- 项目介绍
- 开放环境
- 第三⽅库介绍
- Protobuf
- Muduo库
- 需求分析
- 核⼼概念
- 实现内容
- 消息队列系统整体框架
- 服务端模块
- 数据管理模块
- 虚拟机数据管理模块
- 交换路由模块
- 消费者管理模块
- 信道(通信通道)管理模块
- 连接管理模块
- 客户端模块
- 公共模块
- 日志类
- 其他工具类
- SQLite基础操作
- 字符串操作
- 唯一UUID生成
- 文件基础操作
- 消息类型和交换机类型的定义
- 服务端
- 持久化数据管理中⼼模块
- 交换机数据管理
- 队列数据管理模块
- 绑定数据管理模块
- 消息数据管理模块
- 虚拟机管理模块
- 交换路由模块
- 信道管理模块
- 连接管理模块
- 消费者管理模块
- ⽹络通信协议设计
- Server服务器模块
- 客户端
- 订阅者模块
- 信道管理模块
- 连接管理模块
- 异步线程池模块
- 源码
项目介绍
曾经我们学习过阻塞队列(BlockingQueue)。当时我们说阻塞队列最⼤的⽤途,就是⽤来实现⽣产者
消费者模型。
优势:
- 解耦合
- ⽀持并发
- ⽀持忙闲不均
- 削峰填⾕
在实际的后端开发中,尤其是分布式系统⾥,跨主机之间使⽤⽣产者消费者模型,也是⾮常普遍的需求。
因此,我们通常会把阻塞队列封装成⼀个独⽴的服务器程序,并且赋予其更丰富的功能。这样的服务程
序我们就称为消息队列(MessageQueue,MQ)。
RabbitMQ是⼀个⾮常知名、功能强⼤且⼴泛使⽤的消息队列;
所以我们仿照其模拟实现一个简易的消息队列
开放环境
- Linux(Ubuntu-22.04)
- VSCode
- g++/gdb
- Makefile
- 使用语言C++;
- 序列化框架:Protobuf⼆进制序列化
- ⽹络通信:muduo库 + 自定义应用层协议
- 测试框架:Gtest
- 源数据信息数据库:SQLite3
第三⽅库介绍
该项目所要用到的第三方库
Protobuf
序列化&反序列化框架
特点:
- 语⾔⽆关、平台⽆关:即ProtoBuf⽀持Java、C++、Python等多种语⾔,⽀持多个平台
- ⾼效:即⽐XML更⼩、更快、更为简单
- 扩展性、兼容性好:你可以更新数据结构,⽽不影响和破坏原有的旧程序
使用步骤
- 编写
proto
文件
描述想定义的结构化对象
对象有什么成员,每个成员有怎么样的属性
- 编译
proto
文件
protoc --cpp_out=. xxx.proto
编译后会生成一系列接口代码
.cc 源文件:定义实现了结构化对象数据的访问、操作、序列化、反序列化接口.h头文件:定义了描述的数据结构对象类
根据我们的对象描述生成在.h中的代码
- 把编译生成的头文件包含进我们的代码中
Muduo库
Muduo由陈硕⼤佬开发,是⼀个基于⾮阻塞IO和事件驱动的C++⾼并发TCP⽹络编程库。它是⼀款基于主从Reactor模型的⽹络库,其使⽤的线程模型是oneloopperthread,所谓oneloopperthread指的是
- ⼀个线程只能有⼀个事件循环(EventLoop),⽤于响应计时器和IO事件
- ⼀个⽂件描述符只能由⼀个线程进⾏读写,换句话说就是⼀个TCP连接必须归属于某个EventLoop
管理
需求分析
核⼼概念
实现内容
- broke服务器:消息队列服务器;
- 消息发布客户端:向服务器发布消息;
- 消息订阅客户端:从服务器订阅消息;
消息队列系统整体框架
服务端模块
数据管理模块
虚拟机数据管理模块
交换路由模块
消费者管理模块
信道(通信通道)管理模块
连接管理模块
客户端模块
上述整体思维导图
整个项目分为三大模块:
mqcommon
公共模块,mqserver
消息队列服务端模块、
mqclient
客户端模块。
公共模块
我们将常用的一些工具封装起来,放在mqcommon
目录下
日志类
编写一个日志工具模块mq_logger.hpp
,进行日志打印
#ifndef __M_LOG_H__
#define __M_LOG_H__
#include <iostream>
#include <ctime>#define DBG_LEVEL 0
#define INF_LEVEL 1
#define ERR_LEVEL 2
#define DEFAULT_LEVEL DBG_LEVEL
#define LOG(lev_str, level, format, ...) {\if (level >= DEFAULT_LEVEL) {\time_t t = time(nullptr);\struct tm* ptm = localtime(&t);\char time_str[32];\strftime(time_str, 31, "%H:%M:%S", ptm);\printf("[%s][%s][%s:%d]\t" format "\n", lev_str, time_str, __FILE__, __LINE__, ##__VA_ARGS__);\}\
}#define DLOG(format, ...) LOG("DBG", DBG_LEVEL, format, ##__VA_ARGS__)
#define ILOG(format, ...) LOG("INF", INF_LEVEL, format, ##__VA_ARGS__)
#define ELOG(format, ...) LOG("ERR", ERR_LEVEL, format, ##__VA_ARGS__)
#endif
其他工具类
mq_helper.hpp
下主要实现了
SQLite
基础操作- 字符串操作
- 唯一
UUID
生成 - 文件基础操作
SQLite基础操作
- 创建/打开数据库文件
- 执行 SQL 语句
- 关闭数据库
class SqliteHelper
{
public:typedef int (*SqliteCallback)(void *, int, char **, char **);SqliteHelper(const std::string &dbfile) : _dbfile(dbfile), _handler(nullptr) {}bool open(int safe_leve = SQLITE_OPEN_FULLMUTEX){// int sqlite3_open_v2(const char *filename, sqlite3 **ppDb, int flags, const char *zVfs );int ret = sqlite3_open_v2(_dbfile.c_str(), &_handler, SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE | safe_leve, nullptr);if (ret != SQLITE_OK){ELOG("创建/打开sqlite数据库失败: %s", sqlite3_errmsg(_handler));return false;}return true;}bool exec(const std::string &sql, SqliteCallback cb, void *arg){// int sqlite3_exec(sqlite3*, char *sql, int (*callback)(void*,int,char**,char**), void* arg, char **err)int ret = sqlite3_exec(_handler, sql.c_str(), cb, arg, nullptr);if (ret != SQLITE_OK){ELOG("%s \n语句执行失败: %s", sql.c_str(), sqlite3_errmsg(_handler));return false;}return true;}void close(){if (_handler)sqlite3_close_v2(_handler);}private:std::string _dbfile;sqlite3 *_handler;
};
字符串操作
路由匹配中。交换机需要根据消息的binding_key
和队列的routing_key
进行匹配,但这些都是由一些特殊符号分开的,所以我需要取出来
class StrHelper
{
public:static size_t split(const std::string &str, const std::string &sep, std::vector<std::string> &result){size_t pos, idx = 0;while (idx < str.size()){pos = str.find(sep, idx);if (pos == std::string::npos){result.push_back(str.substr(idx));return result.size();}if (pos == idx){idx = pos + sep.size();continue;}result.push_back(str.substr(idx, pos - idx));idx = pos + sep.size();}return result.size();}
};
唯一UUID生成
对于每个消息,都需要生成唯一的ID
进行标识
我们使用8个随机数字 + 8字节序号,共16字节数组 生成32位16进制字符
class UUIDHelper
{
public:static std::string uuid(){std::random_device rd;std::mt19937_64 gernator(rd());std::uniform_int_distribution<int> distribution(0, 255);std::stringstream ss;for (int i = 0; i < 8; i++){ss << std::setw(2) << std::setfill('0') << std::hex << distribution(gernator);if (i == 3 || i == 5 || i == 7){ss << "-";}}static std::atomic<size_t> seq(1);size_t num = seq.fetch_add(1);for (int i = 7; i >= 0; i--){ss << std::setw(2) << std::setfill('0') << std::hex << ((num >> (i * 8)) & 0xff);if (i == 6)ss << "-";}return ss.str();}
};
文件基础操作
包括文件的创建、删除、读、写;
class FileHelper
{
public:FileHelper(const std::string &filename) : _filename(filename) {}bool exists(){struct stat st;return (stat(_filename.c_str(), &st) == 0);}size_t size(){struct stat st;int ret = stat(_filename.c_str(), &st);if (ret < 0){return 0;}return st.st_size;}bool read(char *body, size_t offset, size_t len){// 1. 打开文件std::ifstream ifs(_filename, std::ios::binary | std::ios::in);if (ifs.is_open() == false){ELOG("%s 文件打开失败!", _filename.c_str());return false;}// 2. 跳转文件读写位置ifs.seekg(offset, std::ios::beg);// 3. 读取文件数据ifs.read(body, len);if (ifs.good() == false){ELOG("%s 文件读取数据失败!!", _filename.c_str());ifs.close();return false;}// 4. 关闭文件ifs.close();return true;}bool read(std::string &body){// 获取文件大小,根据文件大小调整body的空间size_t fsize = this->size();body.resize(fsize);return read(&body[0], 0, fsize);}bool write(const char *body, size_t offset, size_t len){// 1. 打开文件std::fstream fs(_filename, std::ios::binary | std::ios::in | std::ios::out);if (fs.is_open() == false){ELOG("%s 文件打开失败!", _filename.c_str());return false;}// 2. 跳转到文件指定位置fs.seekp(offset, std::ios::beg);// 3. 写入数据fs.write(body, len);if (fs.good() == false){ELOG("%s 文件写入数据失败!!", _filename.c_str());fs.close();return false;}// 4. 关闭文件fs.close();return true;}bool write(const std::string &body){return write(body.c_str(), 0, body.size());}bool rename(const std::string &nname){return (::rename(_filename.c_str(), nname.c_str()) == 0);}static std::string parentDirectory(const std::string &filename){// /aaa/bb/ccc/ddd/test.txtsize_t pos = filename.find_last_of("/");if (pos == std::string::npos){// test.txtreturn "./";}std::string path = filename.substr(0, pos);return path;}static bool createFile(const std::string &filename){std::fstream ofs(filename, std::ios::binary | std::ios::out);if (ofs.is_open() == false){ELOG("%s 文件打开失败!", filename.c_str());return false;}ofs.close();return true;}static bool removeFile(const std::string &filename){return (::remove(filename.c_str()) == 0);}static bool createDirectory(const std::string &path){// aaa/bbb/ccc cccc// 在多级路径创建中,我们需要从第一个父级目录开始创建size_t pos, idx = 0;while (idx < path.size()){pos = path.find("/", idx);if (pos == std::string::npos){return (mkdir(path.c_str(), 0775) == 0);}std::string subpath = path.substr(0, pos);int ret = mkdir(subpath.c_str(), 0775);if (ret != 0 && errno != EEXIST){ELOG("创建目录 %s 失败: %s", subpath.c_str(), strerror(errno));return false;}idx = pos + 1;}return true;}static bool removeDirectory(const std::string &path){// rm -rf path// system()std::string cmd = "rm -rf " + path;return (system(cmd.c_str()) != -1);}private:std::string _filename;
};
消息类型和交换机类型的定义
消息是需要进行持久化存储的,因此涉及到数据的序列化和反序列化,此处我们使用protobuf
来生成;
消息本身要素
- 消息属性:消息
ID
,消息投递方式,消息的routing_key
- 消息有效载荷内容
消息额外内容
- 消息的存储位置
- 消息的长度
- 消息是否有效
交换机类型
- direct
- fanout
- topic
消息投递模式
- undurable
- durable
服务端
持久化数据管理中⼼模块
在数据管理模块中管理交换机,队列,队列绑定,消息等部分数据数据
交换机数据管理
- 定义交换机类
交换机类需要管理的数据
// 1. 交换机名称std::string name;// 2. 交换机类型ExchangeType type;// 3. 交换机持久化标志bool durable;// 4. 是否自动删除标志bool auto_delete;// 5. 其他参数std::unordered_map<std::string, std::string> args;
// 1. 定义交换机类struct Exchange{using ptr = std::shared_ptr<Exchange>;// 1. 交换机名称std::string name;// 2. 交换机类型ExchangeType type;// 3. 交换机持久化标志bool durable;// 4. 是否自动删除标志bool auto_delete;// 5. 其他参数std::unordered_map<std::string, std::string> args;Exchange() {}// 构造函数,初始化交换机的各个属性Exchange(const std::string &ename, ExchangeType etype,bool edurable, bool eauto_delete, std::unordered_map<std::string, std::string> &eargs): name(ename), type(etype), durable(edurable),auto_delete(eauto_delete), args(eargs) {}// 解析字符串形式的参数,并将其存储到args中void setArgs(const std::string &str_args){// key=val&key=val&std::vector<std::string> sub_args;StrHelper::split(str_args, "&", sub_args);// 解析每个键值对for (const auto &sub_arg : sub_args){size_t pos = sub_arg.find('=');if (pos != std::string::npos){std::string key = sub_arg.substr(0, pos);std::string value = sub_arg.substr(pos + 1);args[key] = value;}}}// 将args中的内容序列化为字符串并返回std::string getArgs(){std::string result;for (const auto &pair : args){result += pair.first + "=" + pair.second + "&"; // 拼接键值对}return result;}};
- 定义交换机数据持久化管理类–数据存储在sqlite数据库中
通过在数据库中操作数据表的方式,来管理这些持久化的交换机
// 2. 定义交换机数据持久化管理类--数据存储在sqlite数据库中using ExchangeMap = std::unordered_map<std::string, Exchange::ptr>;class ExchangeMapper{public:// 构造函数,初始化数据库文件路径ExchangeMapper(const std::string &dbfile): _sql_helper(dbfile){std::string path = FileHelper::parentDirectory(dbfile);FileHelper::createDirectory(path);assert(_sql_helper.open());createTable();}// 创建数据库表void createTable(){
#define CREATE_TABLE "create table if not exists exchange_table(\name varchar(32) primary key, \type int, \durable int, \auto_delete int, \args varchar(128));"bool ret = _sql_helper.exec(CREATE_TABLE, nullptr, nullptr);if (ret == false){DLOG("创建交换机数据库表失败!!");abort(); // 直接异常退出程序}}// 删除数据库表void removeTable(){
#define DROP_TABLE "drop table if exists exchange_table;"bool ret = _sql_helper.exec(DROP_TABLE, nullptr, nullptr);if (ret == false){DLOG("删除交换机数据库表失败!!");abort(); // 直接异常退出程序}}// 插入一个交换机到数据库bool insert(Exchange::ptr &exp){std::stringstream ss;ss << "insert into exchange_table values(";ss << "'" << exp->name << "', ";ss << exp->type << ", ";ss << exp->durable << ", ";ss << exp->auto_delete << ", ";ss << "'" << exp->getArgs() << "');";return _sql_helper.exec(ss.str(), nullptr, nullptr);}// 从数据库中删除指定名称的交换机void remove(const std::string &name){std::stringstream ss;ss << "delete from exchange_table where name=";ss << "'" << name << "';";_sql_helper.exec(ss.str(), nullptr, nullptr);}// using ExchangeMap = std::unordered_map<std::string, Exchange::ptr>;ExchangeMap recovery(){ExchangeMap result;std::string sql = "select name, type, durable, auto_delete, args from exchange_table;";// 调用 _sql_helper 的 exec 方法执行 SQL 查询// selectCallback 是回调函数,用于处理查询结果// &result 是传递给回调函数的参数,用于存储查询结果_sql_helper.exec(sql, selectCallback, &result);return result;}private:// 静态回调函数,用于处理 SQL 查询的每一行结果// arg: 传递给回调函数的参数,这里是一个 ExchangeMap 指针,用于存储查询结果// numcol: 当前行的列数// row: 当前行的数据,是一个字符串数组,每一列对应一个字符串// fields: 列名数组(未使用)static int selectCallback(void *arg, int numcol, char **row, char **fields){// 将 arg 转换为 ExchangeMap 指针ExchangeMap *result = (ExchangeMap *)arg;// 创建一个新的 Exchange 对象,使用智能指针管理内存auto exp = std::make_shared<Exchange>();exp->name = row[0];exp->type = (zwbMQ::ExchangeType)std::stoi(row[1]);exp->durable = (bool)std::stoi(row[2]);exp->auto_delete = (bool)std::stoi(row[3]);// 如果第五列(args)不为空,则调用 setArgs 方法设置交换机的参数if (row[4])exp->setArgs(row[4]);// 将 Exchange 对象插入到 result 中,以交换机的名称作为键// (*result)[exp->name] = exp;result->insert(std::make_pair(exp->name, exp));return 0;}private:SqliteHelper _sql_helper;};
- 定义交换机数据内存管理类
对外提供的交换机管理接口类,用户可以使用这里的接口管理交换机
// 3. 定义交换机数据内存管理类
class ExchangeManager
{
public:using ptr = std::shared_ptr<ExchangeManager>;// 构造函数,初始化数据库文件路径ExchangeManager(const std::string &dbfile): _mapper(dbfile){// 初始化时从数据库加载所有交换机数据到内存_exchanges = _mapper.recovery();}// 声明一个交换机bool declareExchange(const std::string &name,ExchangeType type, bool durable, bool auto_delete,std::unordered_map<std::string, std::string> &args){std::unique_lock<std::mutex> lock(_mutex); // 加锁,保证线程安全// 检查交换机是否已存在if (_exchanges.find(name) != _exchanges.end()){return true; // 如果已存在,返回 true}// 创建新的交换机对象auto exp = std::make_shared<Exchange>(name, type, durable, auto_delete, args);if (durable == true){bool ret = _mapper.insert(exp);if (ret == false)return false;}// 将交换机插入到内存中_exchanges.insert(std::make_pair(name, exp));return true;}// 删除指定名称的交换机void deleteExchange(const std::string &name){std::lock_guard<std::mutex> lock(_mutex); // 加锁,保证线程安全// 检查交换机是否已存在if (_exchanges.find(name) == _exchanges.end()){return;}// 从数据库中删除交换机if (_exchanges.find(name)->second->durable == true)_mapper.remove(name);// 从内存中删除交换机_exchanges.erase(name);}// 获取指定名称的交换机对象Exchange::ptr selectExchange(const std::string &name){std::lock_guard<std::mutex> lock(_mutex); // 加锁,保证线程安全// 检查交换机是否已存在if (_exchanges.find(name) == _exchanges.end()){return Exchange::ptr();}return _exchanges.find(name)->second;}// 判断指定名称的交换机是否存在bool exists(const std::string &name){std::lock_guard<std::mutex> lock(_mutex); // 加锁,保证线程安全// 检查交换机是否已存在if (_exchanges.find(name) == _exchanges.end()){return false;}return true;}// 获取当前交换机的数量size_t size(){std::unique_lock<std::mutex> lock(_mutex);return _exchanges.size();}// 清理所有交换机数据void clear(){std::lock_guard<std::mutex> lock(_mutex); // 加锁,保证线程安全_mapper.removeTable();_exchanges.clear();}private:// 互斥锁,用于线程安全std::mutex _mutex;// 交换机数据持久化管理对象ExchangeMapper _mapper;// 内存中存储的交换机数据std::unordered_map<std::string, Exchange::ptr> _exchanges;
};
队列数据管理模块
- 定义队列结构体
队列要管理的主要数据
std::string name; // 队列名称bool durable; // 是否持久化bool exclusive; // 是否独占bool auto_delete; // 是否自动删除std::unordered_map<std::string, std::string> args; // 其他参数
// 定义队列结构体
struct MsgQueue
{using ptr = std::shared_ptr<MsgQueue>; // 定义智能指针别名,方便使用 shared_ptr 管理 MsgQueue 对象std::string name; // 队列名称bool durable; // 是否持久化bool exclusive; // 是否独占bool auto_delete; // 是否自动删除std::unordered_map<std::string, std::string> args; // 其他参数MsgQueue() {}MsgQueue(const std::string &qname, bool qdurable, bool qexclusive, bool qauto_delete, std::unordered_map<std::string, std::string> qargs): name(qname), durable(qdurable), exclusive(qexclusive), auto_delete(qauto_delete), args(qargs){}// 解析字符串形式的参数,并将其存储到args中void setArgs(const std::string &str_args){// key=val&key=val&std::vector<std::string> sub_args;StrHelper::split(str_args, "&", sub_args);// 解析每个键值对for (const auto &sub_arg : sub_args){size_t pos = sub_arg.find('=');if (pos != std::string::npos){std::string key = sub_arg.substr(0, pos);std::string value = sub_arg.substr(pos + 1);args[key] = value;}}}// 将args中的内容序列化为字符串并返回std::string getArgs(){std::string result;for (const auto &pair : args){result += pair.first + "=" + pair.second + "&"; // 拼接键值对}return result;}
};
- 定义队列数据持久化管理类–数据存储在sqlite数据库中
通过在数据库中操作数据表的方式,来管理这些持久化的队列
// 2. 定义队列数据持久化管理类--数据存储在sqlite数据库中
class MsqQueueMapper
{
public:// 构造函数,接受一个数据库文件路径作为参数MsqQueueMapper(const std::string &dbfile): _sql_helper(dbfile) // 初始化 SQLite 帮助类{// 获取数据库文件的父目录路径std::string path = FileHelper::parentDirectory(dbfile);// 创建目录(如果不存在)FileHelper::createDirectory(path);// 打开数据库连接,并断言连接成功assert(_sql_helper.open());// 创建消息队列表createTable();}// 定义智能指针别名,方便使用 shared_ptr 管理 MsgQueue 对象using ptr = std::shared_ptr<MsgQueue>;// 队列名称std::string name;// 是否持久化bool durable;// 是否独占bool exclusive;// 是否自动删除bool auto_delete;// 其他参数std::unordered_map<std::string, std::string> args;// 创建消息队列表void createTable(){std::stringstream sql;sql << "create table if not exists queue_table(";sql << "name varchar(32) primary key,"; sql << "durable int,"; sql << "exclusive int,"; sql << "auto_delete int,"; sql << "args varchar(128));"; assert(_sql_helper.exec(sql.str(), nullptr, nullptr));}// 删除消息队列表void removeTable(){std::string sql = "drop table if exists queue_table";assert(_sql_helper.exec(sql, nullptr, nullptr));}// 插入一条消息队列记录bool insert(MsgQueue::ptr &queue){std::stringstream sql;sql << "insert into queue_table values(";sql << "'" << queue->name << "',"; sql << queue->durable << ","; sql << queue->exclusive << ","; sql << queue->auto_delete << ","; sql << "'" << queue->getArgs() << "');";return _sql_helper.exec(sql.str(), nullptr, nullptr);}// 删除指定名称的消息队列记录bool remove(const std::string &name){std::stringstream sql;sql << "delete from queue_table where name=";sql << "'" << name << "';";return _sql_helper.exec(sql.str(), nullptr, nullptr);}// 定义队列映射类型,键为队列名称,值为 MsgQueue 智能指针using QueueMap = std::unordered_map<std::string, MsgQueue::ptr>;// 从数据库中恢复所有消息队列记录QueueMap recovery(){QueueMap result;std::string sql = "select name, durable, exclusive, auto_delete, args from queue_table;";_sql_helper.exec(sql, selectCallback, &result);return result;}private:// 静态回调函数,用于处理 SQL 查询的每一行结果// arg: 传递给回调函数的参数,这里是一个 QueueMap 指针,用于存储查询结果// numcol: 当前行的列数// row: 当前行的数据,是一个字符串数组,每一列对应一个字符串// fields: 列名数组(未使用)static int selectCallback(void *arg, int numcol, char **row, char **fields){// 将 arg 转换为 QueueMap 指针QueueMap *result = (QueueMap *)arg;// 创建一个新的 MsgQueue 对象,使用智能指针管理内存MsgQueue::ptr mqp = std::make_shared<MsgQueue>(); mqp->name = row[0]; // 队列名称mqp->durable = (bool)std::stoi(row[1]); // 是否持久化mqp->exclusive = (bool)std::stoi(row[2]); // 是否独占mqp->auto_delete = (bool)std::stoi(row[3]); // 是否自动删除if (row[4])mqp->setArgs(row[4]); // 其他参数// 将队列对象插入到结果映射中result->insert(std::make_pair(mqp->name, mqp));return 0;}SqliteHelper _sql_helper;
};
- 定义队列数据内存管理类
对外提供的队列管理接口类,用户可以使用这里的接口管理队列
// 3. 定义交换机数据内存管理类
// 3. 定义队列数据内存管理类class MsgQueueManager
{
public:using ptr = std::shared_ptr<MsgQueueManager>;// 构造函数,初始化 MsgQueueManager 对象// 参数 dbfile 是用于持久化消息队列的数据库文件路径MsgQueueManager(const std::string &dbfile): _mapper(dbfile) // 初始化 _mapper 对象,用于数据库操作{// 从数据库中恢复消息队列数据_msg_queues = _mapper.recovery();}// 声明一个新的消息队列// 返回 true 表示队列声明成功,false 表示失败bool declareQueue(const std::string &qname, bool qdurable, bool qexclusive,bool qauto_delete, const google::protobuf::Map<std::string, std::string> qargs){std::unique_lock<std::mutex> lock(_mutex); // 加锁,确保线程安全auto it = _msg_queues.find(qname); // 查找队列是否已存在if (it != _msg_queues.end()){return true; // 如果队列已存在,直接返回 true}MsgQueue::ptr mqp = std::make_shared<MsgQueue>(); // 创建一个新的消息队列对象mqp->name = qname; // 设置队列名称mqp->durable = qdurable; // 设置队列是否持久化mqp->exclusive = qexclusive; // 设置队列是否独占mqp->auto_delete = qauto_delete; // 设置队列是否自动删除mqp->args = qargs; // 设置队列的其他参数if (qdurable == true){bool ret = _mapper.insert(mqp); // 如果队列是持久化的,将其插入数据库if (ret == false)return false; // 插入失败,返回 false}_msg_queues.insert(std::make_pair(qname, mqp)); // 将队列插入到内存中的队列映射中return true; // 返回 true 表示队列声明成功}// 删除指定名称的消息队列// 参数 name 是要删除的队列名称void deleteQueue(const std::string &name){std::unique_lock<std::mutex> lock(_mutex); // 加锁,确保线程安全auto it = _msg_queues.find(name); // 查找队列是否存在if (it == _msg_queues.end()){return; // 如果队列不存在,直接返回}if (it->second->durable == true)_mapper.remove(name); // 如果队列是持久化的,从数据库中删除_msg_queues.erase(name); // 从内存中的队列映射中删除队列}// 根据名称查找并返回消息队列// 参数 name 是要查找的队列名称// 返回指向消息队列的智能指针,如果队列不存在,返回空的智能指针MsgQueue::ptr selectQueue(const std::string &name){std::unique_lock<std::mutex> lock(_mutex); // 加锁,确保线程安全auto it = _msg_queues.find(name); // 查找队列是否存在if (it == _msg_queues.end()){return MsgQueue::ptr(); // 如果队列不存在,返回空的智能指针}return it->second; // 返回找到的队列}// 返回所有消息队列的映射// 返回一个包含所有消息队列的映射QueueMap allQueue(){std::unique_lock<std::mutex> lock(_mutex); // 加锁,确保线程安全return _msg_queues; // 返回内存中的队列映射}// 检查指定名称的消息队列是否存在// 参数 name 是要检查的队列名称// 返回 true 表示队列存在,false 表示队列不存在bool exists(const std::string &name){std::unique_lock<std::mutex> lock(_mutex); // 加锁,确保线程安全auto it = _msg_queues.find(name); // 查找队列是否存在if (it == _msg_queues.end()){return false; // 如果队列不存在,返回 false}return true; // 返回 true 表示队列存在}// 返回当前消息队列的数量// 返回消息队列的数量size_t size(){std::unique_lock<std::mutex> lock(_mutex); // 加锁,确保线程安全return _msg_queues.size(); // 返回队列映射的大小}// 清空所有消息队列// 删除数据库中的表,并清空内存中的队列映射void clear(){_mapper.removeTable(); // 删除数据库中的表_msg_queues.clear(); // 清空内存中的队列映射}private:std::mutex _mutex; // 互斥锁,用于确保线程安全MsgQueueMapper _mapper; // 用于数据库操作的映射器对象QueueMap _msg_queues; // 内存中的消息队列映射
};
绑定数据管理模块
实现了一个消息队列系统中的绑定管理功能
将消息队列(Queue)与交换机(Exchange)通过绑定键(Binding Key)关联起来
- Binding 结构体,表示绑定信息
// 定义Binding结构体,表示绑定信息
struct Binding
{using ptr = std::shared_ptr<Binding>; // 使用智能指针管理Binding对象std::string exchange_name; // 交换机名称std::string msgqueue_name; // 消息队列名称std::string binding_key; // 绑定键Binding() {}Binding(const std::string &ename, const std::string &qname, const std::string &key): exchange_name(ename), msgqueue_name(qname), binding_key(key) {}
};
- BindingMapper 类,负责管理与数据库的交互,包括创建表、插入、删除和恢复绑定信息
数据库建表进行管理
// 定义队列名与绑定信息的映射关系
using MsgQueueBindingMap = std::unordered_map<std::string, Binding::ptr>;
每个队列自己的绑定信息。绑定在哪个交换机、binding_key是什么?
// 定义交换机名称与队列绑定信息的映射关系
using BindingMap = std::unordered_map<std::string, MsgQueueBindingMap>;
MsgQueuesBindingMap 和交换机名构建集合,每个交换机都对应着一个队列集合
// 定义队列名与绑定信息的映射关系
using MsgQueueBindingMap = std::unordered_map<std::string, Binding::ptr>;
// 定义交换机名称与队列绑定信息的映射关系
using BindingMap = std::unordered_map<std::string, MsgQueueBindingMap>;
// BindingMapper类负责管理与数据库的交互,包括创建表、插入、删除和恢复绑定信息
class BindingMapper
{
public:BindingMapper(const std::string &dbfile) : _sql_helper(dbfile){std::string path = FileHelper::parentDirectory(dbfile);FileHelper::createDirectory(path);_sql_helper.open();createTable();}// 创建绑定信息表void createTable(){std::stringstream sql;sql << "create table if not exists binding_table(";sql << "exchange_name varchar(32), ";sql << "msgqueue_name varchar(32), ";sql << "binding_key varchar(128));";assert(_sql_helper.exec(sql.str(), nullptr, nullptr));}// 删除绑定信息表void removeTable(){std::string sql = "drop table if exists binding_table;";_sql_helper.exec(sql, nullptr, nullptr);}// 插入绑定信息bool insert(Binding::ptr &binding){std::stringstream sql;sql << "insert into binding_table values(";sql << "'" << binding->exchange_name << "', ";sql << "'" << binding->msgqueue_name << "', ";sql << "'" << binding->binding_key << "');";return _sql_helper.exec(sql.str(), nullptr, nullptr);}// 删除指定交换机和队列的绑定信息void remove(const std::string &ename, const std::string &qname){std::stringstream sql;sql << "delete from binding_table where ";sql << "exchange_name='" << ename << "' and ";sql << "msgqueue_name='" << qname << "';";_sql_helper.exec(sql.str(), nullptr, nullptr);}// 删除指定交换机的所有绑定信息void removeExchangeBindings(const std::string &ename){std::stringstream sql;sql << "delete from binding_table where ";sql << "exchange_name='" << ename << "';";_sql_helper.exec(sql.str(), nullptr, nullptr);}// 删除指定队列的所有绑定信息void removeMsgQueueBindings(const std::string &qname){std::stringstream sql;sql << "delete from binding_table where ";sql << "msgqueue_name='" << qname << "';";_sql_helper.exec(sql.str(), nullptr, nullptr);}// 从数据库中恢复绑定信息BindingMap recovery(){BindingMap result;std::string sql = "select exchange_name, msgqueue_name, binding_key from binding_table;";_sql_helper.exec(sql, selectCallback, &result);return result;}private:// SQL查询回调函数,用于处理查询结果static int selectCallback(void *arg, int numcol, char **row, char **fields){BindingMap *result = (BindingMap *)arg;Binding::ptr bp = std::make_shared<Binding>(row[0], row[1], row[2]);MsgQueueBindingMap &qmap = (*result)[bp->exchange_name];qmap.insert(std::make_pair(bp->msgqueue_name, bp));return 0;}private:SqliteHelper _sql_helper;
};
- BindingManager 类,负责管理绑定信息,包括绑定、解绑、删除和查询等操作
// BindingManager类负责管理绑定信息,包括绑定、解绑、删除和查询等操作
class BindingManager
{
public:using ptr = std::shared_ptr<BindingManager>;BindingManager(const std::string &dbfile) : _mapper(dbfile){_bindings = _mapper.recovery();}// 绑定交换机和队列bool bind(const std::string &ename, const std::string &qname, const std::string &key, bool durable){std::unique_lock<std::mutex> lock(_mutex);auto it = _bindings.find(ename);if (it != _bindings.end() && it->second.find(qname) != it->second.end()){return true;}Binding::ptr bp = std::make_shared<Binding>(ename, qname, key);if (durable){bool ret = _mapper.insert(bp);if (ret == false)return false;}auto &qbmap = _bindings[ename];qbmap.insert(std::make_pair(qname, bp));return true;}// 解绑交换机和队列void unBind(const std::string &ename, const std::string &qname){std::unique_lock<std::mutex> lock(_mutex);auto eit = _bindings.find(ename);if (eit == _bindings.end()){return;}auto qit = eit->second.find(qname);if (qit == eit->second.end()){return;}_mapper.remove(ename, qname);_bindings[ename].erase(qname);}// 删除指定交换机的所有绑定信息void removeExchangeBindings(const std::string &ename){std::unique_lock<std::mutex> lock(_mutex);_mapper.removeExchangeBindings(ename);_bindings.erase(ename);}// 删除指定队列的所有绑定信息void removeMsgQueueBindings(const std::string &qname){std::unique_lock<std::mutex> lock(_mutex);_mapper.removeMsgQueueBindings(qname);for (auto start = _bindings.begin(); start != _bindings.end(); ++start){start->second.erase(qname);}}// 获取指定交换机的所有绑定信息MsgQueueBindingMap getExchangeBindings(const std::string &ename){std::unique_lock<std::mutex> lock(_mutex);auto eit = _bindings.find(ename);if (eit == _bindings.end()){return MsgQueueBindingMap();}return eit->second;}// 获取指定交换机和队列的绑定信息Binding::ptr getBinding(const std::string &ename, const std::string &qname){std::unique_lock<std::mutex> lock(_mutex);auto eit = _bindings.find(ename);if (eit == _bindings.end()){return Binding::ptr();}auto qit = eit->second.find(qname);if (qit == eit->second.end()){return Binding::ptr();}return qit->second;}// 检查指定交换机和队列的绑定信息是否存在bool exists(const std::string &ename, const std::string &qname){std::unique_lock<std::mutex> lock(_mutex);auto eit = _bindings.find(ename);if (eit == _bindings.end()){return false;}auto qit = eit->second.find(qname);if (qit == eit->second.end()){return false;}return true;}// 获取所有绑定信息的总数size_t size(){size_t total_size = 0;std::unique_lock<std::mutex> lock(_mutex);for (auto start = _bindings.begin(); start != _bindings.end(); ++start){total_size += start->second.size();}return total_size;}// 清空所有绑定信息void clear(){std::unique_lock<std::mutex> lock(_mutex);_mapper.removeTable();_bindings.clear();}private:std::mutex _mutex; // 互斥锁,用于线程安全BindingMapper _mapper; // BindingMapper对象,用于数据库操作BindingMap _bindings; // 存储所有绑定信息的映射
};
消息数据管理模块
实现了消息队列中的消息管理
消息属性
ID:消息的唯一标识
持久化标志:是否对消息进行持久化
routing_key:消息发布到交换机后,与其绑定的各个队列中的 binding_key 进行匹配,决定发布到哪个队列
消息定义在了proto文件
syntax = "proto3";
package zwbMQ; // 交换机类型
enum ExchangeType
{UNKNOWTYPE = 0; // 未知类型DIRECT = 1; // 直接交换FANOUT = 2; // 广播交换TOPIC = 3; // 主题交换
};// 消息投递模式
enum DeliveryMode
{UNKNOWMODE = 0; // 未知模式UNDURABLE = 1; // 非持久化DURABLE = 2; // 持久化
};// 消息的基本属性
message BasicProperties
{string id = 1; // 消息的唯一标识DeliveryMode delivery_mode = 2; // 消息的投递模式string routing_key = 3; // 路由键
};// 消息类型
message Message
{// 消息的有效负载message Payload {BasicProperties properties = 1; // 消息的基本属性string body = 2; // 消息体string valid = 3; // 消息的有效性标识};Payload payload = 1; // 消息的有效负载uint32 offset = 2; // 消息的偏移量uint32 length = 3; // 消息的长度
};
上述多出来的属性
- 消息的有效负载:标志该消息是否属于有效消息,规定当持久化有效消息比例低于总体消息的 50% 时,会执行垃圾回收机制
- 消息的偏移量:当前消息对于文件起始位置的偏移量
- 消息长度:从偏移量位置取出该长度消息,解决黏包问题
- MessageMapper 类, 负责消息的持久化存储和恢复,包括消息的插入、删除、垃圾回收等操作
持久化存储格式
4字节长度|数据|4字节长度|数据
bool insert(const std::string &filename, MessagePtr &msg)
{//新增数据都是添加在文件末尾的//1. 进行消息的序列化,获取到格式化后的消息std::string body = msg->payload().SerializeAsString();//2. 获取文件长度FileHelper helper(filename);size_t fsize = helper.size();size_t msg_size = body.size();//写入逻辑:1. 先写入4字节数据长度, 2, 再写入指定长度数据bool ret = helper.write((char*)&msg_size, fsize, sizeof(size_t));if (ret == false) {DLOG("向队列数据文件写入数据长度失败!");return false;}//3. 将数据写入文件的指定位置ret = helper.write(body.c_str(), fsize + sizeof(size_t), body.size());if (ret == false) {DLOG("向队列数据文件写入数据失败!");return false;}//4. 更新msg中的实际存储信息msg->set_offset(fsize + sizeof(size_t));msg->set_length(body.size());return true;
}
垃圾回收机制
当持久化消息的总量超过 2000 条且有效消息比例低于 50% 时,触发垃圾回收;
垃圾回收会将有效消息重新写入临时文件,删除原文件,并将临时文件重命名为原文件;
std::list<MessagePtr> gc()
{bool ret;std::list<MessagePtr> result;ret = load(result);if (ret == false) {DLOG("加载有效数据失败!\n");return result;}//2. 将有效数据,进行序列化存储到临时文件中FileHelper::createFile(_tmpfile);for (auto &msg : result) {DLOG("向临时文件写入数据: %s", msg->payload().body().c_str());ret = insert(_tmpfile, msg);if (ret == false) {DLOG("向临时文件写入消息数据失败!!");return result;}}//3. 删除源文件ret = FileHelper::removeFile(_datafile);if (ret == false) {DLOG("删除源文件失败!");return result;}//4. 修改临时文件名,为源文件名称ret = FileHelper(_tmpfile).rename(_datafile);if (ret == false) {DLOG("修改临时文件名称失败!");return result;}//5. 返回新的有效数据return result;
}
createMsgFile():创建消息存储文件
bool createMsgFile()
{if (FileHelper(_datafile).exists() == true) {return true;}bool ret = FileHelper::createFile(_datafile);if (ret == false) {DLOG("创建队列数据文件 %s 失败!", _datafile.c_str());return false;}return true;
}
- QueueMessage 类,管理单个消息队列的消息,包括消息的插入、删除、恢复、垃圾回收等操作
class QueueMessage{public:using ptr = std::shared_ptr<QueueMessage>;QueueMessage(std::string &basedir, const std::string &qname):_mapper(basedir, qname), _qname(qname), _valid_count(0), _total_count(0) {}bool recovery() {//恢复历史消息std::unique_lock<std::mutex> lock(_mutex);_msgs = _mapper.gc();for (auto &msg : _msgs) {_durable_msgs.insert(std::make_pair(msg->payload().properties().id(), msg));}_valid_count = _total_count = _msgs.size();return true;}bool insert(const BasicProperties *bp, const std::string &body, bool queue_is_durable) {//1. 构造消息对象MessagePtr msg = std::make_shared<Message>();msg->mutable_payload()->set_body(body);if (bp != nullptr) {DeliveryMode mode = queue_is_durable ? bp->delivery_mode() : DeliveryMode::UNDURABLE;msg->mutable_payload()->mutable_properties()->set_id(bp->id());msg->mutable_payload()->mutable_properties()->set_delivery_mode(mode);msg->mutable_payload()->mutable_properties()->set_routing_key(bp->routing_key());}else {DeliveryMode mode = queue_is_durable ? DeliveryMode::DURABLE : DeliveryMode::UNDURABLE;msg->mutable_payload()->mutable_properties()->set_id(UUIDHelper::uuid());msg->mutable_payload()->mutable_properties()->set_delivery_mode(mode);msg->mutable_payload()->mutable_properties()->set_routing_key("");}std::unique_lock<std::mutex> lock(_mutex);//2. 判断消息是否需要持久化if (msg->payload().properties().delivery_mode() == DeliveryMode::DURABLE) {msg->mutable_payload()->set_valid("1");//在持久化存储中表示数据有效//3. 进行持久化存储bool ret = _mapper.insert(msg);if (ret == false) {DLOG("持久化存储消息:%s 失败了!", body.c_str());return false;}_valid_count += 1; //持久化信息中的数据量+1_total_count += 1;_durable_msgs.insert(std::make_pair(msg->payload().properties().id(), msg));}//4. 内存的管理_msgs.push_back(msg);return true;}MessagePtr front(){std::unique_lock<std::mutex> lock(_mutex);if (_msgs.size() == 0) {return MessagePtr();}//获取一条队首消息:从_msgs中取出数据MessagePtr msg = _msgs.front();_msgs.pop_front();//将该消息对象,向待确认的hash表中添加一份,等到收到消息确认后进行删除_waitack_msgs.insert(std::make_pair(msg->payload().properties().id(), msg));return msg;}//每次删除消息后,判断是否需要垃圾回收bool remove(const std::string &msg_id) {std::unique_lock<std::mutex> lock(_mutex);//1. 从待确认队列中查找消息auto it = _waitack_msgs.find(msg_id);if (it == _waitack_msgs.end()) {DLOG("没有找到要删除的消息:%s!", msg_id.c_str());return true;}//2. 根据消息的持久化模式,决定是否删除持久化信息if (it->second->payload().properties().delivery_mode() == DeliveryMode::DURABLE) {//3. 删除持久化信息_mapper.remove(it->second);_durable_msgs.erase(msg_id);_valid_count -= 1;//持久化文件中有效消息数量 -1gc(); //内部判断是否需要垃圾回收,需要的话则回收一下}//4. 删除内存中的信息_waitack_msgs.erase(msg_id);//DLOG("确认消息后,删除消息的管理成功:%s", it->second->payload().body().c_str());return true;}size_t getable_count() {std::unique_lock<std::mutex> lock(_mutex);return _msgs.size();}size_t total_count() {std::unique_lock<std::mutex> lock(_mutex);return _total_count;}size_t durable_count() {std::unique_lock<std::mutex> lock(_mutex);return _durable_msgs.size();}size_t waitack_count() {std::unique_lock<std::mutex> lock(_mutex);return _waitack_msgs.size();}void clear() {std::unique_lock<std::mutex> lock(_mutex);_mapper.removeMsgFile();_msgs.clear();_durable_msgs.clear();_waitack_msgs.clear();_valid_count = 0;_total_count = 0;}private:bool GCCheck() {//持久化的消息总量大于2000, 且其中有效比例低于50%则需要持久化if (_total_count > 2000 && _valid_count * 10 / _total_count < 5) {return true;}return false;}void gc() {//1. 进行垃圾回收,获取到垃圾回收后,有效的消息信息链表if (GCCheck() == false) return;std::list<MessagePtr> msgs = _mapper.gc();for (auto &msg : msgs) {auto it = _durable_msgs.find(msg->payload().properties().id());if (it == _durable_msgs.end()) {DLOG("垃圾回收后,有一条持久化消息,在内存中没有进行管理!");_msgs.push_back(msg); ///做法:重新添加到推送链表的末尾_durable_msgs.insert(std::make_pair(msg->payload().properties().id(), msg));continue;}//2. 更新每一条消息的实际存储位置it->second->set_offset(msg->offset());it->second->set_length(msg->length());}//3. 更新当前的有效消息数量 & 总的持久化消息数量_valid_count = _total_count = msgs.size();}private:std::mutex _mutex;std::string _qname;size_t _valid_count;size_t _total_count;MessageMapper _mapper;std::list<MessagePtr> _msgs;//待推送消息std::unordered_map<std::string, MessagePtr> _durable_msgs;//持久化消息hashstd::unordered_map<std::string, MessagePtr> _waitack_msgs;//待确认消息hash
};
- MessageManager 类,对外提供功能接口
class MessageManager {public:using ptr = std::shared_ptr<MessageManager>;MessageManager(const std::string &basedir): _basedir(basedir){}void clear() {std::unique_lock<std::mutex> lock(_mutex);for (auto &qmsg : _queue_msgs) {qmsg.second->clear();}}void initQueueMessage(const std::string &qname) {QueueMessage::ptr qmp;{std::unique_lock<std::mutex> lock(_mutex);auto it = _queue_msgs.find(qname);if (it != _queue_msgs.end()) {return ;}qmp = std::make_shared<QueueMessage>(_basedir, qname);_queue_msgs.insert(std::make_pair(qname, qmp));}qmp->recovery();}void destroyQueueMessage(const std::string &qname) {QueueMessage::ptr qmp;{std::unique_lock<std::mutex> lock(_mutex);auto it = _queue_msgs.find(qname);if (it == _queue_msgs.end()) {return ;}qmp = it->second;_queue_msgs.erase(it);}qmp->clear();}bool insert(const std::string &qname, BasicProperties *bp, const std::string &body, bool queue_is_durable) {QueueMessage::ptr qmp;{std::unique_lock<std::mutex> lock(_mutex);auto it = _queue_msgs.find(qname);if (it == _queue_msgs.end()) {DLOG("向队列%s新增消息失败:没有找到消息管理句柄!", qname.c_str());return false;}qmp = it->second;}return qmp->insert(bp, body, queue_is_durable);}MessagePtr front(const std::string &qname) {QueueMessage::ptr qmp;{std::unique_lock<std::mutex> lock(_mutex);auto it = _queue_msgs.find(qname);if (it == _queue_msgs.end()) {DLOG("获取队列%s队首消息失败:没有找到消息管理句柄!", qname.c_str());return MessagePtr();}qmp = it->second;}return qmp->front();}void ack(const std::string &qname, const std::string &msg_id) {QueueMessage::ptr qmp;{std::unique_lock<std::mutex> lock(_mutex);auto it = _queue_msgs.find(qname);if (it == _queue_msgs.end()) {DLOG("确认队列%s消息%s失败:没有找到消息管理句柄!", qname.c_str(), msg_id.c_str());return ;}qmp = it->second;}qmp->remove(msg_id);return ;}size_t getable_count(const std::string &qname) {QueueMessage::ptr qmp;{std::unique_lock<std::mutex> lock(_mutex);auto it = _queue_msgs.find(qname);if (it == _queue_msgs.end()) {DLOG("获取队列%s待推送消息数量失败:没有找到消息管理句柄!", qname.c_str());return 0;}qmp = it->second;}return qmp->getable_count();}size_t total_count(const std::string &qname) {QueueMessage::ptr qmp;{std::unique_lock<std::mutex> lock(_mutex);auto it = _queue_msgs.find(qname);if (it == _queue_msgs.end()) {DLOG("获取队列%s总持久化消息数量失败:没有找到消息管理句柄!", qname.c_str());return 0;}qmp = it->second;}return qmp->total_count();}size_t durable_count(const std::string &qname) {QueueMessage::ptr qmp;{std::unique_lock<std::mutex> lock(_mutex);auto it = _queue_msgs.find(qname);if (it == _queue_msgs.end()) {DLOG("获取队列%s有效持久化消息数量失败:没有找到消息管理句柄!", qname.c_str());return 0;}qmp = it->second;}return qmp->durable_count();}size_t waitack_count(const std::string &qname) {QueueMessage::ptr qmp;{std::unique_lock<std::mutex> lock(_mutex);auto it = _queue_msgs.find(qname);if (it == _queue_msgs.end()) {DLOG("获取队列%s待确认消息数量失败:没有找到消息管理句柄!", qname.c_str());return 0;}qmp = it->second;}return qmp->waitack_count();}private:std::mutex _mutex;std::string _basedir;std::unordered_map<std::string, QueueMessage::ptr> _queue_msgs;
};
虚拟机管理模块
对数据管理模块的整合模块交换机、队列、绑定
VirtualHost 类通过组合多个管理器(ExchangeManager、MsgQueueManager、BindingManager、MessageManager)来实现功能
虚拟机管理信息
- 交换机数据管理模块句柄
- 队列数据管理模块句柄
- 绑定数据管理模块句柄
- 消息数据管理模块句
虚拟机对外操作
- 提供虚拟机内交换机声明,交换机删除操作
- 提供虚拟机内队列声明,队列删除操作
- 提供虚拟机内交换机-队列绑定,解除绑定操作
- 获取交换机相关绑定信息
虚拟机管理操作
- 创建虚拟机
- 查询虚拟机
-删除虚拟机
namespace zwbMQ
{class VirtualHost {public:using ptr = std::shared_ptr<VirtualHost>; // 使用智能指针管理 VirtualHost 对象// 构造函数VirtualHost(const std::string &hname, const std::string &basedir, const std::string &dbfile):_host_name(hname), // 初始化虚拟主机名称_emp(std::make_shared<ExchangeManager>(dbfile)), // 初始化交换机管理器_mqmp(std::make_shared<MsgQueueManager>(dbfile)), // 初始化队列管理器_bmp(std::make_shared<BindingManager>(dbfile)), // 初始化绑定管理器_mmp(std::make_shared<MessageManager>(basedir)) { // 初始化消息管理器// 获取所有队列信息,并通过队列名称恢复历史消息数据QueueMap qm = _mqmp->allQueue();for (auto &q : qm) {_mmp->initQueueMessage(q.first); // 初始化每个队列的消息管理}}// 声明交换机bool declareExchange(const std::string &name,ExchangeType type, bool durable, bool auto_delete,const google::protobuf::Map<std::string, std::string> &args) {return _emp->declareExchange(name, type, durable, auto_delete, args);}// 删除交换机void deleteExchange(const std::string &name) {// 删除交换机时,需要删除与之相关的绑定信息_bmp->removeExchangeBindings(name);return _emp->deleteExchange(name);}// 检查交换机是否存在bool existsExchange(const std::string &name) {return _emp->exists(name);}// 获取指定交换机Exchange::ptr selectExchange(const std::string &ename) {return _emp->selectExchange(ename);}// 声明队列bool declareQueue(const std::string &qname, bool qdurable, bool qexclusive,bool qauto_delete,const google::protobuf::Map<std::string, std::string> &qargs) {// 初始化队列的消息管理_mmp->initQueueMessage(qname);// 创建队列return _mqmp->declareQueue(qname, qdurable, qexclusive, qauto_delete, qargs);}// 删除队列void deleteQueue(const std::string &name) {// 删除队列时,需要删除队列的消息和绑定信息_mmp->destroyQueueMessage(name); // 删除队列的消息_bmp->removeMsgQueueBindings(name); // 删除队列的绑定信息return _mqmp->deleteQueue(name); // 删除队列}// 检查队列是否存在bool existsQueue(const std::string &name) {return _mqmp->exists(name);}// 获取所有队列QueueMap allQueue() {return _mqmp->allQueue();}// 绑定交换机和队列bool bind(const std::string &ename, const std::string &qname, const std::string &key) {// 检查交换机是否存在Exchange::ptr ep = _emp->selectExchange(ename);if (ep.get() == nullptr) {DLOG("进行队列绑定失败,交换机%s不存在!", ename.c_str());return false;}// 检查队列是否存在MsgQueue::ptr mqp = _mqmp->selectQueue(qname);if (mqp.get() == nullptr) {DLOG("进行队列绑定失败,队列%s不存在!", qname.c_str());return false;}// 绑定交换机和队列return _bmp->bind(ename, qname, key, ep->durable && mqp->durable);}// 解绑交换机和队列void unBind(const std::string &ename, const std::string &qname) {return _bmp->unBind(ename, qname);}// 获取指定交换机的所有绑定信息MsgQueueBindingMap exchangeBindings(const std::string &ename) {return _bmp->getExchangeBindings(ename);}// 检查绑定是否存在bool existsBinding(const std::string &ename, const std::string &qname) {return _bmp->exists(ename, qname);}// 发布消息到指定队列bool basicPublish(const std::string &qname, BasicProperties *bp, const std::string &body) {// 检查队列是否存在MsgQueue::ptr mqp = _mqmp->selectQueue(qname);if (mqp.get() == nullptr) {DLOG("发布消息失败,队列%s不存在!", qname.c_str());return false;}// 插入消息return _mmp->insert(qname, bp, body, mqp->durable);}// 从指定队列消费消息MessagePtr basicConsume(const std::string &qname) {return _mmp->front(qname);}// 确认消息void basicAck(const std::string &qname, const std::string &msgid) {return _mmp->ack(qname, msgid);}// 清空虚拟主机中的所有数据void clear() {_emp->clear(); // 清空交换机_mqmp->clear(); // 清空队列_bmp->clear(); // 清空绑定_mmp->clear(); // 清空消息}private:std::string _host_name; // 虚拟主机名称ExchangeManager::ptr _emp; // 交换机管理器MsgQueueManager::ptr _mqmp; // 队列管理器BindingManager::ptr _bmp; // 绑定管理器MessageManager::ptr _mmp; // 消息管理器};
}#endif
交换路由模块
先把消息发给某个虚拟机,再告知要发给虚拟机上的哪个交换机,由交换机决定把这些消息放到哪些队列中
当客⼾端发布⼀条消息到交换机后,这条消息,应该被⼊队到该交换机绑定的哪些队列中?交换路由
模块就是决定这件事情的。
绑定信息中有一个binding_key, 消息中有一个routing_key
- ⼴播:将消息⼊队到该交换机的所有绑定队列中
- 直接:将消息⼊队到绑定信息中binding_key与消息routing_key⼀致的队列中
- 主题:将消息⼊队到绑定信息中binding_key与routing_key是匹配成功的队列中
namespace zwbMQ
{using namespace zwbMQ;class Router{public:static bool isLegalRoutingKey(const std::string &routing_key){// routing_key:只需要判断是否包含有非法字符即可, 合法字符( a~z, A~Z, 0~9, ., _)for (auto &ch : routing_key){if ((ch >= 'a' && ch <= 'z') ||(ch >= 'A' && ch <= 'Z') ||(ch >= '0' && ch <= '9') ||(ch == '_' || ch == '.')){continue;}return false;}return true;}static bool isLegalBindingKey(const std::string &binding_key){// 1. 判断是否包含有非法字符, 合法字符:a~z, A~Z, 0~9, ., _, *, #for (auto &ch : binding_key){if ((ch >= 'a' && ch <= 'z') ||(ch >= 'A' && ch <= 'Z') ||(ch >= '0' && ch <= '9') ||(ch == '_' || ch == '.') ||(ch == '*' || ch == '#')){continue;}return false;}// 2. *和#必须独立存在: news.music#.*.#std::vector<std::string> sub_words;StrHelper::split(binding_key, ".", sub_words);for (std::string &word : sub_words){if (word.size() > 1 &&(word.find("*") != std::string::npos ||word.find("#") != std::string::npos)){return false;}}// 3. *和#不能连续出现for (int i = 1; i < sub_words.size(); i++){if (sub_words[i] == "#" && sub_words[i - 1] == "*"){return false;}if (sub_words[i] == "#" && sub_words[i - 1] == "#"){return false;}if (sub_words[i] == "*" && sub_words[i - 1] == "#"){return false;}}return true;}static bool route(ExchangeType type, const std::string &routing_key, const std::string &binding_key){if (type == ExchangeType::DIRECT){return (routing_key == binding_key);}else if (type == ExchangeType::FANOUT){return true;}// 主题交换:要进行模式匹配 news.# & news.music.pop// 1. 将binding_key与routing_key进行字符串分割,得到各个的单词数组std::vector<std::string> bkeys, rkeys;int n_bkey = StrHelper::split(binding_key, ".", bkeys);int n_rkey = StrHelper::split(routing_key, ".", rkeys);// 2. 定义标记数组,并初始化[0][0]位置为true,其他位置为falsestd::vector<std::vector<bool>> dp(n_bkey + 1, std::vector<bool>(n_rkey + 1, false));dp[0][0] = true;// 3. 如果binding_key以#起始,则将#对应行的第0列置为1.for (int i = 1; i <= bkeys.size(); i++){if (bkeys[i - 1] == "#"){dp[i][0] = true;continue;}break;}// 4. 使用routing_key中的每个单词与binding_key中的每个单词进行匹配并标记数组for (int i = 1; i <= n_bkey; i++){for (int j = 1; j <= n_rkey; j++){// 如果当前bkey是个*,或者两个单词相同,表示单词匹配成功,则从左上方继承结果if (bkeys[i - 1] == rkeys[j - 1] || bkeys[i - 1] == "*"){dp[i][j] = dp[i - 1][j - 1];}else if (bkeys[i - 1] == "#"){// 如果当前bkey是个#,则需要从左上,左边,上边继承结果dp[i][j] = dp[i - 1][j - 1] | dp[i][j - 1] | dp[i - 1][j];}}}return dp[n_bkey][n_rkey];}};
}
信道管理模块
Channel是针对Connection连接的⼀个更细粒度的通信信道,多个Channel可以使⽤同⼀个通信连接Connection
#ifndef __M_CHANNEL_H__
#define __M_CHANNEL_H__#include "muduo/net/TcpConnection.h" // muduo库的TCP连接类
#include "muduo/proto/codec.h" // Protobuf编解码器
#include "muduo/proto/dispatcher.h" // 消息分发器
#include "../mqcommon/mq_logger.hpp" // 日志模块
#include "../mqcommon/mq_helper.hpp" // 工具函数
#include "../mqcommon/mq_msg.pb.h" // 消息相关的Protobuf定义
#include "../mqcommon/mq_proto.pb.h" // 协议相关的Protobuf定义
#include "../mqcommon/mq_threadpool.hpp" // 线程池
#include "mq_consumer.hpp" // 消费者管理
#include "mq_host.hpp" // 虚拟主机管理
#include "mq_route.hpp" // 路由模块namespace zwbMQ
{// 定义一系列智能指针类型别名,简化代码using ProtobufCodecPtr = std::shared_ptr<ProtobufCodec>;using openChannelRequestPtr = std::shared_ptr<openChannelRequest>;using closeChannelRequestPtr = std::shared_ptr<closeChannelRequest>;using declareExchangeRequestPtr = std::shared_ptr<declareExchangeRequest>;using deleteExchangeRequestPtr = std::shared_ptr<deleteExchangeRequest>;using declareQueueRequestPtr = std::shared_ptr<declareQueueRequest>;using deleteQueueRequestPtr = std::shared_ptr<deleteQueueRequest>;using queueBindRequestPtr = std::shared_ptr<queueBindRequest>;using queueUnBindRequestPtr = std::shared_ptr<queueUnBindRequest>;using basicPublishRequestPtr = std::shared_ptr<basicPublishRequest>;using basicAckRequestPtr = std::shared_ptr<basicAckRequest>;using basicConsumeRequestPtr = std::shared_ptr<basicConsumeRequest>;using basicCancelRequestPtr = std::shared_ptr<basicCancelRequest>;// Channel类:管理与客户端的通信通道,处理消息的发布、订阅、确认等操作class Channel {public:using ptr = std::shared_ptr<Channel>; // 定义Channel的智能指针类型别名// 构造函数:初始化Channel的各个成员变量Channel(const std::string &id, const VirtualHost::ptr &host, const ConsumerManager::ptr &cmp, const ProtobufCodecPtr &codec, const muduo::net::TcpConnectionPtr &conn,const threadpool::ptr &pool): _cid(id), _conn(conn), _codec(codec), _cmp(cmp), _host(host), _pool(pool) {DLOG("new Channel: %p", this); // 日志记录Channel的创建}// 析构函数:销毁Channel时移除相关的消费者~Channel() {if (_consumer.get() != nullptr) {_cmp->remove(_consumer->tag, _consumer->qname); // 移除消费者}DLOG("del Channel: %p", this); // 日志记录Channel的销毁}// 声明交换机void declareExchange(const declareExchangeRequestPtr &req) {bool ret = _host->declareExchange(req->exchange_name(), req->exchange_type(), req->durable(), req->auto_delete(), req->args());return basicResponse(ret, req->rid(), req->cid()); // 发送响应}// 删除交换机void deleteExchange(const deleteExchangeRequestPtr &req) {_host->deleteExchange(req->exchange_name());return basicResponse(true, req->rid(), req->cid()); // 发送响应}// 声明队列void declareQueue(const declareQueueRequestPtr &req) {bool ret = _host->declareQueue(req->queue_name(),req->durable(), req->exclusive(),req->auto_delete(), req->args());if (ret == false) {return basicResponse(false, req->rid(), req->cid()); // 发送失败响应}_cmp->initQueueConsumer(req->queue_name()); // 初始化队列的消费者管理句柄return basicResponse(true, req->rid(), req->cid()); // 发送成功响应}// 删除队列void deleteQueue(const deleteQueueRequestPtr &req) {_cmp->destroyQueueConsumer(req->queue_name()); // 销毁队列的消费者管理句柄_host->deleteQueue(req->queue_name()); // 删除队列return basicResponse(true, req->rid(), req->cid()); // 发送响应}// 绑定队列到交换机void queueBind(const queueBindRequestPtr &req) {bool ret = _host->bind(req->exchange_name(), req->queue_name(), req->binding_key());return basicResponse(ret, req->rid(), req->cid()); // 发送响应}// 解绑队列与交换机void queueUnBind(const queueUnBindRequestPtr &req) {_host->unBind(req->exchange_name(), req->queue_name());return basicResponse(true, req->rid(), req->cid()); // 发送响应}// 发布消息void basicPublish(const basicPublishRequestPtr &req) {// 1. 判断交换机是否存在auto ep = _host->selectExchange(req->exchange_name());if (ep.get() == nullptr) {return basicResponse(false, req->rid(), req->cid()); // 发送失败响应}// 2. 进行交换路由,判断消息可以发布到交换机绑定的哪个队列中MsgQueueBindingMap mqbm = _host->exchangeBindings(req->exchange_name());BasicProperties *properties = nullptr;std::string routing_key;if (req->has_properties()) {properties = req->mutable_properties();routing_key = properties->routing_key();}for (auto &binding : mqbm) {if (Router::route(ep->type, routing_key, binding.second->binding_key)) {// 3. 将消息添加到队列中_host->basicPublish(binding.first, properties, req->body());// 4. 向线程池中添加一个消息消费任务auto task = std::bind(&Channel::consume, this, binding.first);_pool->push(task);}}return basicResponse(true, req->rid(), req->cid()); // 发送成功响应}// 确认消息void basicAck(const basicAckRequestPtr &req) {_host->basicAck(req->queue_name(), req->message_id());return basicResponse(true, req->rid(), req->cid()); // 发送响应}// 订阅队列消息void basicConsume(const basicConsumeRequestPtr &req) {// 1. 判断队列是否存在bool ret = _host->existsQueue(req->queue_name());if (ret == false) {return basicResponse(false, req->rid(), req->cid()); // 发送失败响应}// 2. 创建队列的消费者auto cb = std::bind(&Channel::callback, this, std::placeholders::_1,std::placeholders::_2, std::placeholders::_3);_consumer = _cmp->create(req->consumer_tag(), req->queue_name(), req->auto_ack(), cb);return basicResponse(true, req->rid(), req->cid()); // 发送成功响应}// 取消订阅void basicCancel(const basicCancelRequestPtr &req) {_cmp->remove(req->consumer_tag(), req->queue_name()); // 移除消费者return basicResponse(true, req->rid(), req->cid()); // 发送响应}private:// 回调函数:处理消费者接收到的消息,并将消息推送给客户端void callback(const std::string tag, const BasicProperties *bp, const std::string &body) {basicConsumeResponse resp;resp.set_cid(_cid);resp.set_body(body);resp.set_consumer_tag(tag);if (bp) {resp.mutable_properties()->set_id(bp->id());resp.mutable_properties()->set_delivery_mode(bp->delivery_mode());resp.mutable_properties()->set_routing_key(bp->routing_key());}_codec->send(_conn, resp); // 发送消息给客户端}// 消费消息:从队列中取出消息并推送给订阅者void consume(const std::string &qname) {// 1. 从队列中取出一条消息MessagePtr mp = _host->basicConsume(qname);if (mp.get() == nullptr) {DLOG("执行消费任务失败,%s 队列没有消息!", qname.c_str());return;}// 2. 从队列订阅者中取出一个订阅者Consumer::ptr cp = _cmp->choose(qname);if (cp.get() == nullptr){DLOG("执行消费任务失败,%s 队列没有消费者!", qname.c_str());return;}// 3. 调用订阅者对应的消息处理函数,实现消息的推送cp->callback(cp->tag, mp->mutable_payload()->mutable_properties(), mp->payload().body());// 4. 判断如果订阅者是自动确认,直接删除消息if (cp->auto_ack) _host->basicAck(qname, mp->payload().properties().id());}// 发送基本响应void basicResponse(bool ok, const std::string &rid, const std::string &cid) {basicCommonResponse resp;resp.set_rid(rid);resp.set_cid(cid);resp.set_ok(ok);_codec->send(_conn, resp); // 发送响应给客户端}private:std::string _cid; // Channel的唯一标识Consumer::ptr _consumer; // 当前Channel的消费者muduo::net::TcpConnectionPtr _conn; // TCP连接ProtobufCodecPtr _codec; // Protobuf编解码器ConsumerManager::ptr _cmp; // 消费者管理器VirtualHost::ptr _host; // 虚拟主机threadpool::ptr _pool; // 线程池};// ChannelManager类:管理多个Channel实例class ChannelManager {public:using ptr = std::shared_ptr<ChannelManager>; // 定义ChannelManager的智能指针类型别名// 打开Channelbool openChannel(const std::string &id, const VirtualHost::ptr &host, const ConsumerManager::ptr &cmp, const ProtobufCodecPtr &codec, const muduo::net::TcpConnectionPtr &conn,const threadpool::ptr &pool) {std::unique_lock<std::mutex> lock(_mutex);auto it = _channels.find(id);if (it != _channels.end()) {DLOG("信道:%s 已经存在!", id.c_str()); // 日志记录return false;}auto channel = std::make_shared<Channel>(id, host, cmp, codec, conn, pool);_channels.insert(std::make_pair(id, channel)); // 添加Channelreturn true;}// 关闭Channelvoid closeChannel(const std::string &id) {std::unique_lock<std::mutex> lock(_mutex);_channels.erase(id); // 移除Channel}// 获取ChannelChannel::ptr getChannel(const std::string &id) {std::unique_lock<std::mutex> lock(_mutex);auto it = _channels.find(id);if (it == _channels.end()) {return Channel::ptr(); // 返回空指针}return it->second; // 返回Channel}private:std::mutex _mutex; // 互斥锁,保证线程安全std::unordered_map<std::string, Channel::ptr> _channels; // 存储Channel的映射表};
}#endif // __M_CHANNEL_H__
连接管理模块
对muduo库中的Connection进⾏⼆次封装管理,并额外提供项⽬所需操作
新增连接,删除连接,获取连接,打开信道,关闭信道
#include "mq_channel.hpp" // 包含Channel类的头文件namespace zwbMQ {// Connection类:管理与客户端的连接,负责创建和管理Channelclass Connection {public:using ptr = std::shared_ptr<Connection>; // 定义Connection的智能指针类型别名// 构造函数:初始化Connection的各个成员变量Connection(const VirtualHost::ptr &host, const ConsumerManager::ptr &cmp, const ProtobufCodecPtr &codec, const muduo::net::TcpConnectionPtr &conn,const threadpool::ptr &pool): _conn(conn), _codec(codec), _cmp(cmp), _host(host), _pool(pool),_channels(std::make_shared<ChannelManager>()) {} // 初始化ChannelManager// 打开Channelvoid openChannel(const openChannelRequestPtr &req) {// 1. 判断信道ID是否重复,创建信道bool ret = _channels->openChannel(req->cid(), _host, _cmp, _codec, _conn, _pool);if (ret == false) {DLOG("创建信道的时候,信道ID重复了"); // 日志记录return basicResponse(false, req->rid(), req->cid()); // 发送失败响应}DLOG("%s 信道创建成功!", req->cid().c_str()); // 日志记录// 3. 给客户端进行回复return basicResponse(true, req->rid(), req->cid()); // 发送成功响应}// 关闭Channelvoid closeChannel(const closeChannelRequestPtr &req) {_channels->closeChannel(req->cid()); // 关闭指定ID的Channelreturn basicResponse(true, req->rid(), req->cid()); // 发送响应}// 获取指定ID的ChannelChannel::ptr getChannel(const std::string &cid) {return _channels->getChannel(cid); // 返回Channel}private:// 发送基本响应void basicResponse(bool ok, const std::string &rid, const std::string &cid) {basicCommonResponse resp;resp.set_rid(rid); // 设置请求IDresp.set_cid(cid); // 设置Channel IDresp.set_ok(ok); // 设置操作结果_codec->send(_conn, resp); // 发送响应给客户端}private:muduo::net::TcpConnectionPtr _conn; // TCP连接ProtobufCodecPtr _codec; // Protobuf编解码器ConsumerManager::ptr _cmp; // 消费者管理器VirtualHost::ptr _host; // 虚拟主机threadpool::ptr _pool; // 线程池ChannelManager::ptr _channels; // Channel管理器};// ConnectionManager类:管理多个Connection实例class ConnectionManager {public:using ptr = std::shared_ptr<ConnectionManager>; // 定义ConnectionManager的智能指针类型别名// 构造函数ConnectionManager() {}// 创建新的Connectionvoid newConnection(const VirtualHost::ptr &host, const ConsumerManager::ptr &cmp, const ProtobufCodecPtr &codec, const muduo::net::TcpConnectionPtr &conn,const threadpool::ptr &pool) {std::unique_lock<std::mutex> lock(_mutex); // 加锁,保证线程安全auto it = _conns.find(conn);if (it != _conns.end()) {return; // 如果Connection已存在,直接返回}// 创建新的Connection并添加到管理器中Connection::ptr self_conn = std::make_shared<Connection>(host, cmp, codec, conn, pool);_conns.insert(std::make_pair(conn, self_conn));}// 删除Connectionvoid delConnection(const muduo::net::TcpConnectionPtr &conn) {std::unique_lock<std::mutex> lock(_mutex); // 加锁,保证线程安全_conns.erase(conn); // 移除指定Connection}// 获取指定TCP连接的ConnectionConnection::ptr getConnection(const muduo::net::TcpConnectionPtr &conn) {std::unique_lock<std::mutex> lock(_mutex); // 加锁,保证线程安全auto it = _conns.find(conn);if (it == _conns.end()) {return Connection::ptr(); // 如果Connection不存在,返回空指针}return it->second; // 返回Connection}private:std::mutex _mutex; // 互斥锁,保证线程安全std::unordered_map<muduo::net::TcpConnectionPtr, Connection::ptr> _conns; // 存储Connection的映射表};}
消费者管理模块
这里的消费者指的是订阅了某一条队列的消息,分为发布客户端和订阅客户端;
当消费者订阅该队列后,所连接的服务器就会将该队列的消息轮询的推送给订阅该队列的消费者;
消费者数据
struct Consumer
{using ptr = std::shared_ptr<Consumer>;std::string tag; // 消费者标识std::string qname; // 消费者订阅的队列名称bool auto_ack; // 自动确认标志ConsumerCallback callback;Consumer(){DLOG("new Consumer: %p", this);}Consumer(const std::string &ctag, const std::string &queue_name, bool ack_flag, const ConsumerCallback &cb):tag(ctag), qname(queue_name), auto_ack(ack_flag), callback(std::move(cb)) {DLOG("new Consumer: %p", this);}~Consumer() {DLOG("del Consumer: %p", this);}
};
- tag:消费者的唯一标识
- qname:消费者订阅的队列名称
- auto_ack:是否自动确认消息(若为
true
,消息被取走后,自动从待确认列表中删除;若为false
,要等到客户端确认收到后,再进行删除) - callback:消息处理回调函数(队列有了消息后,通过该函数进行处理,服务端调用这个回调函数)
以队列为单元的消费者管理结构
- 成员变量
- _qname:队列名称
- _mutex:互斥锁,用于线程安全
- _rr_seq:轮转序号,用于轮转选择消费者
- _consumers:存储消费者
- 方法
- create:新增消费者
- remove:移除指定消费者
- choose:轮转选择消费者
- empty:判断是否没有消费者
- exists:判断指定消费者是否存在
- clear:清除所有消费
class QueueConsumer {public:using ptr = std::shared_ptr<QueueConsumer>;QueueConsumer(const std::string &qname) : _qname(qname), _rr_seq(0){}// 队列新增消费者Consumer::ptr create(const std::string &ctag, const std::string &queue_name, bool ack_flag, const ConsumerCallback &cb) {//1. 加锁std::unique_lock<std::mutex> lock(_mutex);//2. 判断消费者是否重复for (auto &consumer : _consumers) {if (consumer->tag == ctag) {return Consumer::ptr();}}//3. 没有重复则新增--构造对象auto consumer = std::make_shared<Consumer>(ctag, queue_name, ack_flag, cb);//4. 添加管理后返回对象_consumers.push_back(consumer);return consumer;}// 队列移除消费者void remove(const std::string &ctag) {//1. 加锁std::unique_lock<std::mutex> lock(_mutex);//2. 遍历查找-删除for (auto it = _consumers.begin(); it != _consumers.end(); ++it) {if ((*it)->tag == ctag) {_consumers.erase(it);return ;}}return;}// 队列获取消费者:RR轮转获取Consumer::ptr choose() {//1. 加锁std::unique_lock<std::mutex> lock(_mutex);if (_consumers.size() == 0) {return Consumer::ptr();}//2. 获取当前轮转到的下标int idx = _rr_seq % _consumers.size();_rr_seq++;//3. 获取对象,返回return _consumers[idx];}// 是否为空bool empty() {std::unique_lock<std::mutex> lock(_mutex);return _consumers.size() == 0;}// 判断指定消费者是否存在bool exists(const std::string &ctag) {std::unique_lock<std::mutex> lock(_mutex);//2. 遍历查找for (auto it = _consumers.begin(); it != _consumers.end(); ++it) {if ((*it)->tag == ctag) {return true;}}return false;}// 清理所有消费者void clear() {std::unique_lock<std::mutex> lock(_mutex);_consumers.clear();_rr_seq = 0;}private:std::string _qname;std::mutex _mutex;uint64_t _rr_seq;//轮转序号std::vector<Consumer::ptr> _consumers;
};
管理的消费者操作
class ConsumerManager {public:using ptr = std::shared_ptr<ConsumerManager>;ConsumerManager(){}void initQueueConsumer(const std::string &qname) {//1. 加锁std::unique_lock<std::mutex> lock(_mutex);//2. 重复判断auto it = _qconsumers.find(qname);if (it != _qconsumers.end()) {return ;}//3. 新增auto qconsumers = std::make_shared<QueueConsumer>(qname);_qconsumers.insert(std::make_pair(qname, qconsumers));}void destroyQueueConsumer(const std::string &qname) {std::unique_lock<std::mutex> lock(_mutex);_qconsumers.erase(qname);}Consumer::ptr create(const std::string &ctag, const std::string &queue_name, bool ack_flag, const ConsumerCallback &cb) {// 获取队列的消费者管理单元句柄,通过句柄完成新建QueueConsumer::ptr qcp;{std::unique_lock<std::mutex> lock(_mutex);auto it = _qconsumers.find(queue_name);if (it == _qconsumers.end()) {DLOG("没有找到队列 %s 的消费者管理句柄!", queue_name.c_str());return Consumer::ptr();}qcp = it->second;}return qcp->create(ctag, queue_name, ack_flag, cb);}void remove(const std::string &ctag, const std::string &queue_name) {QueueConsumer::ptr qcp;{std::unique_lock<std::mutex> lock(_mutex);auto it = _qconsumers.find(queue_name);if (it == _qconsumers.end()) {DLOG("没有找到队列 %s 的消费者管理句柄!", queue_name.c_str());return ;}qcp = it->second;}return qcp->remove(ctag);}Consumer::ptr choose(const std::string &queue_name) {QueueConsumer::ptr qcp;{std::unique_lock<std::mutex> lock(_mutex);auto it = _qconsumers.find(queue_name);if (it == _qconsumers.end()) {DLOG("没有找到队列 %s 的消费者管理句柄!", queue_name.c_str());return Consumer::ptr();}qcp = it->second;}return qcp->choose();}bool empty(const std::string &queue_name) {QueueConsumer::ptr qcp;{std::unique_lock<std::mutex> lock(_mutex);auto it = _qconsumers.find(queue_name);if (it == _qconsumers.end()) {DLOG("没有找到队列 %s 的消费者管理句柄!", queue_name.c_str());return false;}qcp = it->second;}return qcp->empty();}bool exists(const std::string &ctag, const std::string &queue_name) {QueueConsumer::ptr qcp;{std::unique_lock<std::mutex> lock(_mutex);auto it = _qconsumers.find(queue_name);if (it == _qconsumers.end()) {DLOG("没有找到队列 %s 的消费者管理句柄!", queue_name.c_str());return false;}qcp = it->second;}return qcp->exists(ctag);}void clear() {std::unique_lock<std::mutex> lock(_mutex);_qconsumers.clear();}private:std::mutex _mutex;std::unordered_map<std::string, QueueConsumer::ptr> _qconsumers;
};
⽹络通信协议设计
借助于muduo库来进行实现,消息体是使⽤Protobuf进⾏序列化和反序列化,采用LV这种数据结构,先有一个定长的长度L,其描述了value有多长,来解决念包问题;
value中针对不同的请求有着不同的协议字段,而协议字段采用的是Protobuf进行序列化和反序列化
Server服务器模块
对于服务器来说,并没有什么实际的功能,
而是将之前代码的一个整合
有一个虚拟机管理句柄
有一个消费者管理句柄
有一个连接管理句柄
有一个工作线程池
muduo::net::EventLoop _baseloop; // 主事件循环器,⽤于响应IO事件和定时器事件,主loop主要是为了响应监听描述符的IO事件muduo::net::TcpServer _server;//服务器对象ProtobufDispatcher _dispatcher;//请求分发器对象--要向其中注册请求处理函数ProtobufCodecPtr _codec;//protobuf协议处理器--针对收到的请求数据进行protobuf协议处理VirtualHost::ptr _virtual_host;ConsumerManager::ptr _consumer_manager;ConnectionManager::ptr _connection_manager;threadpool::ptr _threadpool;//异步⼯作线程池,主要⽤于队列消息的推送⼯作
借助muduo库实现一个TCP服务器对象,当有新连接到来后要如何进行读取和分发以及上面的io事件如何处理都是在这个模块来进行实现的,而对于新连接来说,会有一个dispatcher分发器,这个分发器的作用就是根据不同的消息类型进行合适的分发,装载到不同的处理函数当中,
namespace zwbMQ
{// 定义数据库文件路径和虚拟主机名称#define DBFILE "/meta.db"#define HOSTNAME "MyVirtualHost"class Server {public:// 定义消息指针类型typedef std::shared_ptr<google::protobuf::Message> MessagePtr;// 构造函数,初始化服务器Server(int port, const std::string &basedir): _server(&_baseloop, muduo::net::InetAddress("0.0.0.0", port), "Server", muduo::net::TcpServer::kReusePort),_dispatcher(std::bind(&Server::onUnknownMessage, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)),_codec(std::make_shared<ProtobufCodec>(std::bind(&ProtobufDispatcher::onProtobufMessage, &_dispatcher, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3))),_virtual_host(std::make_shared<VirtualHost>(HOSTNAME, basedir, basedir + DBFILE)),_consumer_manager(std::make_shared<ConsumerManager>()),_connection_manager(std::make_shared<ConnectionManager>()),_threadpool(std::make_shared<threadpool>()) {// 初始化所有队列的消费者管理结构QueueMap qm = _virtual_host->allQueue();for (auto &q : qm) {_consumer_manager->initQueueConsumer(q.first);}// 注册各种请求的处理函数_dispatcher.registerMessageCallback<zwbMQ::openChannelRequest>(std::bind(&Server::onOpenChannel, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_dispatcher.registerMessageCallback<zwbMQ::closeChannelRequest>(std::bind(&Server::onCloseChannel, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_dispatcher.registerMessageCallback<zwbMQ::declareExchangeRequest>(std::bind(&Server::onDeclareExchange, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_dispatcher.registerMessageCallback<zwbMQ::deleteExchangeRequest>(std::bind(&Server::onDeleteExchange, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_dispatcher.registerMessageCallback<zwbMQ::declareQueueRequest>(std::bind(&Server::onDeclareQueue, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_dispatcher.registerMessageCallback<zwbMQ::deleteQueueRequest>(std::bind(&Server::onDeleteQueue, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_dispatcher.registerMessageCallback<zwbMQ::queueBindRequest>(std::bind(&Server::onQueueBind, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_dispatcher.registerMessageCallback<zwbMQ::queueUnBindRequest>(std::bind(&Server::onQueueUnBind, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_dispatcher.registerMessageCallback<zwbMQ::basicPublishRequest>(std::bind(&Server::onBasicPublish, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_dispatcher.registerMessageCallback<zwbMQ::basicAckRequest>(std::bind(&Server::onBasicAck, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_dispatcher.registerMessageCallback<zwbMQ::basicConsumeRequest>(std::bind(&Server::onBasicConsume, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_dispatcher.registerMessageCallback<zwbMQ::basicCancelRequest>(std::bind(&Server::onBasicCancel, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));// 设置服务器的消息回调和连接回调_server.setMessageCallback(std::bind(&ProtobufCodec::onMessage, _codec.get(), std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_server.setConnectionCallback(std::bind(&Server::onConnection, this, std::placeholders::_1));}// 启动服务器void start() {_server.start();_baseloop.loop();}private:// 处理打开信道的请求void onOpenChannel(const muduo::net::TcpConnectionPtr& conn, const openChannelRequestPtr& message, muduo::Timestamp) {Connection::ptr mconn = _connection_manager->getConnection(conn);if (mconn.get() == nullptr) {DLOG("打开信道时,没有找到连接对应的Connection对象!");conn->shutdown();return;}return mconn->openChannel(message);}// 处理关闭信道的请求void onCloseChannel(const muduo::net::TcpConnectionPtr& conn, const closeChannelRequestPtr& message, muduo::Timestamp) {Connection::ptr mconn = _connection_manager->getConnection(conn);if (mconn.get() == nullptr) {DLOG("关闭信道时,没有找到连接对应的Connection对象!");conn->shutdown();return;}return mconn->closeChannel(message);}// 处理声明交换机的请求void onDeclareExchange(const muduo::net::TcpConnectionPtr& conn, const declareExchangeRequestPtr& message, muduo::Timestamp) {Connection::ptr mconn = _connection_manager->getConnection(conn);if (mconn.get() == nullptr) {DLOG("声明交换机时,没有找到连接对应的Connection对象!");conn->shutdown();return;}Channel::ptr cp = mconn->getChannel(message->cid());if (cp.get() == nullptr) {DLOG("声明交换机时,没有找到信道!");return;}return cp->declareExchange(message);}// 处理删除交换机的请求void onDeleteExchange(const muduo::net::TcpConnectionPtr& conn, const deleteExchangeRequestPtr& message, muduo::Timestamp) {Connection::ptr mconn = _connection_manager->getConnection(conn);if (mconn.get() == nullptr) {DLOG("删除交换机时,没有找到连接对应的Connection对象!");conn->shutdown();return;}Channel::ptr cp = mconn->getChannel(message->cid());if (cp.get() == nullptr) {DLOG("删除交换机时,没有找到信道!");return;}return cp->deleteExchange(message);}// 处理声明队列的请求void onDeclareQueue(const muduo::net::TcpConnectionPtr& conn, const declareQueueRequestPtr& message, muduo::Timestamp) {Connection::ptr mconn = _connection_manager->getConnection(conn);if (mconn.get() == nullptr) {DLOG("声明队列时,没有找到连接对应的Connection对象!");conn->shutdown();return;}Channel::ptr cp = mconn->getChannel(message->cid());if (cp.get() == nullptr) {DLOG("声明队列时,没有找到信道!");return;}return cp->declareQueue(message);}// 处理删除队列的请求void onDeleteQueue(const muduo::net::TcpConnectionPtr& conn, const deleteQueueRequestPtr& message, muduo::Timestamp) {Connection::ptr mconn = _connection_manager->getConnection(conn);if (mconn.get() == nullptr) {DLOG("删除队列时,没有找到连接对应的Connection对象!");conn->shutdown();return;}Channel::ptr cp = mconn->getChannel(message->cid());if (cp.get() == nullptr) {DLOG("删除队列时,没有找到信道!");return;}return cp->deleteQueue(message);}// 处理队列绑定的请求void onQueueBind(const muduo::net::TcpConnectionPtr& conn, const queueBindRequestPtr& message, muduo::Timestamp) {Connection::ptr mconn = _connection_manager->getConnection(conn);if (mconn.get() == nullptr) {DLOG("队列绑定时,没有找到连接对应的Connection对象!");conn->shutdown();return;}Channel::ptr cp = mconn->getChannel(message->cid());if (cp.get() == nullptr) {DLOG("队列绑定时,没有找到信道!");return;}return cp->queueBind(message);}// 处理队列解绑的请求void onQueueUnBind(const muduo::net::TcpConnectionPtr& conn, const queueUnBindRequestPtr& message, muduo::Timestamp) {Connection::ptr mconn = _connection_manager->getConnection(conn);if (mconn.get() == nullptr) {DLOG("队列解除绑定时,没有找到连接对应的Connection对象!");conn->shutdown();return;}Channel::ptr cp = mconn->getChannel(message->cid());if (cp.get() == nullptr) {DLOG("队列解除绑定时,没有找到信道!");return;}return cp->queueUnBind(message);}// 处理消息发布的请求void onBasicPublish(const muduo::net::TcpConnectionPtr& conn, const basicPublishRequestPtr& message, muduo::Timestamp) {Connection::ptr mconn = _connection_manager->getConnection(conn);if (mconn.get() == nullptr) {DLOG("发布消息时,没有找到连接对应的Connection对象!");conn->shutdown();return;}Channel::ptr cp = mconn->getChannel(message->cid());if (cp.get() == nullptr) {DLOG("发布消息时,没有找到信道!");return;}return cp->basicPublish(message);}// 处理消息确认的请求void onBasicAck(const muduo::net::TcpConnectionPtr& conn, const basicAckRequestPtr& message, muduo::Timestamp) {Connection::ptr mconn = _connection_manager->getConnection(conn);if (mconn.get() == nullptr) {DLOG("确认消息时,没有找到连接对应的Connection对象!");conn->shutdown();return;}Channel::ptr cp = mconn->getChannel(message->cid());if (cp.get() == nullptr) {DLOG("确认消息时,没有找到信道!");return;}return cp->basicAck(message);}// 处理队列消息订阅的请求void onBasicConsume(const muduo::net::TcpConnectionPtr& conn, const basicConsumeRequestPtr& message, muduo::Timestamp) {Connection::ptr mconn = _connection_manager->getConnection(conn);if (mconn.get() == nullptr) {DLOG("队列消息订阅时,没有找到连接对应的Connection对象!");conn->shutdown();return;}Channel::ptr cp = mconn->getChannel(message->cid());if (cp.get() == nullptr) {DLOG("队列消息订阅时,没有找到信道!");return;}return cp->basicConsume(message);}// 处理队列消息取消订阅的请求void onBasicCancel(const muduo::net::TcpConnectionPtr& conn, const basicCancelRequestPtr& message, muduo::Timestamp) {Connection::ptr mconn = _connection_manager->getConnection(conn);if (mconn.get() == nullptr) {DLOG("队列消息取消订阅时,没有找到连接对应的Connection对象!");conn->shutdown();return;}Channel::ptr cp = mconn->getChannel(message->cid());if (cp.get() == nullptr) {DLOG("队列消息取消订阅时,没有找到信道!");return;}return cp->basicCancel(message);}// 处理未知消息void onUnknownMessage(const muduo::net::TcpConnectionPtr& conn, const MessagePtr& message, muduo::Timestamp) {LOG_INFO << "onUnknownMessage: " << message->GetTypeName();conn->shutdown();}// 处理连接事件void onConnection(const muduo::net::TcpConnectionPtr &conn) {if (conn->connected()) {_connection_manager->newConnection(_virtual_host, _consumer_manager, _codec, conn, _threadpool);} else {_connection_manager->delConnection(conn);}}private:muduo::net::EventLoop _baseloop; // 事件循环muduo::net::TcpServer _server; // 服务器对象ProtobufDispatcher _dispatcher; // 请求分发器对象ProtobufCodecPtr _codec; // protobuf协议处理器VirtualHost::ptr _virtual_host; // 虚拟主机ConsumerManager::ptr _consumer_manager; // 消费者管理器ConnectionManager::ptr _connection_manager; // 连接管理器threadpool::ptr _threadpool; // 线程池};
}
客户端
客户端模块主要有以下的四大模块
-
订阅者模块:表示这是一个消费者
-
信道模块:包含一些常用接口,消息发布确认等等
-
连接模块:它的作用是进行打开和关闭信道
-
异步线程模块:客户端连接的IO事件监控,推送来的消息进行异步处理线程
订阅者模块
当消费者订阅了一个队列,那么当这个队列有了消息之后,就会自动推送给这个消费者
客户端的订阅者和服务端的消费者是一样的
namespace zwbMQ
{using ConsumerCallback = std::function<void(const std::string, const BasicProperties *bp, const std::string)>;struct Consumer {using ptr = std::shared_ptr<Consumer>;std::string tag; //消费者标识std::string qname; //消费者订阅的队列名称bool auto_ack; //自动确认标志ConsumerCallback callback;Consumer(){DLOG("new Consumer: %p", this);}Consumer(const std::string &ctag, const std::string &queue_name, bool ack_flag, const ConsumerCallback &cb): tag(ctag), qname(queue_name), auto_ack(ack_flag), callback(std::move(cb)) {DLOG("new Consumer: %p", this);}~Consumer() {DLOG("del Consumer: %p", this);}};
}
信道管理模块
服务端中有信道,客户端也有,二者的机制基本一致,无论哪个信道最终的目的都是提供服务;
不同的是,服务端的信道是为服务器提供服务;客户端的信道是给用户提供
信道所管理的信息
std::string _cid; // Channel ID
muduo::net::TcpConnectionPtr _conn; // TCP 连接
ProtobufCodecPtr _codec; // Protobuf 编解码器
Consumer::ptr _consumer; // 消费者对象
std::mutex _mutex; // 互斥锁
std::condition_variable _cv; // 条件变量
std::unordered_map<std::string, basicCommonResponsePtr> _basic_resp; // 响应哈希表
信道的组织操作
namespace zwbMQ
{typedef std::shared_ptr<google::protobuf::Message> MessagePtr;using ProtobufCodecPtr = std::shared_ptr<ProtobufCodec>;using basicConsumeResponsePtr = std::shared_ptr<basicConsumeResponse>;using basicCommonResponsePtr = std::shared_ptr<basicCommonResponse>;class Channel{public:using ptr = std::shared_ptr<Channel>;// 构造函数,初始化 Channel 对象Channel(const muduo::net::TcpConnectionPtr &conn, const ProtobufCodecPtr &codec);// 析构函数,调用 basicCancel 方法~Channel();// 返回 Channel 的 IDstd::string cid();// 打开 Channel,发送 openChannelRequest 请求并等待响应bool openChannel();// 关闭 Channel,发送 closeChannelRequest 请求并等待响应 void closeChannel();// 声明 Exchange,发送 declareExchangeRequest 请求并等待响应bool declareExchange(const std::string &name,ExchangeType type,bool durable,bool auto_delete,google::protobuf::Map<std::string, std::string> &args);// 删除 Exchange,发送 deleteExchangeRequest 请求并等待响应void deleteExchange(const std::string &name);// 声明 Queue,发送 declareQueueRequest 请求并等待响应bool declareQueue(const std::string &qname,bool qdurable,bool qexclusive,bool qauto_delete,google::protobuf::Map<std::string, std::string> &qargs);// 删除 Queue,发送 deleteQueueRequest 请求并等待响应void deleteQueue(const std::string &qname);// 绑定 Queue 和 Exchange,发送 queueBindRequest 请求并等待响应bool queueBind(const std::string &ename,const std::string &qname,const std::string &key);// 解绑 Queue 和 Exchange,发送 queueUnBindRequest 请求并等待响应void queueUnBind(const std::string &ename, const std::string &qname);// 发布消息,发送 basicPublishRequest 请求并等待响应void basicPublish(const std::string &ename,const BasicProperties *bp,const std::string &body);// 确认消息,发送 basicAckRequest 请求并等待响应void basicAck(const std::string &msgid);// 取消消费者订阅,发送 basicCancelRequest 请求并等待响应void basicCancel();// 订阅消息,发送 basicConsumeRequest 请求并等待响应bool basicConsume(const std::string &consumer_tag,const std::string &queue_name,bool auto_ack,const ConsumerCallback &cb);// 将基础响应放入哈希表中void putBasicResponse(const basicCommonResponsePtr &resp);// 处理收到的消息推送void consume(const basicConsumeResponsePtr &resp);private:// 等待响应,直到收到指定 rid 的响应basicCommonResponsePtr waitResponse(const std::string &rid);private:std::string _cid;muduo::net::TcpConnectionPtr _conn;ProtobufCodecPtr _codec;Consumer::ptr _consumer;std::mutex _mutex;std::condition_variable _cv;std::unordered_map<std::string, basicCommonResponsePtr> _basic_resp;};
信道的管理操作
class ChannelManager{public:using ptr = std::shared_ptr<ChannelManager>;ChannelManager();Channel::ptr create(const muduo::net::TcpConnectionPtr &conn, const ProtobufCodecPtr &codec);// 移除指定 Channelvoid remove(const std::string &cid);// 获取指定 ChannelChannel::ptr get(const std::string &cid);private:std::mutex _mutex; // 互斥锁std::unordered_map<std::string, Channel::ptr> _channels; // Channel 哈希表};
连接管理模块
先创建连接,通过连接创建信道,通过信道提供服务
这个模块就是针对于muduo库客户端进行的二次封装
给用户提供了一个创建Channel的接口,创建信道后,借助信道来提供服务
muduo::CountDownLatch _latch; // 用于实现同步的 CountDownLatch
muduo::net::TcpConnectionPtr _conn; // 客户端对应的连接
muduo::net::TcpClient _client; // 客户端
ProtobufDispatcher _dispatcher; // 请求分发器
ProtobufCodecPtr _codec; // 协议处理器AsyncWorker::ptr _worker; // 异步工作线程
ChannelManager::ptr _channel_manager; // 信道管理器
- 构造函数,初始化连接
- 打开一个信道
- 关闭一个信道
- 处理基础响应消息
- 处理消费响应消息
- 处理未知消息
- 处理连接事件
异步线程池模块
- EventLoopThread模块进行IO事件监控
- 收到推送的消息,需要对推送过来的消息进行处理,要一个线程池来完成消息处理
- 多个连接用一个EventLoopThread进行IO监控
源码
https://gitee.com/nuyoahc/message-queue-mq
相关文章:
仿 RabbitMQ 实现的简易消息队列
文章目录 项目介绍开放环境第三⽅库介绍ProtobufMuduo库 需求分析核⼼概念实现内容 消息队列系统整体框架服务端模块数据管理模块虚拟机数据管理模块交换路由模块消费者管理模块信道(通信通道)管理模块连接管理模块 客户端模块 公共模块日志类其他工具类…...
JavaScript基础知识及高频用法
目录 一、语言基础:构建代码逻辑的积木 二、核心概念:理解JavaScript的灵魂 三、高频用法:现代开发必备技巧 四、避坑指南:常见错误与调试 五、学习路线与资源推荐 从入门到实战,掌握现代Web开发基石 作为全球使…...
VUE项目中实现权限控制,菜单权限,按钮权限,接口权限,路由权限,操作权限,数据权限实现
VUE项目中实现权限控制,菜单权限,按钮权限,接口权限,路由权限,操作权限,数据权限实现 权限系统分类(RBAC)引言菜单权限按钮权限接口权限路由权限 菜单权限方案方案一:菜单…...
多机器人系统的大语言模型:综述
25年2月来自 Drexel 大学的论文“Large Language Models for Multi-Robot Systems: A Survey”。 大语言模型 (LLM) 的快速发展为多机器人系统 (MRS) 开辟新的可能性,从而增强通信、任务规划和人机交互。与传统的单机器人和多智体系统不同,MRS 带来独特…...
如何在 Java 应用中实现数据库的主从复制(读写分离)?请简要描述架构和关键代码实现?
在Java应用中实现数据库主从复制(读写分离) 一、架构描述 (一)整体架构 主库(Master) 负责处理所有的写操作(INSERT、UPDATE、DELETE等)。它是数据的源头,所有的数据变…...
Redis 数据类型 Hash 哈希
在 Redis 中,哈希类型是指值本⾝⼜是⼀个键值对结构,形如 key "key",value { { field1, value1 }, ..., {fieldN, valueN } },Redis String 和 Hash 类型⼆者的关系可以⽤下图来表⽰。 Hash 数据类型的特点 键值对集合…...
17.推荐系统的在线学习与实时更新
接下来就讲解推荐系统的在线学习与实时更新。推荐系统的在线学习和实时更新是为了使推荐系统能够动态地适应用户行为的变化,保持推荐结果的实时性和相关性。以下是详细的介绍和实现方法。 推荐系统的在线学习与实时更新 在线学习的概念 在线学习(Onli…...
网络安全检测思路
对于主机的安全检测,我们通常直接采用nmap或者类似软件进行扫描,然后针对主机操作系统及其 开放端口判断主机的安全程度,这当然是一种方法,但这种方法往往失之粗糙,我仔细考虑了一下,觉 得按下面的流程进行…...
老游戏回顾:SWRacer
竞速类游戏里,我很怀念它。 虽然已经25年过去了。 相比之下,别的游戏真的没法形容。 ---- 是LucasArts制作的一款赛车竞速游戏; 玩家要扮演一名银河旅行者参加各种赛车比赛,赢得奖金,在经历了八个不同星球上的24场…...
MySQL面试题合集
1.MySQL中的数据排序是怎么实现的? 回答重点 排序过程中,如果排序字段命中索引,则利用 索引排序。 反之,使用文件排序。 文件排序中,如果数据量少则在内存中排序, 具体是使用单路排序或者双路排序。 如果数据大则利用磁盘文件进行外部排序,一 般使用归并排序。 知识…...
如何在Java EE中使用标签库?
在Java EE(现在称为Jakarta EE)中使用标签库(Tag Library),主要是通过JSP标准标签库(JSTL)或自定义标签库来实现的。标签库允许在JSP页面中使用自定义的标签,从而简化页面逻辑、增强…...
第 12 天:行为树(Behavior Tree),让 AI 更智能!
🎯 目标: ✅ 理解 Unreal Engine 5 行为树(Behavior Tree) ✅ 创建行为树(BT)和黑板(Blackboard)管理 AI 状态 ✅ 使用任务(Task)让 AI 巡逻、追踪、攻击玩家…...
LabVIEW 用户界面设计基础原则
在设计LabVIEW VI的用户界面时,前面板的外观和布局至关重要。良好的设计不仅提升用户体验,还能提升界面的易用性和可操作性。以下是设计用户界面时的一些关键要点: 1. 前面板设计原则 交互性:组合相关的输入控件和显示控件&#x…...
自然语言处理NLP入门 -- 第三节词袋模型与 TF-IDF
目标 了解词袋模型(BoW)和 TF-IDF 的概念通过实际示例展示 BoW 和 TF-IDF 如何将文本转换为数值表示详细讲解 Scikit-learn 的实现方法通过代码示例加深理解归纳学习难点,并提供课后练习和讲解 3.1 词袋模型(Bag of Words, BoW&a…...
Flappy Bird开发学习记录
概述 为了了解一下Unity的开发过程,或者说感受?先搞简单的练练手。 工具 Unity:2022.3.51f1c1 visual studio 2022 开发过程 项目基本设置 新建2d项目,游戏画面设置为1080*1920(9:16)。 图片素材设…...
Visual Studio 使用 “Ctrl + /”键设置注释和取消注释
问题:在默认的Visual Studio中,选择单行代码后,按下Ctrl /键会将代码注释掉,但再次按下Ctrl /键时,会进行双重注释,这不是我们想要的。 实现效果:当按下Ctrl /键会将代码注释掉,…...
CTF-WEB: 利用Web消息造成DOM XSS
如果索引中有类似如下代码 <!-- Ads to be inserted here --> <div idads> </div> <script>window.addEventListener(message, function(e) {document.getElementById(ads).innerHTML e.data;}); </script>这行代码的作用是将接收到的消息内容…...
2025 西湖论剑wp
web Rank-l 打开题目环境: 发现一个输入框,看一下他是用上面语言写的 发现是python,很容易想到ssti 密码随便输,发现没有回显 但是输入其他字符会报错 确定为ssti注入 开始构造payload, {{(lipsum|attr(‘global…...
常见的排序算法:插入排序、选择排序、冒泡排序、快速排序
1、插入排序 步骤: 1.从第一个元素开始,该元素可以认为已经被排序 2.取下一个元素tem,从已排序的元素序列从后往前扫描 3.如果该元素大于tem,则将该元素移到下一位 4.重复步骤3,直到找到已排序元素中小于等于tem的元素…...
LVDS接口总结--(5)IDELAY3仿真
仿真参考资料如下: https://zhuanlan.zhihu.com/p/386057087 timescale 1 ns/1 ps module tb_idelay3_ctrl();parameter REF_CLK 2.5 ; // 400MHzparameter DIN_CLK 3.3 ; // 300MHzreg ref_clk ;reg …...
数据库的基本概念
在当今的信息时代,数据已成为企业乃至整个社会的重要资产。如何有效地存储、管理和利用这些数据成为了技术发展的关键领域之一。数据库系统作为数据管理的核心工具,在软件开发、数据分析等多个方面扮演着不可或缺的角色。本文将带你了解数据库的一些基本…...
Redis性能优化
1.是否使用复杂度过高的命令 首先,第一步,你需要去查看一下 Redis 的慢日志(slowlog)。 Redis 提供了慢日志命令的统计功能,它记录了有哪些命令在执行时耗时比较久。 查看 Redis 慢日志之前,你需要设置慢…...
CCF-CSP第34次认证第二题——矩阵重塑(其二)【需反复思考学习!!!】
第34次认证第二题——矩阵重塑(其二) 官网题目链接 时间限制: 1.0 秒 空间限制: 512 MiB 相关文件: 题目目录(样例文件) 题目背景 矩阵转置操作是将矩阵的行和列交换的过程。在转置过程中&…...
大模型DeepSeek-R1学习
学习路线 机器学习-> 深度学习-> 强化学习-> 深度强化学习 大模型演进分支 微调: SFT 监督学习蒸馏:把大模型作为导师训练小模型RLHF:基于人类反馈的强化学习 PPO 近端策略优化 油门 - 重要性采样 权重 * 打分刹车 - clip 修剪…...
Spring Cloud — 深入了解Eureka、Ribbon及Feign
Eureka 负责服务注册与发现;Ribbon负责负载均衡;Feign简化了Web服务客户端调用方式。这三个组件可以协同工作,共同构建稳定、高效的微服务架构。 1 Eureka 分布式系统的CAP定理: 一致性(Consistency)&am…...
19.4.9 数据库方式操作Excel
版权声明:本文为博主原创文章,转载请在显著位置标明本文出处以及作者网名,未经作者允许不得用于商业目的。 本节所说的操作Excel操作是讲如何把Excel作为数据库来操作。 通过COM来操作Excel操作,请参看第21.2节 在第19.3.4节【…...
《深度LSTM vs 普通LSTM:训练与效果的深度剖析》
在深度学习领域,长短期记忆网络(LSTM)以其出色的处理序列数据能力而备受瞩目。而深度LSTM作为LSTM的扩展形式,与普通LSTM在训练和效果上存在着一些显著的不同。 训练方面 参数数量与计算量:普通LSTM通常只有一层或较少…...
认识一下redis的分布式锁
Redis的分布式锁是一种通过Redis实现的分布式锁机制,用于在分布式系统中确保同一时刻只有一个客户端可以访问某个资源。它通常用于防止多个应用实例在同一时间执行某些特定操作,避免数据的不一致性或竞争条件。 实现分布式锁的基本思路: 1. …...
untiy3D为游戏物体制作简单的动画
1.创建一个物体挂载动画组件Animator 2.创建一个动画控制器 3.动画控制器挂载到Animator组件 4.创建动画窗口>动画 入口默认执行left 执行效果 20250212_151707 脚本控制动画 鼠标点击是切换到动画t using System.Collections; using System.Collections.Generic; usi…...
微服务与网关
什么是网关 背景 单体项目中,前端只用访问指定的一个端口8080,就可以得到任何想要的数据 微服务项目中,ip是不断变化的,端口是多个的 解决方案:网关 网关:就是网络的关口,负责请求的路由、转发、身份校验。 前段还是访问之前的端口8080即可 后端对于前端来说是透明的 网…...
ArcGIS基础知识之ArcMap基础设置——ArcMap选项:常规选项卡设置及作用
作为一名 GIS 从业者,ArcMap 是我们日常工作中不可或缺的工具。对于初学者来说,掌握 ArcMap 的基础设置是迈向 GIS 分析与制图的第一步。今天,就让我们一起深入了解 ArcMap 选项中常规选项卡的各个设置,帮助大家更好地使用这款强大的软件。 在 ArcMap 中,常规选项卡是用户…...
Ubuntu轻松部署ToolJet低代码开发平台结合内网穿透远程访问
文章目录 前言1.关于ToolJet2.Docker部署3.简单使用演示4.安装cpolar内网穿透5. 配置公网地址6. 配置固定公网地址 前言 本文主要介绍如何在本地Linux服务器使用Docker部署轻量级开源文件分享系统ToolJet,并结合cpolar内网穿透工具轻松实现跨网络环境远程访问与使用…...
[MySQL]5-MySQL扩展(分片)
随着数据量和用户量增加,MySQL会有读写负载限制。以下是部分解决方案 目录 功能拆分 使用读池拓展读(较复杂) 排队机制 🌟分片拓展写 按业务或职责划分节点或集群 大数据集切分 分片键的选择 多个分片键 跨分片查询 资料…...
如何使用 DeepSeek 帮助自己的工作
Hi,我是布兰妮甜 !在当今快速发展的技术领域,人工智能(AI)工具已经成为提高工作效率、促进创新的重要助手。DeepSeek作为一款先进的AI解决方案,为用户提供了强大的数据处理、分析以及预测能力,可…...
Redis缓存穿透、击穿和雪崩面试相关问题整理
在互联网公司的面试中,Redis 的缓存穿透、击穿和雪崩是高频考点,尤其在北京的头部互联网公司(如字节、阿里、美团、快手等)。以下是针对这三个问题的详细解析及常见面试题方向: 一、缓存穿透(Cache Penetra…...
Flink之Watermark
Apache Flink 是一个分布式流处理框架,它非常擅长处理实时数据流。流处理中的一个关键挑战是事件时间的处理,因为在流式数据中,事件到达系统的顺序可能并不代表它们的实际发生时间。为了解决这一问题,Flink 引入了**Watermark&…...
vs构建网络安全系统 网络安全和网络搭建
网站的组成和搭建 网站由服务器,容器,脚本,数据库组成。 服务器和家庭电脑一样。 容器又为环境或服务:apache,lls,tomcat,nginx等 脚本:php,aspx,asp&#x…...
缓存穿透问题及解决方案
一、什么是缓存穿透? 在分布式系统中,缓存常常用于提高系统的性能,减轻数据库的压力。缓存穿透问题指的是请求的数据在缓存和数据库中都不存在,导致请求每次都直接查询数据库,无法从缓存中获取数据,从而绕…...
ElementUI el-popover弹框背景色设置
1.el-popover样式由于使用了 absolute 属性,导致脱离了节点,所以在父级元素使用class无法进行权重处理来修改其样式,解决方式如下:通过popper-class实现样式处理,避免全局样式污染 // html <el-popoverplacement&q…...
在 Mac ARM 架构上使用 nvm 安装 Node.js 版本 16.20.2
文章目录 1. 安装 nvm(如果还没有安装的话)2. 加载 nvm 配置3. 列出特定系列的 Node.js 版本(远程):4. 安装 Node.js 16.20.25. 使用指定版本的 Node.js6. 验证安装 在 Mac ARM 架构上使用 nvm 安装 Node.js 版本 16.…...
spring集成activiti流程引擎(源码)
前言 activiti工作流引擎项目,企业erp、oa、hr、crm等企事业办公系统轻松落地,请假审批demo从流程绘制到审批结束实例。 源码获取:本文末个人名片直接获取。 一、项目形式 springbootvueactiviti集成了activiti在线编辑器,流行…...
VLLM历次会议(2024.4)
Prefix Caching。预先算好KV cache,遇见公共前缀,复用之,避免再计算一遍。 场景:1. 多轮对话。2.公共的system prompt。 Guided Decoding(格式化输出) 通过Outlines工具实现。 支持正则表达式、JSON格式等。 输入:…...
基于 Docker 搭建 Elasticsearch + Kibana 环境
一、Elasticsearch 1. 下载镜像 elasticsearch镜像不支持latest标签,必须指定版本号 % docker pull elasticsearch:8.17.2 2. 启动容器 参考官方文档 https://www.elastic.co/guide/en/elasticsearch/reference/7.5/docker.html % docker run -p 9200:9200 -p 9…...
在 ARM64 架构系统离线安装 Oracle Java 8 全流程指南
在 ARM64 架构系统离线安装 Oracle Java 8 全流程指南 文章目录 在 ARM64 架构系统离线安装 Oracle Java 8 全流程指南一、引言二、下载前的准备2.1 确认系统架构2.2 注册 Oracle 账号 三、从 Oracle 官方下载 Java 8 for ARM643.1 访问 Oracle Java 存档页面3.2 选择合适的版本…...
策略模式-小结
总结一下看到的策略模式: A:一个含有一个方法的接口 B:具体的实行方式行为1,2,3,实现上面的接口。 C:一个环境类(或者上下文类),形式可以是:工厂模式,构造器注入模式,枚举模式。 …...
记一次Self XSS+CSRF组合利用
视频教程在我主页简介或专栏里 (不懂都可以来问我 专栏找我哦) 目录: 确认 XSS 漏洞 确认 CSRF 漏洞 这个漏洞是我在应用程序的订阅表单中发现的一个 XSS 漏洞,只能通过 POST 请求进行利用。通常情况下,基于 POST 的…...
VSCode + Continue 实现AI编程助理
安装VS Code 直接官网下载安装,反正是免费的。 安装VS插件Continue 直接在插件市场中搜索, Continue,第一个就是了。 配置Chat Model 点击Add Chat model后进行选择: 选择Ollama后,需要点击下面的config file : 由于…...
使用Typescript开发Babylon.js的Vue3模板参考
main.js文件 // main.ts import { createApp } from vue import App from ./App.vuecreateApp(App).mount(#app) App.vue文件 <!-- App.vue --> <template><div class"app-container"><BabylonScene /></div> </template><…...
STM32+Proteus+DS18B20数码管仿真实验
1. 实验准备 硬件方面: 了解 STM32 单片机的基本原理和使用方法,本实验可选用常见的 STM32F103 系列。熟悉 DS18B20 温度传感器的工作原理和通信协议(单总线协议)。数码管可选用共阴极或共阳极数码管,用于显示温度值。…...
DeepSeek自然语言处理(NLP)基础与实践
自然语言处理(Natural Language Processing, NLP)是人工智能领域的一个重要分支,专注于让计算机理解、生成和处理人类语言。NLP技术广泛应用于机器翻译、情感分析、文本分类、问答系统等场景。DeepSeek提供了强大的工具和API,帮助我们高效地构建和训练NLP模型。本文将详细介…...