项目——高并发内存池
目录
项目介绍
做的是什么
要求
内存池介绍
池化技术
内存池
解决的问题
设计定长内存池
高并发内存池整体框架设计
ThreadCache
ThreadCache整体设计
哈希桶映射对齐规则
ThreadCache TLS无锁访问
CentralCache
CentralCache整体设计
CentralCache结构设计
CentralCache核心实现
PageCache
PageCache整体设计
PageCache核心实现
申请内存流程测试
ThreadCache回收内存
CentralCache回收内存
PageCache回收内存
释放内存流程测试
大于256KB的大块内存申请问题
使用定长内存池脱离new
释放内存时优化为不传内存大小
多线程环境下对比malloc测试
调试技巧
性能瓶颈分析
基数树优化
项目源码
项目介绍
做的是什么
该项目实现是一个高并发的内存池:原型是google的一个开源项目tcmalloc ,tcmalloc全称Thread-Caching Malloc,即线程缓存的malloc,实现了高效的多线程内存管理,用于替代系统的内存分配相关的函数(malloc、free)
我们这个项目是把tcmalloc最核心的框架简化后拿出来,模拟实现出一个自己的高并发内存池,目的就是学习tcamlloc的精华;虽然不是完全学习,但也要做好要有心理准备,因为这个项目哪个地方出现问题程序就直接崩溃,调试起来非常麻烦!当然另一方面,难度的上升,我们的收获和成长也是在这个过程中同步上升。
到如今很多程序员是熟悉这个项目的:把这个项目理解扎实了,会很受面试官的认可;但可可能面试官比较熟悉项目,对项目会问得比较深,比较细;如果你对项目掌握得不扎实,那么就容易碰钉子;所以带着信心与挑战,来开启本项目的探索吧!
要求
本项目会用到C/C++基本知识、数据结构(链表、哈希桶)、单例模式、多线程、互斥锁等等方面的知识;重在对语言的应用与存储系统内存管理的理解
内存池介绍
池化技术
所谓“池化技术”:就是程序先向系统申请过量的资源,然后自己管理,以备不时之需。之所以要申请过量的资源,是因为每次申请该资源都有较大的开销,不如提前申请好了,这样使用时就会变得非常快捷,大大提高程序运行效率。
在计算机中,有很多使用“池”这种技术的地方,除了内存池,还有连接池、线程池、对象池等。以服务器上的线程池为例,它的主要思想是:先启动若干数量的线程,让它们处于睡眠状态,当接收到客户端的请求时,唤醒池中某个睡眠的线程,让它来处理客户端的请求,当处理完这个请求,线程又进入睡眠状态。
内存池
内存池是指程序预先从操作系统申请一块足够大内存,此后,当程序中需要申请内存的时候,不是直接向操作系统申请,而是直接从内存池中获取;同理,当程序释放内存的时候,并不真正将内存返回给操作系统,而是返回内存池。当进程退出(或者特定时间)时,内存池才将之前申请的内存真正释放。我们使用malloc,free进行申请释放内存函数底层就是在干着这种事情
malloc()背后的实现原理
malloc的底层实现
Linux内存管理,malloc、free 实现原理
解决的问题
内存池主要解决的是效率问题:一次向系统申请过量内存后自己管理,当上层要申请时按照自己的管理方式高效地返回给上层使用;其次还解决了内存碎片问题
内存碎片又分为外碎片与内碎片:外碎片是一些空闲的连续内存区域太小,这些内存空间不连续,以至于合计的内存足够,但是不能满足一些的内存分配申请需求;内碎片是由于一些对齐的需求,导致分配出去的空间中一些内存无法被利用。这些问题都需要在设计内存池中进行解决
设计定长内存池
我们自己来设计内存池本质上就是把malloc,free要干的活交给我们来做~
既然要我们自己来管理内存的申请与释放,设计时就要有两个成员:_Memory和 _MemoryByte:_Memory指针指向我们申请的大块内存的首地址,_MemoryByte表示申请的大内存还剩下多少内存;
上层申请空间时,我们就把首地址给它,_Memory向后移动,_MemoryByte减去申请走的空间大小;如果_MemoryByte小于上层要申请的大小,我们就要向系统再次申请一块大内存来管理,而我们把_Memory的类型设置成char*而不是void*其实就是为了方便指针的移动(char*类型的指针走一步刚好是一字节)
当上层释放空间时,_FreeList成员就起作用了:不用把内存还给系统,而是把回收回来的内存一个接一个的链接在一起(链表的储存);下次上层申请时我们看看_FreeList有没有内存(也就是_FreeList是否为nullptr),有内存就直接给它用就行了,提高效率;
到了这里你可能会有疑问:_FreeList作为链表存储的话,每个内存就要有下一个内存的地址next成员,实现时是不是也是这样做? 可以这样实现,但没必要!这里我们玩一种新操作:回收过来的内存对象的前4/8字节的空间存储着下一个内存对象的地址
*(int*)object = (int)_FreeList; //object是回收过来的内存对象
上面的写法是否正确呢? 对一半,如果是64位访问下一个内存对象的地址应该是8字节,这样的话就要判断处理,较为麻烦;在这里提供一种万能方式:使用void**解引用的方式能够随着环境的变化而变化(可以理解成找void*的家长来撑场面)
//方法1
if (sizeof(t*) == 4) *(int*)object = (int)_freelist;
else *(long long*)object = (long long)_freelist;//方法2
*(void**)object = _FreeList;
既然是我们自己来管理内存,那么我们就不用malloc申请内存,使用系统调用VirtualAlloc来进行申请,;使用时因为还要考虑抛异常,不同环境下的申请,所以封装成函数ApplicateSpace(),按页数(1页 = 8K)进行申请
//使用系统调用申请空间static inline void* ApplicateSpace(size_t page){
#ifdef _WIN32void* ptr = VirtualAlloc(nullptr, // 让系统自动选择地址page * (1 << 13), // 分配大小MEM_COMMIT | MEM_RESERVE, // 提交并保留内存PAGE_READWRITE // 可读可写);
#else//使用Linux的mmap...void* ptr = mmap(nullptr, // 让系统自动选择地址page * (1 << 13(), // 分配大小PROT_READ | PROT_WRITE, // 可读可写MAP_PRIVATE | MAP_ANONYMOUS, // 私有匿名内存(不与文件关联)-1, // 文件描述符(匿名映射设为 -1)0 // 偏移量);
#endif//...if (ptr == nullptr){throw std::bad_alloc();exit(-1);}return ptr;}
设计内存池时还要设计成类模版:让内存池知道你要申请什么类型的内存;
template<class T>
class ObjectPool
{
public:T* New(){T* object = nullptr;//free的对象可以重复利用if (_FreeList){//链表的头删操作void* next = *(void**)_FreeList;object = (T*)_FreeList;_FreeList = next;}else{//如果下次空间不够用了怎么办? -> 引入_MemoryByte//可能申请的内存没有4/8字节来让我们存地址(细节)size_t ObjectByte = sizeof(T) < sizeof(void*) ? sizeof(void*) : sizeof(T);if (_MemoryByte < ObjectByte){_MemoryByte = 128 * 1024;//申请128KB够用了_Memory = (char*)ApplicateSpace(_MemoryByte>>PageShift);}object = (T*)_Memory;_Memory += ObjectByte;_MemoryByte -= ObjectByte;}//开辟的空间可能自己需要初始化:定位newnew(object)T;return object;}void Delete(T* object){//释放时可能T对象里开辟了空间,要先进行释放object->~T();//释放的内存头插的方式存到_FreeList中*(void**)object = _FreeList;_FreeList = object;}
private:char* _Memory = nullptr;size_t _MemoryByte = 0;void* _FreeList = nullptr;};
看了上面的代码有人又要问了:申请内存使用VirtualAlloc,那释放内存还给系统呢?你怎么没写? 其实当进程退出时,向系统申请的内存会系统会自动回收,不用我们操心;
以上实现还有一点细节:以上代码在给用户申请内存时,如果不够4字节(32位)/8字节(64位)要自动加到4/8字节,保证回收时链接存的下下一个内存对象的地址!
测试定长内存池
struct TreeNode
{int _val;TreeNode* _left;TreeNode* _right;TreeNode():_val(0), _left(nullptr), _right(nullptr){}
};void TestObjectPool()
{// 申请释放的轮次const size_t Rounds = 3;// 每轮申请释放多少次const size_t N = 100000;size_t begin1 = clock();std::vector<TreeNode*> v1;v1.reserve(N);for (size_t j = 0; j < Rounds; ++j){for (int i = 0; i < N; ++i){v1.push_back(new TreeNode);}for (int i = 0; i < N; ++i){delete v1[i];}v1.clear();}size_t end1 = clock();ObjectPool<TreeNode> TNPool;size_t begin2 = clock();std::vector<TreeNode*> v2;v2.reserve(N);for (size_t j = 0; j < Rounds; ++j){for (int i = 0; i < N; ++i){v2.push_back(TNPool.New());}for (int i = 0; i < 100000; ++i){v2.push_back(TNPool.New());}v2.clear();}size_t end2 = clock();cout << "new cost time:" << end1 - begin1 << endl;cout << "object pool cost time:" << end2 - begin2 << endl;
}
预期结果是我们的定长内存池优于new
高并发内存池整体框架设计
现代很多的开发环境都是多核多线程,在申请内存的场景下,必然存在激烈的锁竞争问题。malloc本身其实已经很优秀,我们项目的原型tcmalloc就是在多线程高并发的场景下更胜一筹,所以这次我们实现的内存池需要考虑以下几方面的问题:性能问题;多线程环境下,锁竞争问题;
内存碎片问题
高并发内存池主要由以下3个部分构成:
1. ThreadCache:每个线程创建一个ThreadCache对象为每个线程所独有(线程本地缓存,其它线程是看不到的),里面储存着小于256K内存的分配,线程申请内存首先就是在里面申请,没内存了才会玩下一层去申请,并且这个过程是不用加锁的,这也是设计高效的一个点;
2. CentralCache:线程的ThreadCache对象没内存了,就在CentralCache对象里面申请,所以它是每个线程所共享的,共享就意味着要进行加锁,但这个锁不是全局锁,而是桶锁:因为每个线程要申请的内存不一定都是一样大,只有当线程去申请同一块内存时才会有竞争,但竞争一般不会很激烈;
3. PageCache:CentralCache对象没内存了,此时就要到PageCache对象里面申请内存,但PageCache管理的内存就不是一小块一小块,而是按页为单位进行管理;CentralCache申请过程就要对申请到的内存进行切分,切成自己要的小块小块内存后才能进行管理;而在这个过程同样需要加锁,此时加的是全局锁:因为CentralCache要进行遍历:没有符合大小的内存要将大内存切分的操作,所以要加把‘大’锁;
ThreadCache
ThreadCache整体设计
ThreadCache是哈希桶结构,每个桶是一个按桶位置映射大小的内存块对象的自由链表;
ThreadCache支持大于等于256KB内存的申请,但如果每种字节数都需要创建自由链表来管理的话,共需要20多万个桶才能管理地过来,明显是不太可能的;所以我们要进行一定的牺牲:按照一定范围的字节数进行对齐,如1~8字节直接给你申请8字节,9~16字节直接给你16字节,以此类推...
每个桶里挂着一个单链表(前面定长内存池管理释放的_FreeList)
这里的_FreeList进行封装:实现内存的插入与删除功能来支持内存的申请与释放
//实现成函数方便找下个节点的地址
static void*& Next(void* obj)
{return *(void**)obj;
}class FreeList
{
public:void Push(void* obj){assert(obj);//头插Next(obj) = _FreeList;_FreeList = obj;}void* Pop(){assert(_FreeList);//头删void* obj = _FreeList;_FreeList = Next(obj);return obj;}bool Empty(){return _FreeList == nullptr;}private:void* _FreeList=nullptr;
};
哈希桶映射对齐规则
规则见如下一下表格:(以字节为单位)
申请的内存大小范围 | 对齐字节数 | 映射到对应的桶(下标) |
---|---|---|
[1,128] | 8byte对齐 | _FreeList [0,16) |
[128 + 1,1024] | 16byte对齐 | _FreeList [16,72) |
[1024 + 1,81024] | 128byte对齐 | _FreeList [72,128) |
[8*1024 + 1 ,64*1024] | 1024byte对齐 | _FreeList [128,184) |
[64*1024 + 1,256*1024] | 8*1024byte对齐 | _FreeList [184,208) |
按照上面的规则我们先来实现字节数对齐(有关内存大小计算的函数建议都写在同一个类SizeClass中,方便进行管理)
static inline size_t _RoundUp(size_t bytes, size_t align)
{//一般写法/*if (bytes % align != 0){align = (bytes / align + 1) * align;}else{align = bytes;}*///高手 return (bytes + align - 1) & (~(align - 1));
}//计算对齐数
static inline size_t RoundUp(size_t bytes)
{if (bytes <= 128){return _RoundUp(bytes, 8);}else if (bytes <= 1024){return _RoundUp(bytes, 16);}else if (bytes <= 8 * 1024){return _RoundUp(bytes, 128);}else if (bytes <= 64 * 1024){return _RoundUp(bytes, 1024);}else if (bytes <= 256 * 1024){return _RoundUp(bytes, 8 * 1024);}else{//申请的内存不可能这么大(后面再修改)assert(false);return -1;}}
一般写法好理解,但写成位运算就不太好理解(但计算机运算速度快啊),我们以10字节为例来分析:
&(~(对齐数-1))的意义是保留前面计算结果的最高位的1,其它位都变成1
接着实现字节数映射桶的函数:
//一般人
//static inline size_t _Index(size_t bytes, size_t Align)
//{
//
// if (bytes % Align == 0) return bytes / Align - 1;
// else bytes / Align;
//}//糕手
static inline size_t _Index(size_t bytes, size_t AlignIndex)
{ return ((bytes + (1 << AlignIndex) - 1) >> AlignIndex) - 1;
}//映射哈希桶的下标
static inline size_t Index(size_t bytes)
{if (bytes <= 128){return _Index(bytes, 3);//传2的指数进来,依次类推}else if (bytes <= 1024){return _Index(bytes, 4);}else if (bytes <= 8 * 1024){return _Index(bytes, 7);}else if (bytes <= 64 * 1024){return _Index(bytes, 10);}else if (bytes <= 256 * 1024){return _Index(bytes, 13);}else{assert(false);return -1;}
}
还是以10为例理解位运算:
接着定义最大的内存数256KB,ThreadCache的桶中最大的个数通过计算结果为208,也就是定义208个自由链表数组
static const size_t MaxFreeLists = 208;
static const size_t MaxBytes = 256 * 1024;
ThreadCache申请内存
class ThreadCache
{
public://Thread申请内存void* Allocate(size_t size);private:FreeList _FreeLists[MaxFreeLists];
};
将申请内存的大小转化成对齐数字节与对应桶的下标,然后去指定桶里看看_FreeList是否有为空:不为空就把第一个取出来,返回给线程(头删操作);为空就要到CentralCache去申请内存(等CentralCache来了才进行,暂时不处理)
void* ThreadCache::FetchFromCentralCache(size_t index, size_t size)
{//...return nullptr;
}void* ThreadCache::Allocate(size_t size)
{assert(size <= MaxBytes);//对齐数 找桶(下标)size_t align = SizeClass::RoundUp(size);size_t index = SizeClass::Index(size);if (_FreeLists[index].Empty()){//找CentralCache申请return FetchFromCentralCache(index,align);}else{return _FreeLists[index].Pop();}
}
ThreadCache TLS无锁访问
如果把ThreadCache定义成全局对象,那么所有的线程都能够访问到,势必就要进行加锁操作;但tcmalloc高效就高效在这,它不用进行加锁也能保证线程安全;解决方案:将ThreadCache定义成TLS(Thread Local Storage 线程本地储存):这样只有线程自己看到,就省去加锁操作啦,执行效率非常高;定义TLS的方法有多种,我们就用最简单的静态TLS
//定义在ThreadCache类的文件中
_declspec(thread) ThreadCache* pTLSThreadCache = nullptr;
ConCurrentAlloc
进行测试之前,要先来创建ConCurrentAlloc文件:申请内存时每个线程先把TLS new出来后再去调用ThreadCache的Allocate方法申请内存;
static void* ConCurrentAlloc(size_t size)
{if (pTLSThreadCache == nullptr){pTLSThreadCache = new ThreadCache;}//测试cout << std::this_thread::get_id() <<" "<< pTLSThreadCache << endl;return pTLSThreadCache->Allocate(size);
}static void ConCurrentFree(void* ptr, size_t size)
{assert(pTLSThreadCache);//...
}
首先测试每个线程是否都有独立的TLS
void test1()
{std::vector<void*> v;for (int i = 0; i < 5; ++i){void* ptr=ConCurrentAlloc(5);v.push_back(ptr);}}void test2()
{std::vector<void*> v;for (int i = 0; i < 5; ++i){void* ptr = ConCurrentAlloc(5);v.push_back(ptr);}
}void TestCCA()
{std::thread t1(test1);t1.join();std::thread t2(test2);t2.join();
}
运行程序出现报错:编译器告诉我们pTLSThreadCache被多次定义,这也就说:在链接时有两份pTLSThreadCache生成导致起冲突;
解决方案:把 pTLSThreadCache定义成静态:让它只生成一份当且仅在当前文件可见
static _declspec(thread) ThreadCache* pTLSThreadCache = nullptr;
运行结果:
CentralCache
CentralCache整体设计
CentralCache 也是一个哈希桶结构,他的哈希桶的映射关系跟ThreadCache是一样的。不同的是他的每个哈希桶位置挂是SpanList带头双向链表结构,管理一个一个Span;每个Span下挂着按照映射对齐规则计算出来的一块块内存,等着ThreadCache来申请;还有不同的是每个桶需要加上桶锁来保证线程安全,避免线程申请到同一个桶后出现问题
CentralCache结构设计
在CentralCache每个桶中的SpanList带头双向链表结构以及对应的Span结构体,我们先来设计Span结构体
struct span
{Page_t Page_Adder = 0;//大块内存起始页的页号size_t _n = 0;//管理的大块内存有多少页span* _prev=nullptr;span* _next=nullptr;void* _FreeList = nullptr;//span挂着的一个一个小块内存的首地址
};
Page_Addr
页号通常是4K或者8K,以8KB为例:申请到内存后通过会给我们返回内存的首地址,我们把地址除以8KB就得到了页号,下次要想使用就将页号乘以8KB得到了地址;页号本质上也是地址,只不过它是按照8KB的单位来管理的;
Page_t
在32位下地址空间的大小为2^32,在64位下地址空间的大小为2^64位,页号一般是4KB或者8KB;我们就以8K为例:在32位下被分成2^32 / 2^13 = 2^19块页,在64位下被分成2^64 / 2^13 = 2^51块页:不同环境下分成的页数的情况不同,我们就不能仅仅只定义出一个size_t,而是根据环境来选择;这里采用了条件编译的方法来解决
#ifdef _WIN64typedef unsigned long long Page_t;
#elif _WIN32typedef size_t Page_t;
#endif//..
在这里定义时还要注意:_WIN32宏在32位和64位的环境下都有定义,而_WIN64宏只在64位下才有定义:所以要把_WIN64先判断才能正确解决问题;
Span定义好后,使用SpanLIst链表对Span进行管理;链表选择的是带头双向链表(你也可以设计成其它形式的链表)把链表基本的插入删除等方法实现下
//将span组织起来
class SpanList
{
public:SpanList(){_head = new span;_head->_next = _head;_head->_prev = _head;}void InsertFront(span* NewPos){Insert(begin(), NewPos);}//pos前插入新spanvoid Insert(span* pos, span* NewPos){assert(NewPos != nullptr);// prev NewPos posspan* prev = pos->_prev;prev->_next = NewPos;NewPos->_prev = prev;NewPos->_next = pos;pos->_prev = NewPos;}span* EraseFront(){span* EraseSpan = _head->_next;Erase(EraseSpan);return EraseSpan;}void Erase(span* pos){assert(pos != nullptr);assert(pos != _head);span* prev = pos->_prev;span* next = pos->_next;prev->_next = next;next->_prev = prev;}std::mutex& Mutex(){return _mtx;}span* begin(){return _head->_next;}span* end(){return _head;}bool Empty(){return _head->_next == _head;}private:span* _head;std::mutex _mtx;//桶锁 每个thread申请同一桶需要竞争
};
最后才来设计CentralCache:由于CentralCache在全局中只有一个,我们设计成单例模式;但至于是饿汉还是懒汉模式取决于你自己;这里我们就简单设计成懒汉模式
class CentralCache
{
public:static CentralCache* GetInstance(){return &_CenIns;}private:CentralCache() {};CentralCache(const CentralCache&) = delete;static CentralCache _CenIns;SpanList _SpanList[MaxFreeLists];
};//要在.cpp中进行定义,不然会报错
CentralCache CentralCache::_CenIns;
CentralCache核心实现
向CentralCache申请多少合适?一个?两个?三个?
一次申请过少导致ThreadCache频繁来申请;申请过多又会导致ThreadCache内存处于空闲状态;为了解决该问题,设计出慢开始反馈算法:ThreadCache申请内存过小,CentralCache就尽量多给一点;ThreadCache申请内存过大,CentralCache就少给一点;
先来设计出一个函数通过size算出究竟要给多少块内存
// 在SizeCLass类中管理
// 批量从CentralCache获取多少个
static size_t NumMoveSize(size_t size)
{assert(size <= MaxBytes);size_t num = MaxBytes / size;//大对象if (num < 2){num = 2;}//小对象if (num > 512){num = 512;}return num;
}
再在_FreeList 中定义成员 _MaxSize = 1:表示本次申请内存的最大个数,配合以上函数实现
void* ThreadCache::FetchFromCentralCache(size_t index, size_t size)
{//增量向CentralCache申请size_t batchNum = min(SizeClass::NumMoveSize(size), _FreeLists[index].MaxSize());if (_FreeLists[index].MaxSize() == batchNum) //说明是小内存申请{_FreeLists[index].MaxSize()++;}//...
}
小内存的申请按照指数级别的形式增长:前面先进行探测小内存申请的情况,申请次数变多就说明你有需要,就逐渐给你越来越多;申请大内存则不变,总的申请个数控制在 [2,512]之间;
ThreadCache从CentralCache获取内存
接下来就正式向CentralCache申请内存:我们计划要申请x个内存,但实际上可能申请不到这么多内存,所以要把实际申请到的个数给我们返回;可能一个也申请不到吗? 不可能!CentralCache没内存时不会给我们返回0个,它会去向下一层PageCache申请;如果只申请到一块就直接给线程使用;申请到多块内存就给线程一块,剩下的挂在对应桶上的_FreeList上等待下次内存申请使用;
void* ThreadCache::FetchFromCentralCache(size_t index, size_t size)
{//增量向CentralCache申请(大对象上限低,小对象上限高 [2,512])size_t batchNum = min(SizeClass::NumMoveSize(size), _FreeLists[index].MaxSize());if (_FreeLists[index].MaxSize() == batchNum){_FreeLists[index].MaxSize()++;}//正式向CentralCache申请(实际申请)//通过start和end把内存给带出来void* start = nullptr;void* end = nullptr;size_t ActualNum = CentralCache::GetInstance()->FetchRangeObj(start, end, batchNum, size);//CentralCache内部assert(ActualNum >= 1);//一定至少申请到一块内存assert(start != nullptr && end != nullptr);if (ActualNum == 1){assert(start == end);return start;}else{//申请到的一批量内存取出一个出来后,//剩下的挂到ThreadCache对应的_freeList上_FreeLists[index].PushRange(Next(start), end, ActualNum - 1);//记得减掉要用的一个return start;}
}
插入一段范围的内存到_FreeLIst中
注意ThreadCache申请到的这批内存是一个一个连接在一起的,我们直接进行头插操作就行
class FreeList
{
public://...void PushRange(void* start, void* end, size_t n){Next(end) = _FreeList;_FreeList = start;}size_t& MaxSize(){return _MaxSize;}private:void* _FreeList=nullptr;size_t _MaxSize = 1;//增量向CentralCacle申请的个数
};
CentralCache内部实现
在CentralCache桶里申请时记得要加桶锁后才能进去,从桶里找到一个span后就要按照计划的个数batchNum申请内存,使用输出型参数start,end给返回,再把申请出去的内存给删掉
size_t CentralCache::FetchRangeObj(void*& start, void*& end, size_t batchNum, size_t size)
{assert(batchNum > 0);size_t index = SizeClass::Index(size);//记得从CentralCache桶中申请加桶锁_SpanList[index].Mutex().lock();//在指定的桶里找spanspan* GetSpan = GetOneSpan(_SpanList[index], size);//不关心assert(GetSpan);assert(GetSpan->_FreeList);//span下的小块内存就根据实际需要给ThreadCache//使用start与end给带出去start = GetSpan->_FreeList;end = GetSpan->_FreeList;size_t ActualNum = 1;size_t i = 0;//考虑内存不够申请的情况while (i < batchNum - 1 && Next(end) != nullptr){end = Next(end);i++;ActualNum++;}//span头删,end尾巴置空GetSpan->_FreeList = Next(end);Next(end) = nullptr;_SpanList[index].Mutex().unlock();//记得解锁return ActualNum;
}
指定的桶里找span
找有没有span就要将整个桶进行遍历,有span就返回;找不到span就要向PageCache申请span(内存),(假设申请到了)申请到的span里面存着只是存着大内存没有进行切分成符合需求的一块块小内存,所有这里要进行切分操作:先定义指针指向大内存的首与尾(span里面的信息进行换算),将它切分成一小块一小块内存挂在span上,最后要把span头插进桶里面才算时真正申请到了
span* CentralCache::GetOneSpan(SpanList& list, size_t size)
{span* it = list.begin();while (it != list.end()){if (it->_FreeList != nullptr){return it;}else{it = it->_next;}}//...//走到这里假设我们已经从PageCache申请到Span了//Span的信息换算成地址char* start = (char*)(Span->Page_Adder << PageShift);//char*方便+-字节数char* end = start + (Span->_n << PageShift);//切分成小块内存后挂在Span上//尾插之前先给头节点方便操作Span->_FreeList = start;void* tail = start;start += size;//尾插操作while (start < end){Next(tail) = start;tail = start;start += size;}Next(tail) = nullptr;//尾巴一定一定要置空//Span头插到桶上list.InsertFront(Span);return Span;
}
PageCache
PageCache整体设计
PageCache的结构与CentralCache大体是相同的:都是一个桶结构,每个桶用SpanList带头双向链表管理一个一个的span,但它与CentralCache也不同:
每一个span下面挂着不是按照映射对齐规则计算出来的内存,而是以页为单位的大块内存,映射在对应的桶上采用的是直接定址法,比如:下标为1的桶span下挂着1页的大内存,下标为2的桶挂着2页的大内存...;
PageCache由于是自身需要找与切的操作,就不能使用桶锁,而是要用一把全局锁在CentralCache申请内存的过程全部锁起来,保证线程安全(这部分等到下面实现时才能有体会);
static const size_t MaxPage = 129; //最大设计成128页,0号桶不存
//单例模式
class PageCache
{
public:static PageCache* GetInstance(){return &_PCIns;}std::mutex& GetMutex(){return _mtx;}private:static PageCache _PCIns;//全局只有一个PageCache() {}PageCache& operator=(const PageCache&) = delete;SpanList _PageList[MaxPage];std::mutex _mtx;//大锁
};
PageCache核心实现
NewSpan
CentralCache找不到内存可以给ThreadCache,就要到PageCache这里还申请内存;申请流程:想申请k页Span,想到对应的k号桶看看有没有?没有就要到往下找比k大的大内存;找到后先把Span从桶中分出来(头删),再创建一个NewSpan:里面保存着k页内存的信息,在把Span里面的内存信息减去k页内存信息;再重新挂到对应的桶上,返回NewSpan给CentralCache使用;但如果没有找到大内存进行切分就要向系统申请一块128页的大内存挂到128号桶上...
// 获取一个K页的span
span* PageCache::NewSpan(size_t k)
{assert(k > 0 && k < MaxPage);if (!_PageList[k].Empty()){//从桶中拿一个span出来span* NewSpan = _PageList[k].EraseFront();return NewSpan;}//从该位置往后找大span来切分for (size_t i = k + 1; i < MaxPage; i++){if (!_PageList[i].Empty()){//BigSpan要切k页NewSpan给CentralCachespan* BigSpan = _PageList[i].EraseFront();span* NewSpan = new span;NewSpan->Page_Adder = BigSpan->Page_Adder;NewSpan->_n = k;//BginSpan信息进行修改BigSpan->Page_Adder += k;BigSpan->_n -= k;//切出来剩下的挂到对应的桶上_PageList[BigSpan->_n].InsertFront(BigSpan);return NewSpan;}}//找不到了要向堆申请void* ptr = ApplicateSpace(MaxPage - 1);span* BigSpan = new span;BigSpan->Page_Adder = (Page_t)ptr >> PageShift;// 页号与地址转换要清晰BigSpan->_n = MaxPage - 1;//挂到128号桶上进行复用上面的逻辑最方便_PageList[MaxPage - 1].InsertFront(BigSpan);return NewSpan(k);
}
那CentralCache要申请几页的内存呢?一个,两个,三个? 与ThreadCache申请几个内存相似:设计一个函数来进行计算
// 计算一次向系统获取几个页(1页8KB)
static size_t NumMovePage(size_t size)
{//根据size计算要申请多少个size_t num = NumMoveSize(size);//根据个数计算总的页数 size_t npage = num * size;npage >>= PageShift;//不够1页给你1页if (npage == 0)npage = 1;return npage;
}
接着来把CentralCache中的GetOneSpan的逻辑进行补充:找不到Span(内存)要去PageCache申请内存之前,要把桶锁解掉:因为后续如果有线程来还内存时就不会阻塞在这,提高效率;还要记得加上PageCache的大锁,然后我们才能正式调用函数去申请内存;申请到Span(内存)后还要再解锁,之后把申请到的Span挂到对应的桶之前也要加桶锁(重新与线程竞争);这块逻辑一点也注意很容易造成死锁,要特别注意!
span* CentralCache::GetOneSpan(SpanList& list, size_t size)
{//...//我要到PageCache去申请内存啦list.Mutex().unlock();//记得加大锁PageCache::GetInstance()->GetMutex().lock();span* Span = PageCache::GetInstance()->NewSpan(SizeClass::NumMovePage(size));//记得解大锁PageCache::GetInstance()->GetMutex().unlock();//...//线程重新竞争桶锁了,加锁list.Mutex().lock();list.InsertFront(Span);return Span;
}
申请内存流程测试
写好申请内存的逻辑后,先来对这部分进行测试
void TestAlloc()
{void* p1 = ConCurrentAlloc(2);void* p2 = ConCurrentAlloc(3);void* p3 = ConCurrentAlloc(5);void* p4 = ConCurrentAlloc(8);cout << p1 << ' ' << p2 << ' ' << p3 << ' ' << p4 << endl;
}
程序能够正常返回通常就问题不大:这一部分建议调试进行观察现象,也加深对申请逻辑的理解
ThreadCache回收内存
先在ConCurrentAlloc中提供一个线程调用回收内存的函数
static void ConCurrentFree(void* ptr, size_t size)
{assert(pTLSThreadCache);assert(ptr!=nullptr);assert(size <= MaxBytes);pTLSThreadCache->Deallocate(ptr, size);
}
接着设计Dellocate():ThreadCacheCache如何对释放的内存进行管理? 既然这块内存上层不想要了,那么ThreadCache就把这块内存给收集起来,得到下次申请时就把这块内存给上层;那至于要收集到那个桶上,这就要上层把这块内存对应的字节数给我们返回(这里待会要优化)
不断地释放内存,桶上收集到的内存就越长;如果不再继续处理,那这里的内存就闲置在这里,其它线程想用这些内存就要到CentralCache申请,CentralCache没内存就要到PageCache申请,非常浪费时间;所以要把这些多余的内存进行回收,那要回收几个?1个,2个,3个...这里按照:我们在ThreadCache申请内存时定义的_MaxSize(也就说FreeList的成员变量)进行比较:如果超过了就进行回收,回收_MaxSize个内存给CentralCache
void ThreadCache::Deallocate(void* ptr, size_t size)
{//回收到_FreeList对应index上(头插)//size_t align = SizeClass::RoundUp(size);size_t index = SizeClass::Index(size);_FreeLists[index].Push(ptr);if (_FreeLists[index].Size() >= _FreeLists[index].MaxSize()){ListTooLong(_FreeLists[index], size);}
}void ThreadCache::ListTooLong(FreeList& list, size_t size)
{void* start = nullptr;void* end = nullptr;list.PopRange(start, end, list.MaxSize());//CentralCache回收内存CentralCache::GetInstance()->ReleaseListToSpans(start, size);
}
要知道_FreeList中有多少块内存,我们就要在_FreeList中定义成员_size记录内存个数,把Push,Pop,PushRange有关内存的操作后的内存个数统计上;接着再实现把一段范围的内存从ThreadCache桶中删除的方法
class FreeList
{
public:void Push(void* obj){assert(obj);//头插Next(obj) = _FreeList;_FreeList = obj;++_size;}void* Pop(){assert(_FreeList != nullptr);//头删void* obj = _FreeList;_FreeList = Next(obj);--_size;return obj;}void PushRange(void* start, void* end, size_t n){//不用进行切分!Next(end) = _FreeList;_FreeList = start;_size += n;}void PopRange(void*& start, void*& end, size_t n){assert(n <= _size);//循环头删start = _FreeList;end = _FreeList;for (size_t i = 0; i < n - 1; i++){end = Next(end);}_FreeList = Next(end);Next(end) = nullptr;_size -= n;}private:void* _FreeList=nullptr;size_t _MaxSize = 1;//增量向CentralCacle申请的个数size_t _size = 0;//当前_FreeList连接的内存个数
};
CentralCache回收内存
CentralCache得到回收过来的内存要插入到对应的桶上,但至于是挂到那个Span上此时是不知道的,而且每个内存都不一定是挂到同一个Span上,怎么办呢?
页号与span的映射
怎么实现通过页号找到Span,首先先要了解span是怎么来的;刚开始CentralCache是没有Span的,线程来CentralCache申请内存时,它要到PageCache中申请Span来满足需求;所以我们可以在这里作文章:在PageCache类中定义一个hash表,CentralCache来PageCache这里申请内存时:进行页号与Span之间的映射,再把Span返回;这里当CentralCache回收内存时就能知道这块内存是那个Span的了
class PageCache
{
public://...
private://...std::unordered_map<Page_t, span*> _PageToSpan;//建立页号与span*的映射
};span* PageCache::NewSpan(size_t k)
{assert(k > 0 && k < MaxPage);if (!_PageList[k].Empty()){span* NewSpan = _PageList[k].EraseFront();//这里不进行统计后面指定吃亏(后面写完调的时候爽到了)for (Page_t i = NewSpan->Page_Adder; i < NewSpan->Page_Adder + k; i++){//_PageToSpan[i] = NewSpan;_PageToSpan.set(i, NewSpan);}return NewSpan;}//往下找大span进行切分for (size_t i = k + 1; i < MaxPage; i++){if (!_PageList[i].Empty()){span* NewSpan = new span;NewSpan->Page_Adder = BigSpan->Page_Adder;NewSpan->_n = k;//统计for (Page_t i = NewSpan->Page_Adder; i < NewSpan->Page_Adder + k; i++){_PageToSpan[i] = NewSpan;}//...}}//...
}
有了记录后,还要在写一个传入地址返回span的函数(封装),注意这里有线程安全的问题:map底层时红黑树,插入删除进行节点之间进行旋转,如果不加锁访问,可能有线程在进行插入数据的操作红黑树结构进行调整,你找的节点位置发生变化可能就出现错误了,所以一定要加锁
span* PageCache::MapObjectToSpan(void* obj)
{//红黑树结构不是固定的要加锁(C++11)std::unique_lock<std::mutex> lock(_mtx);//地址 -> 页号auto FindSpan = _PageToSpan.find((Page_t)obj>>PageShift);if (FindSpan != _PageToSpan.end()){//map底层是pair类型return FindSpan->second;}//不可能走到这里断死assert(false);return nullptr;
}
有了上面的解决方案,完成CentralCache回收内存
// 从ThreadCache中回收来的内存挂到CentralCache对应的桶的span上
void CentralCache::ReleaseListToSpans(void* start, size_t size)
{assert(start != nullptr);size_t index = SizeClass::Index(size);//先加桶锁_SpanList[index].Mutex().lock();while (start != nullptr){void* next = Next(start);//每块内存不一定是在同一个Spanspan* Span = PageCache::GetInstance()->MapObjectToSpan(start);//把内存头插到Span上Next(start) = Span->_FreeList;Span->_FreeList = start;start = next;}_SpanList[index].Mutex().unlock();
}
PageCache回收内存
与ThreadCache的情况一样:CentralCache回收回来的内存太长不用也会浪费,在一定情况下也要回收给PageCache,那么根据什么指标来判断某一个Span下的内存是时候回收了?
解决方案:在每个Span结构体定义一个成员变量:_UseCount 代表当前Span挂着的内存有多少个内存正在被 ThreadCache 使用;当 _UseCount 为0时就说明挂着的所有内存没人在使用了,是时候回收给PageCache了
struct span
{Page_t Page_Adder = 0;//申请大块内存的页号(地址)size_t _n = 0;//页的数量span* _prev=nullptr;span* _next=nullptr;size_t _UseCount = 0;//ThreadCache拿了多少个小块内存void* _FreeList = nullptr;//span挂着的一个一个内存的首地址
};// 从ThreadCache中回收来的内存挂到CentralCache对应的桶的span上
void CentralCache::ReleaseListToSpans(void* start, size_t size)
{//头插内存Next(start) = Span->_FreeList;Span->_FreeList = start;Span->_UseCount--;//Span记录的UseCount为0说明是ThreadCache把内存全部回来了if (Span->_UseCount == 0){//Span回收前从桶中删除_SpanList[index].Erase(Span);//无关数据清除Span->_FreeList = nullptr;Span->_next = nullptr;Span->_prev = nullptr;//还给PageCache之前先解桶锁_SpanList[index].Mutex().unlock();//加PageCache的大锁PageCache::GetInstance()->GetMutex().lock();PageCache::GetInstance()->ReleaseSpanToPageCache(Span);//PageCache的回收PageCache::GetInstance()->GetMutex().unlock();//span回收后再重新竞争桶锁_SpanList[index].Mutex().lock();}start = next;}_SpanList[index].Mutex().unlock();
}
PageCache合并内存
PageCache拿到CentralCache要回收的内存时,可以选择直接把该Span挂到对应的桶上就完了回收操作;但是如果刚好上层就永远不会申请到这块内存,一直堆积也会导致内存闲置(内存碎片),所以我们可以进行合并内存的操作:去左边看看有没有内存可以一起合并,取右边看看有没有内存可以一起合并...这个过程是循环的,直到找不到内存合并才停下来;
但是说着简单,做起来难:你怎么找到附近的有没有内存?
首先:Span的页号保存着内存的起始地址,页号-1不就找到了左边内存的结尾地址;同理:Span的页号加上Span的页号数量(n)就找到了右边内存的起始地址;这些内存是哪来的? 这些内存是CentralCache来申请内存时PageCache把大内存切分时剩下的内存:所以可以在进行切分的操作时把内存的起始地址和结尾地址与对应Span存起来,这里通过页号就能找到附近内存
// 获取一个K页的span
span* PageCache::NewSpan(size_t k)
{//...//往下找大span进行切分for (size_t i = k + 1; i < MaxPage; i++){//... //BigSpan是切分后剩下的内存_PageToSpan[BigSpan->Page_Adder] = BigSpan;//注意存右位置要减1_PageToSpan[BigSpan->Page_Adder + BigSpan->_n - 1] = BigSpan;}//...
}
其次:如果就按照以上思路去设计,还有BUG:附近的内存我是不知道是否正在被CentralCache或者ThreadCache使用的;强行进行合并其中一方在使用时就会出现错误,程序崩溃,为了解决这个Bug,在span结构体定义一个标记:
bool _IsUse = false;//从PageCache申请是否给CentralCache了
但CentralCache向PageCache申请Span成功后,这块Span的状态就为true;在标记时一定要在外面(调用函数完成时)进行标记,一定不能在函数内进行,内部情况有很多,稍不注意就制造出Bug来了
span* CentralCache::GetOneSpan(SpanList& list, size_t size)
{//...span* Span = PageCache::GetInstance()->NewSpan(SizeClass::NumMovePage(size));Span->_IsUse = true;//...
}
最后正式来完成合并内存的操作(左合并为例,右合并类似):
- 在哈希表中找不到左边的Span不合并;
- 找到了但其中一方在使用不合并;
- 合并后太大没有桶能够接受不合并(超出128页)
不满足以上情况就合并,合并,合并...最后把这块合并完成的Span头插到对应的桶上,要把使用标记置false(这里也代表我把CentralCache回收回来的Span使用情况处理了),还要把左右区间加入到哈希表中,让下一次合并内存能够找到
void PageCache::ReleaseSpanToPageCache(span* Span)
{assert(Span!=nullptr);//回收回来的span要与附件的span进行合并,解决内存碎片问题//左合并while (1){auto tmp = _PageToSpan.find(Span->Page_Adder - 1);//没有span不合并if (tmp == _PageToSpan.end()){break;}//span在被CentralCache使用不合并span* PrevSpan = tmp->second;if (PrevSpan->_IsUse == true){break;}//合成的span太大不合并if (Span->_n + PrevSpan->_n > MaxPage - 1){break;}//span变成大spanSpan->Page_Adder = PrevSpan->Page_Adder;Span->_n += PrevSpan->_n;//把PrevSpan从PageCache桶中delete_PageList[PrevSpan->_n].Erase(PrevSpan);delete PrevSpan;}//右合并while (1){auto tmp = _PageToSpan.find(Span->Page_Adder + Span->_n);//没有span不合并if (tmp == _PageToSpan.end()){break;}//span在被CentralCache使用不合并span* NextSpan = tmp->second;if (NextSpan->_IsUse == true){break;}//合成的span太大不合并if (Span->_n + NextSpan->_n > MaxPage - 1){break;}//span变成大spanSpan->_n += NextSpan->_n;//把prev从PageCache桶中delete_PageList[NextSpan->_n].Erase(NextSpan);delete NextSpan;}//挂到桶上_PageList[Span->_n].InsertFront(Span);//记得span标记与把左右区间加入Span->_IsUse = false;_PageToSpan[Span->Page_Adder] = Span;_PageToSpan[Span->Page_Adder + Span->_n - 1] = Span;
}
释放内存流程测试
void TestDeAlloc()
{void* p1 = ConCurrentAlloc(8);void* p2 = ConCurrentAlloc(6);void* p3 = ConCurrentAlloc(3);void* p4 = ConCurrentAlloc(2);void* p5 = ConCurrentAlloc(1);void* p6 = ConCurrentAlloc(7);void* p7 = ConCurrentAlloc(16);ConCurrentFree(p1,8);ConCurrentFree(p2,6);ConCurrentFree(p3,3);ConCurrentFree(p4,2);ConCurrentFree(p5,1);ConCurrentFree(p6,7);ConCurrentFree(p7,16);
}
能够正常返回一般问题不大:但还是建议进行调试看看是否是释放流程符合预期
大于256KB的大块内存申请问题
上面设计哈希桶映射对齐规则时,如果申请字节数大于256KB进行对齐时直接断言报错;到了现在我们可以来进行优化:超过256KB按照页数进行对齐
//计算对齐数
static inline size_t RoundUp(size_t bytes)
{//...else{//大于256k按页进行对齐return _RoundUp(bytes, 1 << PageShift);//assert(false);//return -1;}}
大于256KB的内存就不能去ThreadCache或者CentralCache申请了,直接去PageCache按页进行申请来满足需求
static void* ConCurrentAlloc(size_t size)
{if (size > MaxBytes){//直接去PageCache里申请size_t align = SizeClass::RoundUp(size);//页数对齐size_t page = align >> PageShift;PageCache::GetInstance()->GetMutex().lock();span* Span = PageCache::GetInstance()->NewSpan(page);PageCache::GetInstance()->GetMutex().unlock();return (void*)(Span->Page_Adder << PageShift);}else{//assert(size <= MaxBytes);if (pTLSThreadCache == nullptr){static ObjectPool<ThreadCache> TLPool;//pTLSThreadCache = new ThreadCache;pTLSThreadCache = TLPool.New();}return pTLSThreadCache->Allocate(size);}
}
如果申请字节数超过了128页,那PageCache也无能为力,就得去系统进行申请了
// 获取一个K页的span
span* PageCache::NewSpan(size_t k)
{//assert(k > 0 && k < MaxPage);if (k > MaxPage - 1){//直接去系统申请内存void* ptr=ApplicateSpace(k);span* Span = new span;Span->Page_Adder = Page_t(ptr) >> PageShift;Span->_n = k;_PageToSpan[Span->Page_Adder] = Span;return Span;}//...
}
以为这样就行了? 还差最后一步:既然你是从系统中申请的,那释放时是不是要从系统中释放!那释放时就要根据地址找Span,所以在系统里申请内存时也要把Span加入到哈希表中来
//获取K页的Span
span* PageCache::NewSpan(size_t k)
{//assert(k > 0 && k < MaxPage);if (k > MaxPage - 1){//直接去系统申请内存void* ptr=ApplicateSpace(k);//span* Span = new span;span* Span = _pool.New();Span->Page_Adder = Page_t(ptr) >> PageShift;Span->_n = k;//要进行记录(后面回收时能找到对应的span)_PageToSpan[Span->Page_Adder] = Span;return Span;}//...
}static inline void ConCurrentFree(void* ptr, size_t size)
{assert(ptr!=nullptr);//assert(size <= MaxBytes);span* Span = PageCache::GetInstance()->MapObjectToSpan(ptr);if (size > MaxBytes){//找系统释放PageCache::GetInstance()->GetMutex().lock();PageCache::GetInstance()->ReleaseSpanToPageCache(Span);PageCache::GetInstance()->GetMutex().unlock();}else{assert(pTLSThreadCache);pTLSThreadCache->Deallocate(ptr, size);}
}
完成后要测试一遍,正常退出就问题不大
void TestMoreBytes()
{//找page cache申请void* p1 = ConCurrentAlloc(257 * 1024); //257KBConCurrentFree(p1, 257 * 1024);//找堆申请void* p2 = ConCurrentAlloc(129 * 8 * 1024); //129页ConCurrentFree(p2, 129 * 8 * 1024);}
使用定长内存池脱离new
在 ConCurrentAlloc 使用new出ThreadCache;在PageCache中经常要使用new出Span,在创建带头双向链表时使用new出头节点;本质上还是在要使用malloc申请空间,为了完全摆脱malloc:在这里我们之前的定长内存池就能来替换malloc,使得整个项目完全脱离malloc的控制
class PageCache
{
public://...
private://...ObjectPool<span> _pool;
}//把所有new span的地方统统进行修改
//申请span对象
Span* span = _Pool.New();
//释放span对象
_Pool.Delete(span);
class SpanList
{
public:SpanList(){//_head = new span;_head = _pool.New();_head->_next = _head;_head->_prev = _head;}//...
private://...static ObjectPool<span> _pool;//记得在.cpp文件定义静态成员
};static void* ConCurrentAlloc(size_t size)
{//...else{//assert(size <= MaxBytes);if (pTLSThreadCache == nullptr){static ObjectPool<ThreadCache> TLPool;//pTLSThreadCache = new ThreadCache;pTLSThreadCache = TLPool.New();}//...}
}
释放内存时优化为不传内存大小
使用free是否内存就只用传地址,但我们实现出来的释放函数ConCurrentFree(ptr,size)
需要手动把申请字节数进行传参,非常不方便,万一把它写错了后果不堪设想;所以我们优化成不传对象大小;那具体要怎么做呢?
在span结构体中定义一个成员:_FreeKnowSize;代表span管理的每个内存的大小;在什么地方进行设置呢? 共有两处:申请内存太大去系统申请;CentralCache去PageCache申请内存;两者本质上都是调用 NewSpan() 后返回一个span对象的,同样在调用完成NewSpan()后进行设置
static void* ConCurrentAlloc(size_t size)
{if (size > MaxBytes){//直接去PageCache里申请size_t align = SizeClass::RoundUp(size);//与页数对齐size_t page = align >> PageShift;PageCache::GetInstance()->GetMutex().lock();span* Span = PageCache::GetInstance()->NewSpan(page);Span->_FreeKnowSize = align;//注意设置的是对齐数大小PageCache::GetInstance()->GetMutex().unlock();return (void*)(Span->Page_Adder << PageShift);}//...
}span* CentralCache::GetOneSpan(SpanList& list, size_t size)
{//...PageCache::GetInstance()->GetMutex().lock();span* Span = PageCache::GetInstance()->NewSpan(SizeClass::NumMovePage(size));Span->_FreeKnowSize = size;//释放时知道对齐数//...
}
把 ConcurrentFree接口进行修改
static void ConCurrentFree(void* ptr)
{span* Span = PageCache::GetInstance()->MapObjectToSpan(ptr);size_t size = Span->_FreeKnowSize;//前面一定是设置过的if (size > MaxBytes){PageCache::GetInstance()->GetMutex().lock();PageCache::GetInstance()->ReleaseSpanToPageCache(Span);PageCache::GetInstance()->GetMutex().unlock();}else{assert(pTLSThreadCache);pTLSThreadCache->Deallocate(ptr, size);}
完成后测试确保没问题
void TestDeAlloc()
{void* p1 = ConCurrentAlloc(8);void* p2 = ConCurrentAlloc(6);void* p3 = ConCurrentAlloc(3);ConCurrentFree(p1);ConCurrentFree(p2);ConCurrentFree(p3);
}
多线程环境下对比malloc测试
之前都是单线程的环境下进行测试,测试没问题后改成多线程环境下进行测试,同时与malloc进行对比
void BenchmarkMalloc(size_t ntimes, size_t nworks, size_t rounds)
{std::vector<std::thread> vthread(nworks);std::atomic<size_t> malloc_costtime = 0;std::atomic<size_t> free_costtime = 0;for (size_t k = 0; k < nworks; ++k){vthread[k] = std::thread([&, k]() {std::vector<void*> v;v.reserve(ntimes);for (size_t j = 0; j < rounds; ++j){size_t begin1 = clock();for (size_t i = 0; i < ntimes; i++){v.push_back(malloc(16));//v.push_back(malloc((16 + i) % 8192 + 1));}size_t end1 = clock();size_t begin2 = clock();for (size_t i = 0; i < ntimes; i++){free(v[i]);}size_t end2 = clock();v.clear();malloc_costtime += (end1 - begin1);free_costtime += (end2 - begin2);}});}for (auto& t : vthread){t.join();}printf("%zu个线程并发执行%zu轮次,每轮次malloc %zu次: 花费:%zu ms\n",nworks, rounds, ntimes, malloc_costtime.load());printf("%zu个线程并发执行%zu轮次,每轮次free %zu次: 花费:%zu ms\n",nworks, rounds, ntimes, free_costtime.load());printf("%zu个线程并发malloc&free %zu次,总计花费:%zu ms\n",nworks, nworks * rounds * ntimes, malloc_costtime.load() + free_costtime.load());
}void BenchmarkConcurrentMalloc(size_t ntimes, size_t nworks, size_t rounds)
{std::vector<std::thread> vthread(nworks);std::atomic<size_t> malloc_costtime = 0;std::atomic<size_t> free_costtime = 0;for (size_t k = 0; k < nworks; ++k){vthread[k] = std::thread([&]() {std::vector<void*> v;v.reserve(ntimes);for (size_t j = 0; j < rounds; ++j){size_t begin1 = clock();for (size_t i = 0; i < ntimes; i++){//v.push_back(ConCurrentAlloc(16));v.push_back(ConCurrentAlloc((16 + i) % 8192 + 1));}size_t end1 = clock();size_t begin2 = clock();for (size_t i = 0; i < ntimes; i++){ConCurrentFree(v[i]);}size_t end2 = clock();v.clear();malloc_costtime += (end1 - begin1);free_costtime += (end2 - begin2);}});}for (auto& t : vthread){t.join();}printf("%zu个线程并发执行%zu轮次,每轮次concurrent alloc %zu次: 花费:%zu ms\n",nworks, rounds, ntimes, malloc_costtime.load());printf("%zu个线程并发执行%zu轮次,每轮次concurrent dealloc %zu次: 花费:%zu ms\n",nworks, rounds, ntimes, free_costtime.load());printf("%zu个线程并发concurrent alloc&dealloc %zu次,总计花费:%zu ms\n",nworks, nworks * rounds * ntimes, malloc_costtime.load() + free_costtime.load());
}int main()
{size_t n = 10000;cout << "==========================================================" << endl;BenchmarkConcurrentMalloc(n, 4, 10);cout << endl << endl;BenchmarkMalloc(n, 4, 10);cout << "==========================================================" << endl;return 0;
}
- ntime 代表申请与释放的次数
- nworks 代表多少个线程并发
- rounds 代表要执行多少轮(一轮有ntime次申请释放)
测试结果
调试技巧
项目的复杂性在编写过程较容易出现错误,所以这里来介绍几个在本项目中比较常用的调试技巧
条件语句/断点
运行程序时如果程序发生崩溃,那么我们就按 Fn+F5直接定位在出现错误的语句中,在附近展开分析,这里就可以加上条件语句/断点来观察
也可以在断点设置条件就不用自己编写,但相对来说没有那么的方便
查看调用堆栈
找到可疑点后就往上上层看看谁调用它,如何做到?通过调用堆栈
中断调试
有时写出条件语句后打断点进行调试后,没有看到箭头指向断点处,那么我们就要按下全部中断按钮
此时就可以看到箭头了,原来是出现了调试发送死循环了
性能瓶颈分析
多线程环境下对比malloc测试发现我们实现出来的内存池比malloc慢,这时怎么回事呢?我们接下来就来使用VS的性能工具来进行分析
首次使用需要先来进行设置
生成测试报告后,我们发现:在最消耗时间的函数中:都存在调用 MapObjectToSpan 的情况且消耗时间中排在第一位;每次通过地址(页号)找Span时都要加锁来保证线程安全,保障的同时又减低了效率,所以我们要对它来进行优化
基数树优化
基数树是一种数据结构,但本质上是分层哈希,有单层基数树,双层基数树,三层基数树...
单层基数树
采用直接地址法的方式,比较简单
下标对应的是页号,里面存是Span*指针类型 转成void*方便管理
// 单层基数树
template <int BITS>
class TCMalloc_PageMap1
{
private:static const int LENGTH = 1 << BITS; //开 2^32 / 2^13 个位置void** array_; //指针数组public:typedef uintptr_t Number;TCMalloc_PageMap1(){//直接把空间开好size_t size = sizeof(void*) << BITS;size_t bytes = SizeClass::RoundUp(size);array_ = (void**)SystemAlloc(bytes>>PageShift);memset(array_, 0, sizeof(void*) << BITS);}void* get(Number k) const{if ((k >> BITS) > 0){return NULL;}return array_[k];}void set(Number k, void* v) {array_[k] = v;}
};
BITS模版参数代表需要用多少个bit储存页号,假设1页为2^13 bit,在32位下就只需要: 2^32 / = 2^19,只需要19个bit就能够储存页号了;
需要多少空间呢? 需要开辟:2^32 / 2^13 = 2^19个空间,每个空间保存的指针大小为4字节,所以需要 2^19 * 4 = 2^21 =2M的内存开销,问题不大,但在64位下计算结果为;2^24G,这明显是不可能的,所以我们找三层基数树来解决64位下的储存空间
双层基数树
储存过程:(32位下)用前5个比特位在基数树的第一层进行映射,映射后得到对应的第二层,然后用剩下的比特位在基数树的第二层进行映射,映射后最终得到该页号对应的span指针
第一层有 2^5 个空间,总共占用了:2^5 * 2^13 * 4 = 2^ 21 = 2M空间,与单层所占空间基本是一样的,所以单层与双层都只适用于32位环境下
// 双层基数树
template <int BITS>
class TCMalloc_PageMap2 {
private:static const int ROOT_BITS = 5; //通过页号的前5个bit表示位置static const int ROOT_LENGTH = 1 << ROOT_BITS; //第一层开多少个元素static const int LEAF_BITS = BITS - ROOT_BITS; //通过页号的前6~14个bit表示位置static const int LEAF_LENGTH = 1 << LEAF_BITS; //第二层开多少个元素 //第二层struct Leaf {void* values[LEAF_LENGTH];};//第一层Leaf* root_[ROOT_LENGTH]; public:typedef uintptr_t Number;explicit TCMalloc_PageMap2() {memset(root_, 0, sizeof(root_));//直接开第二层的空间Ensure(0, 1 << BITS);}bool Ensure(Number start, size_t n){for (Number key = start; key <= start + n - 1;) {const Number i1 = key >> LEAF_BITS;// 下标if (root_[i1] == NULL) {static ObjectPool<Leaf> LeafPool;Leaf* leaf = LeafPool.New();memset(leaf, 0, sizeof(*leaf));root_[i1] = leaf;}key = ((key >> LEAF_BITS) + 1) << LEAF_BITS;}return true;}void* get(Number k) const {const Number i1 = k >> LEAF_BITS;const Number i2 = k & (LEAF_LENGTH - 1);if ((k >> BITS) > 0 || root_[i1] == NULL) {return NULL;}return root_[i1]->values[i2];}void set(Number k, void* v) {const Number i1 = k >> LEAF_BITS;const Number i2 = k & (LEAF_LENGTH - 1);assert(i1 < ROOT_LENGTH);root_[i1]->values[i2] = v;}
};
三层基数树
页号的三次映射
// 处理64位情况
// Three-level radix tree
template <int BITS>
class TCMalloc_PageMap3
{
private:static const int INTERIOR_BITS = (BITS + 2) / 3; // 第一层和第二层的页号的比特位个数static const int INTERIOR_LENGTH = 1 << INTERIOR_BITS; // 第一层和第二层开多少元素static const int LEAF_BITS = BITS - 2 * INTERIOR_BITS; // 第三层的页号的比特位个数static const int LEAF_LENGTH = 1 << LEAF_BITS; // 第三层开多少元素//第一,二层结构struct Node {Node* ptrs[INTERIOR_LENGTH];};//第三层结构struct Leaf {void* values[LEAF_LENGTH];};//指针实现Node* root_;Node* NewNode() {static ObjectPool<Node> ObjectNode;Node* result = ObjectNode.New();if (result != NULL) {memset(result, 0, sizeof(*result));}return result;}
public:typedef uintptr_t Number;//初始化先开辟第一层空间explicit TCMalloc_PageMap3() {root_ = NewNode();}void* get(Number k) const {const Number i1 = k >> (LEAF_BITS + INTERIOR_BITS); //第一层下标const Number i2 = (k >> LEAF_BITS) & (INTERIOR_LENGTH - 1); //第二层小标const Number i3 = k & (LEAF_LENGTH - 1); //第三层下标//页号超出范围,或映射该页号的空间未开辟if ((k >> BITS) > 0 || root_->ptrs[i1] == NULL || root_->ptrs[i1]->ptrs[i2] == NULL){return NULL;}return reinterpret_cast<Leaf*>(root_->ptrs[i1]->ptrs[i2])->values[i3];}void set(Number k, void* v) {assert(k >> BITS == 0);const Number i1 = k >> (LEAF_BITS + INTERIOR_BITS);const Number i2 = (k >> LEAF_BITS) & (INTERIOR_LENGTH - 1);const Number i3 = k & (LEAF_LENGTH - 1);//没空间去开辟一组映射出来Ensure(k, 1);reinterpret_cast<Leaf*>(root_->ptrs[i1]->ptrs[i2])->values[i3] = v;}bool Ensure(Number start, size_t n) {for (Number key = start; key <= start + n - 1;) {const Number i1 = key >> (LEAF_BITS + INTERIOR_BITS);const Number i2 = (key >> LEAF_BITS) & (INTERIOR_LENGTH - 1);if (i1 >= INTERIOR_LENGTH || i2 >= INTERIOR_LENGTH){return false;}if (root_->ptrs[i1] == NULL) {Node* n = NewNode();if (n == NULL) return false;root_->ptrs[i1] = n;}if (root_->ptrs[i1]->ptrs[i2] == NULL) {static ObjectPool<Leaf> ObjectLeaf;Leaf* leaf = ObjectLeaf.New();memset(leaf, 0, sizeof(*leaf));root_->ptrs[i1]->ptrs[i2] = reinterpret_cast<Node*>(leaf);}key = ((key >> LEAF_BITS) + 1) << LEAF_BITS;}return true;}
};
接着把红黑树进行替换,再把所有的插入与查找的接口换成自己实现好的函数
//单例模式
class PageCache
{//...//std::unordered_map<Page_t, span*> _PageToSpan;//建立页号与span*的映射
#ifdef _WIN64TCMalloc_PageMap3<64 - PageShift> _PageToSpan;
#elif _WIN32TCMalloc_PageMap1<32 - PageShift> _PageToSpan;
#endif//...};span* PageCache::MapObjectToSpan(void* obj)
{//锁没了效率嘎嘎快span* FindSpan = (span*)_PageToSpan.get((Page_t)obj >> PageShift);assert(FindSpan != nullptr);return FindSpan;
}//...
从上面可以看到替换成基数树我是没有加锁的:因为基数树在插入之前一定是有空间开辟好了的,查找时的位置不会随着其它线程进行插入而改变位置;而且线程去查找之前里面一定是记录了相关信息,查找时一定能够找到的;所以优点就在于不用加速也没有线程安全
修改完成测试一遍
在64位下测试一遍,另外把申请固定内存大小换成随机内存大小
项目源码
gitee链接:ConcurrentMemoryPool
以上文章有如何问题欢迎在评论区讨论,感谢观看^_^
相关文章:
项目——高并发内存池
目录 项目介绍 做的是什么 要求 内存池介绍 池化技术 内存池 解决的问题 设计定长内存池 高并发内存池整体框架设计 ThreadCache ThreadCache整体设计 哈希桶映射对齐规则 ThreadCache TLS无锁访问 CentralCache CentralCache整体设计 CentralCache结构设计 C…...
几种查看PyTorch、cuda 和 Python 版本方法
在检查 PyTorch、cuda 和 Python 版本时,除了直接使用 torch.__version__ 和 sys.version,我们还可以通过其他方式实现相同的功能 方法 1:直接访问属性(原始代码) import torch import sysprint("PyTorch Versi…...
如何实现跟踪+分割的高效协同?SiamMask中的多任务损失设计
如何实现跟踪分割的高效协同?SiamMask中的多任务损失设计 一、引言二、三大分支损失函数详解2.1 分类分支损失2.2 回归分支损失2.3 Mask分支损失 三、损失加权策略与系数选择3.1 常见超参数设定3.2 动态权重(可选) 四、训练实践:平…...
MODBUS转EtherNetIP边缘计算网关配置优化:Logix5000与ATV340高效数据同步与抗干扰方案
一、行业背景 智能制造是当前工业发展的趋势,智能工厂通过集成各种自动化设备和信息技术,实现生产过程的智能化、自动化和高效化。在某智能工厂中,存在大量采用ModbusTCP协议的设备,如智能传感器、变频器等,而工厂的主…...
从代码学习深度学习 - 图像增广 PyTorch 版
文章目录 前言一、图像增广的基本概念二、PyTorch中的图像增广实现三、数据加载与处理四、模型训练与评估五、实验设置与执行六、实验结果与分析七、讨论总结前言 在深度学习中,数据是关键。尤其是在计算机视觉任务中,高质量且丰富多样的数据对模型性能有着决定性的影响。然…...
从机械应答到智能对话:大模型为呼叫注入智慧新动能
引言 在当今竞争激烈的商业环境中,高效和有效的客户沟通对于企业的成功至关重要。智能外呼系统已成为企业与潜在客户和现有客户互动的重要工具。最近,大模型(如大型语言模型或 LLMs)的出现为这些系统带来了显著的提升,…...
深入浅出 Python 协程:从异步基础到开发测试工具的实践指南
Python 的异步编程近年来越来越受欢迎,尤其在需要同时处理大量 I/O 请求的场景中,它展现了出色的性能。而协程是异步编程的核心,也是开发高效异步测试工具的关键技术。 这篇文章将用通俗的语言带你快速入门 Python 协程,结合实际…...
算法之分支定界
分支定界 分支定界概述核心思想与步骤常见变体复杂度分析案例分析1. 0-1背包问题2. 最短路径问题(分支定界法)3. 旅行商问题(TSP) 分支定界 概述 分支定界(Branch and Bound)是一种用于解决组合优化问题的…...
Hugging Face上面找开源的embedding模型
问题 想找一个支持中文的embedding模型(把一段文本转化成多维度的向量)。Hugging Face平台上面共享了很多开源模型,算是这年头(2025年),大家都把自己开源模式都往上放的地方了吧。现在去这个平台上面找一个…...
docker部署Jenkins工具
环境准备 1.当前安装在Windows系统下的Docker-Desktop 下载地址:Docker Desktop: The #1 Containerization Tool for Developers | Docker 2.下载后进行安装并进行配置启动docker 3.创建一个空的文件夹,用于后面的启动时做文件路径映射 下载镜像 d…...
Pgvector+R2R搭建RAG知识库
背景 R2R是一个采用Python编写的开源AI RAG框架项目,与PostgreSQL技术栈集成度高,运行需求资源少(主要是本人的Macbook air m1内存只有8G)的特点,对部署本地私有化化AI RAG应用友好。 Resource Recommendations Whe…...
Qt本地化 - installTranslator不生效
bool QCoreApplication::installTranslator(QTranslator *translationFile)注意这里输入的是QTranslator对象指针,如果QTranslator是局部变量,一旦离开其作用域就会导致翻译失效 错误代码示范: void ApplyTranslator(const QString& qmf…...
精益数据分析(19/126):走出数据误区,拥抱创业愿景
精益数据分析(19/126):走出数据误区,拥抱创业愿景 在创业与数据分析的探索之旅中,我们都渴望获取更多知识,少走弯路。今天,我依然带着和大家共同进步的想法,深入解读《精益数据分析…...
六、初始化与清理(Initialization cleanup)
六、初始化与清理(Initialization & cleanup) 本章内容主要介绍C中的 构造函数 和 析构函数 的作用与用法,以及默认构造、聚合初始化等相关特性 封装 和 *访问控制 *在提升库使用的便捷性方面迈出了重要的一步。在安全性方面࿰…...
Python - 爬虫-网页解析数据-库lxml(支持XPath)
lxml是 Python 的第三方解析库,完全使用 Python 语言编写,它对 Xpath 表达式提供了良好的支持,支持HTML和XML的解析,支持XPath解析方式,而且解析效率非常高 XPath,全称XML Path Language,即XML…...
单片机 + 图像处理芯片 + TFT彩屏 触摸滑动条控件
触摸滑动条控件使用说明 一、项目概述 本项目基于单片机和RA8889/RA6809图形处理芯片的TFT触摸屏滑动条控件。该控件支持水平和垂直滑动条,可自定义外观和行为,并支持回调函数进行值变化通知。 硬件平台:51/ARM均可(测试时使用STC8H8K64U单…...
LeetCode每日一题4.24
2799. 统计完全子数组的数目 题目 问题分析 完全子数组 的定义:子数组中不同元素的数目等于整个数组不同元素的数目。 子数组 是数组中的一个连续非空序列。 思路 统计整个数组的不同元素数目: 使用 set 来获取整个数组的不同元素数目。 遍历所有子数…...
LeetCode238_除自身以外数组的乘积
LeetCode238_除自身以外数组的乘积 标签:#数组 #前缀和Ⅰ. 题目Ⅱ. 示例0. 个人方法一:暴力循环嵌套0. 个人方法二:前缀和后缀分别求积 标签:#数组 #前缀和 Ⅰ. 题目 给你一个整数数组 nums,返回 数组 answer &#…...
基于 Spring Boot 的银行柜台管理系统设计与实现(源码+文档+部署讲解)
技术范围:SpringBoot、Vue、SSM、HLMT、Jsp、PHP、Nodejs、Python、爬虫、数据可视化、小程序、安卓app、大数据、物联网、机器学习等设计与开发。 主要内容:免费功能设计、开题报告、任务书、中期检查PPT、系统功能实现、代码编写、论文编写和辅导、论文…...
LeetCode 2799.统计完全子数组的数目:滑动窗口(哈希表)
【LetMeFly】2799.统计完全子数组的数目:滑动窗口(哈希表) 力扣题目链接:https://leetcode.cn/problems/count-complete-subarrays-in-an-array/ 给你一个由 正 整数组成的数组 nums 。 如果数组中的某个子数组满足下述条件&am…...
卡尔曼滤波解释及示例
卡尔曼滤波的本质是用数学方法平衡预测与观测的可信度 ,通过不断迭代逼近真实状态。其高效性和鲁棒性,通常在导航定位中,需要融合GPS、加速度计、陀螺仪、激光雷达或摄像头数据,来提高位置精度。简单讲,卡尔曼滤波就是…...
在vue项目中实现svn日志打印
在vue项目中实现svn日志打印 实现svnlog创建svn-log脚本 convert-svn-log.js配置命令 package 实现svnlog 项目工程 类似于git的conventional-changelog 创建svn-log脚本 convert-svn-log.js 在项目根目录创建convert-svn-log.js const fs require(fs-extra); const xml2j…...
使用vue2开发一个医疗预约挂号平台-前端静态网站项目练习
对于后端开发的我,最近一直在学习前端开发,除了要学习一些前端的基础知识外,肯定少不了一些前端项目练习,就通过前端的编程知识 就简单做一个医疗预约挂号前端静态页面。这个网站主要是使用了vue2 的相关技术实现的。 主要实现了这…...
Redis的过期删除策略和内存淘汰策略
🤔 过期删除和内存淘汰乍一看很像,都是做删除操作的,这么分有什么意思? 首先,设置过期时间我们很熟悉,过期时间到了,我么的键就会被删除掉,这就是我们常认识的过期删除,…...
Langchain检索YouTube字幕
创建一个简单搜索引擎,将用户原始问题传递该搜索系统 本文重点:获取保存文档——保存向量数据库——加载向量数据库 专注于youtube的字幕,利用youtube的公开接口,获取元数据 pip install youtube-transscript-api pytube 初始化 …...
服务器上安装node
1.安装 下载安装包 https://nodejs.org/en/download 解压安装包 将安装包上传到/opt/software目录下 cd /opt/software tar -xzvf node-v16.14.2-linux-x64.tar.gz 将解压的文件夹移动到安装目录(/opt/nodejs)下 mv /opt/software/node-v16.14.2-linux-x64 /opt/nodejs …...
React:什么是Hook?通俗易懂的讲讲
什么是Hook 1.Hook 是什么?2.React 内置的 Hook3. 自定义 Hook4. 总结 1.Hook 是什么? 可以理解为:函数组件的工具/功能插件 Hook是 React 16.8 以后提供的一种新特性, 让你在函数组件里“钩入”React 的功能(比如状态…...
树莓派安装GStreamer ,opencv支持, 并在虚拟环境中使用的安装方法
首先是我在树莓派中 使用OpenCV 读取网络视频流, 如海康威视 通过rtsp协议地址读取 会发生延迟和丢包的情况 后来使用ffmpeg和OpenCV 读取视频流 丢报的问题减少了 但是长时间运行 还是会造成延迟和卡顿 最后直接卡死画面 后来试了一下GStreamer 管道流 是树莓派支持的 但是原生…...
从节点重排看React 与 Vue3 的 Diff 算法
一个有趣的问题 之前我写了一篇狗教我 React——原理篇之 Diff 算法 - 掘金 (juejin.cn)简单介绍了 diff 算法,收到了一个有意思的疑问: 大佬讲得非常易懂,我有个疑惑就是都说 diff 处理节点前移比较差,比如 a→b→c→d 更新为 d→a→b→c,如果第一遍循环到第一个就截止了…...
【FAQ】PCoIP 会话后物理工作站本地显示器黑屏
# 问题 工作人员从家里建立了到办公室工作站的 PCoIP 连接,该工作站安装了 HP Anyware Graphics Agent,并且还连接了本地显示器。然后,远程用户决定去办公室进行本地工作,工作站显示器显示黑屏(有时没有信号ÿ…...
springboot基于hadoop的酷狗音乐爬虫大数据分析可视化系统(源码+lw+部署文档+讲解),源码可白嫖!
摘要 本酷狗音乐爬虫大数据分析可视化系统采用B/S架构,数据库是MySQL,网站的搭建与开发采用了先进的Java语言、Hadoop、爬虫技术进行编写,使用了Spring Boot框架。该系统从两个对象:由管理员和用户来对系统进行设计构建。前台主要…...
基于大模型的食管平滑肌瘤全周期预测与诊疗方案研究
目录 一、引言 1.1 研究背景与意义 1.2 研究目的 1.3 国内外研究现状 二、大模型技术原理与应用概述 2.1 大模型介绍 2.2 在医疗领域的应用现状 2.3 用于食管平滑肌瘤预测的可行性分析 三、食管平滑肌瘤术前预测 3.1 预测指标选取 3.2 数据收集与预处理 3.2.1 数据…...
26考研 | 王道 | 数据结构 | 第七章 查找
第七章 查找 文章目录 第七章 查找7.1 查找概念7.2 顺序查找7.3 折半查找7.4 分块查找7.5 二叉排序树7.6 平衡二叉树平衡二叉树的插入平衡二叉树的删除 7.7 红黑树7.7.1 为什么要发明红黑树?7.7.2 红黑树的定义和性质7.7.3 红黑树的插入和删除插入删除 7.8 B树和B树…...
Docker 部署 Redis:快速搭建高效缓存服务
Docker 部署 Redis:快速搭建高效缓存服务 引言 Redis 是一个高性能的键值数据库,广泛应用于缓存、消息队列、实时分析等领域。而 Docker 作为容器化技术的代表,能够帮助我们快速部署和管理应用程序。结合两者,我们可以轻松实现 …...
【缓存与数据库结合最终方案】伪从技术
实现伪从技术:基于Binlog的Following表变更监听与缓存更新 技术方案概述 要实现一个专门消费者服务作为Following表的伪从,订阅binlog并在数据变更时更新缓存,可以采用以下技术方案: 主要组件 MySQL Binlog监听:使…...
如何规避矩阵运营中的限流风险及解决方案
在自媒体矩阵化运营中,系统性规避平台限流机制需建立在精准理解算法逻辑的基础上。根据行业实践数据统计,当前矩阵账号触发限流的核心诱因主要集中在两大维度: 首先需要明确的是设备与网络层面的合规性配置。当单台移动设备频繁切换多账号登…...
TensorFlow Keras“安全模式”真的安全吗?绕过 safe_mode 缓解措施,实现任意代码执行
机器学习框架通常依赖序列化和反序列化机制来存储和加载模型,然而模型中不恰当的代码隔离和可执行组件可能会导致严重的安全风险。 TensorFlow 中的 Keras v3 ML 模型结构 对于基于 TensorFlow 的 Keras 模型,存在一个严重的反序列化漏洞,编号为CVE-2024-3660。攻击者可利…...
PostgreSQL-日志管理介绍
概述 1、日志管理器: 日志模块包括事务提交日志CLOG和数据日志XLOG。其中CLOG是系统为整个事务管理流程所建立的日志,主要用于记录事务的状态,同时通过SUBTRANS日志记录事务的嵌套关系。XLOG日志是数据库日志的主体,记录数据库中…...
【Java 数据结构】泛型
目录 一. 什么是泛型 二. 引出泛型 三. 泛型语法 四. 泛型的使用 五. 泛型是如何编译的 5.1 擦除机制 六. 泛型的继承 6.1 泛型类继承非泛型类 6.2 泛型类继承泛型类 6.2.1 父类的同名传递 6.2 2 父类的异名传递 6.2.3 父类固定类型传递 6.2.4 子类添加参数 七. 泛…...
鲲鹏麒麟搭建Docker仓库
Docker Registry简介 Docker Registry是一个开源的镜像仓库工具,用于存储和分发Docker镜像。它是Docker生态系统中的核心组件之一,提供了镜像的推送(push)、拉取(pull)和管理功能。 主要特性: 1、开源免费:Apache 2.0许可证 2、轻…...
Java快速上手之实验4(接口回调)
1.编写接口程序RunTest.java,通过接口回调实现多态性。解释【代码4】和【代码6】的执行结果为何不同? interface Runable{ void run(); } class Cat implements Runable{ public void run(){ System.out.println("猫急上树.."…...
【前端】【业务场景】【面试】在前端开发中,如何实现实时数据更新,比如实时显示服务器推送的消息,并且保证在不同网络环境下的稳定性和性能?
问题:在前端开发中,如何实现实时数据更新,比如实时显示服务器推送的消息,并且保证在不同网络环境下的稳定性和性能? 一、实现实时数据更新的方法 WebSocket: 原理:WebSocket 是一种在单个 TCP …...
redis相关问题整理
Redis 支持多种数据类型: 字符串 示例:存储用户信息 // 假设我们使用 redis-plus-plus 客户端库 auto redis Redis("tcp://127.0.0.1:6379"); redis.set("user:1000", "{name: John Doe, email: john.doeexample.com}"…...
某城乡老旧房屋试点自动化监测服务项目
1. 项目简介 我国是房屋建设增长量最高的国家或地区,但上个世纪末建造的房屋多为砖混结构,使用寿命短且缺乏维护。这些房屋在使用过程中受到地质活动、自然环境和人为改造的影响,其结构强度逐年下降,部分房屋甚至出现墙体裂缝、倾…...
企业为何要求禁用缺省口令?安全风险及应对措施分析
在当今数字化时代,企业网络安全面临着前所未有的挑战。缺省口令的使用是网络安全中的一个重要隐患,许多企业在制定网络安全红线时,明确要求禁用缺省口令。本文将探讨这一要求的原因及其对企业安全的重要性。 引言:一个真实的入侵场…...
在 MySQL 中,索引前缀长度为什么选择为 191
在 MySQL 中,索引前缀长度选择为 191 的常见原因主要与 字符集编码 和 索引长度限制 相关,具体解释如下: 1. 字符集编码的影响 utf8mb4 字符集: MySQL 的 utf8mb4 字符集每个字符最多占用 4 个字节(相比 utf8 的 3 字…...
【Python语言基础】24、并发编程
文章目录 1. 多线程(threading模块)1.1 多线程的实现(threading 模块)1.2 多线程的优缺点1.3 线程同步与锁 2. 多进程(multiprocessing模块)2.1 多进程实现(multiprocessing模块)2.2 多进程的优缺点2.3 进程…...
MySQL-自定义函数
自定义函数 函数的作用 mysql数据库中已经提供了内置的函数,比如:sum,avg,concat等等,方便我们日常的使用,当需要时mysql支持定义自定义的函数,方便与我们对于需用复用的功能进行封装。 基本…...
实时操作系统在服务型机器人中的关键作用
一、服务型机器人的发展现状与需求 近年来,服务型机器人市场呈现出蓬勃发展的态势。据国际机器人联合会(IFR)2024 年度报告显示,全球人形机器人市场规模预计在 2025 年达到 38.7 亿美元,年复合增长率达 19.2%。服务型机…...
智能电网第5期 | 老旧电力设备智能化改造:协议转换与边缘计算
随着电力行业数字化转型加速,大量在役老旧设备面临智能化升级需求。在配电自动化改造过程中,企业面临三大核心挑战: 协议兼容难题:传统设备采用Modbus等老旧协议,无法接入智能电网系统 数据处理瓶颈:设备本…...