【项目】构建高性能多线程内存池:简化版 tcmalloc 实现指南
00 引言
在高并发应用中,频繁的小块内存申请与释放不仅会带来性能瓶颈,还容易导致内存碎片问题。为此,内存池技术应运而生,而 tcmalloc
(Thread-Caching Malloc)作为 Google 开源的高性能内存分配器,是学习与借鉴的优秀模板。本文将以简化版 tcmalloc
为目标,从零手把手带你构建一个支持多线程的高性能内存池。
本项目将涉及到C/C++、数据结构(如链表、哈希桶)、操作系统内存管理、单例模式、多线程、互斥锁等技术。如果你对这些概念还不熟悉,建议先学习这些相关知识,确保理解这些基础知识后再深入本篇内容。
项目源代码地址:https://github.com/Kutbas/ConcurrentMemory
01 内存池技术概览
池化技术是一种资源管理策略,它的基本思想是程序预先申请超出当前需求的资源,然后自主管理这些资源,以备未来使用。
那么,什么是“超出需求的资源”呢?简单来说,就是那些当前不立即使用,但可以储备以备将来使用的资源。
让我们通过一个日常生活的例子来理解这个概念。
想象你住在宿舍楼,楼道里有饮水机。假如你只有一个杯子,每次想喝水都得跑到饮水机那里接水,这样
既麻烦又浪费时间。如果为了提高效率,你决定买一个水壶,每次接水时带上水壶,这样可以一次性装满
水,回到宿舍后直接从水壶里倒水喝。当水喝完后,再去饮水机接水。显然,这种方式比每次都去接水要
高效得多。这其实就是一种池化技术。
在这个例子中,提前准备好水壶,就像是向系统申请了过量的资源,接水时直接从已申请的水壶中取水,而不是每次都去申请新的资源。
在技术领域中,内存池的工作原理也类似。程序首先向系统申请一块较大的内存空间,而不是每次需要内存时都向操作系统请求。之后,程序会在这块已申请的内存中进行分配和回收,而不再频繁地请求系统资源。
除了内存池,常见的池化技术还包括:
- 连接池:例如数据库连接池。每次与数据库建立连接都是一个耗时的操作,如果每次仅执行一个简单的SQL语句后就关闭连接,会浪费大量时间。因此,连接池通常会预先创建多个连接,执行SQL时从池中获取一个空闲连接,执行完后不关闭连接,继续等待下一次使用。这样可以避免频繁的连接和断开操作。
- 线程池:线程池的思想是预先创建一定数量的线程,并使它们处于休眠状态。当有客户端请求时,唤醒一个空闲线程来处理该请求。处理完毕后,线程进入休眠状态,等待下一个请求。
池化技术的核心思想在于提前申请并重复使用资源,从而减少重复创建资源的开销,提升效率。不同类型的池在实现细节上可能有所不同,但其根本思想是相似的。
02 项目整体架构
我们借鉴 tcmalloc
的三层架构,设计如下模块:
● Thread Cache
每个线程拥有独立的线程缓存,避免加锁。只要申请的内存不超过 256KB,线程会优先从自身的 Thread Cache
获取空间,其内部使用哈希桶+自由链表存储不同大小的内存块。
● Central Cache
线程缓存不够时,从 Central Cache
请求空间。
● Page Cache
当 Central Cache
空间不足或释放空间过多时,向 Page Cache
请求或归还内存。Page Cache
以页为单位组织内存,负责整合碎片,进行 Span
的合并与拆分。
03 定长内存池:快速入门
在学习高并发内存池之前,我们先来热个身,手动实现一个定长内存池,帮助大家建立对“内存池”的基本认识。
我们知道 malloc
虽然万能,但为了应对各种情况,设计得相对复杂,在高频率、小对象申请释放的场景下,效率可能不理想。那么,如果某个场景中我们只需要固定大小的内存分配,还有必要使用复杂的通用方案吗?其实完全可以设计一个更简洁高效的内存池,专门处理这种定长需求,这也正是我们现在要做的。
这个定长内存池的实现非常轻量,不到 100 行代码就能搞定,具体实现可以总结为以下几点:
- 预先申请一大块内存,把它切成一小块一小块的用;
- 用自由链表把这些小块管理起来,实现空间的重复利用;
- 如果用完了,再申请一块新的接着用;
- 每次回收时,就把小块重新挂到自由链表上。
话不多说,代码奉上:
template<class T>
class ObjectPool
{public:T* New(){T* obj = nullptr;// 优先使用还回来的内存if (_freeList){void* next = *((void**)_freeList);obj = (T*)_freeList;_freeList = next;}else{// 如果剩余内存不够一个对象大小时,则重新开大块空间if (_remainBytes < sizeof(T)){_remainBytes = 128 * 1024;//_memory = (char*)malloc(_remainBytes);_memory = (char*)SystemAlloc(_remainBytes >> 13);if (_memory == nullptr)throw std::bad_alloc();}obj = (T*)_memory;size_t objSize = sizeof(T) < sizeof(void*) ? sizeof(void*) : sizeof(T);_memory += objSize;_remainBytes -= objSize;}// 定位new,显示调用T的构造函数初始化new(obj)T;return obj;}void Delete(T* obj){// 显示调用析构函数清理对象obj->~T();// 头插,为空与否的插入方式一致*(void**)obj = _freeList;_freeList = obj;}private:char* _memory = nullptr; // 指向大块内存的指针size_t _remainBytes = 0; // 大块内存在切分过程中剩余的字节数void* _freeList = nullptr; // 还回来过程中链接的自由链表};
这样,一个简单而高效的定长内存池就实现了。下面我们写段代码来测试一下:
struct TreeNode
{int _val;TreeNode* _left;TreeNode* _right;TreeNode():_val(0), _left(nullptr), _right(nullptr){}
};void TestObjectPool()
{const size_t Rounds = 3;// 申请释放的轮次const size_t N = 1000000;// 每轮申请释放多少次size_t begin1 = clock();std::vector<TreeNode*> v1;v1.reserve(N);for (size_t j = 0; j < Rounds; ++j){for (int i = 0; i < N; ++i){v1.push_back(new TreeNode);}for (int i = 0; i < N; ++i){delete v1[i];}v1.clear();}size_t end1 = clock();ObjectPool<TreeNode> TNPool;size_t begin2 = clock();std::vector<TreeNode*> v2;v2.reserve(N);for (size_t j = 0; j < Rounds; ++j){for (int i = 0; i < N; ++i){v2.push_back(TNPool.New());}for (int i = 0; i < 100000; ++i){TNPool.Delete(v2[i]);}v2.clear();}size_t end2 = clock();cout << "new cost time:" << end1 - begin1 << endl;cout << "object pool cost time:" << end2 - begin2 << endl;
}
可以看到,在多轮申请和释放下,使用定长内存池的速度明显优于 new/delete
机制。
到这里,一个简单的定长内存池就算搞定了,下面我们正式进入高并发内存池的实现。
04 Thread Cache 初步实现
我们先简单回顾下之前说的定长内存池:
在定长内存池中,有一个 _freelist
,也就是一个自由链表,用来管理某种固定大小的小块内存。比如我们要频繁分配一个对象,那这个 _freelist
就可以专门为这个对象提供固定长度的内存块。
但线程申请内存时,需求就没这么“整齐”了——可能要3B、6B、18K、236K……这时候,当然不能只用一个 _freelist
,需要为不同大小的内存块设置对应的 _freelist
来做管理。
不过也不能太极端。比如说最大申请256KB,如果每个字节都建一个 _freelist
,那就是二十多万个,没必要也没意义。
所以我们要在空间粒度和管理开销之间做个平衡:比如,第一个 _freelist
管8B的块,第二个管16B的,再往下是24B、32B……直到256KB。虽然听上去像是每次增加8B,其实真实实现里会有更复杂的对齐策略,这里先不展开。
那内存怎么分配呢?比如线程申请5B的内存,我们就给它一个8B的块;要10B,就给16B。虽然可能“多给了一点”,但这比“给不够”要好得多。可以把它想象成你找爸妈要300块买衣服,结果爸妈豪气地给了你500,开心!但如果只给你50,让你去地摊凑合,那体验可就不太好了。
这种“多给”就带来了内碎片。就是说实际用的空间没那么大,但因为对齐分配,浪费了一点。而之前讲的外碎片,是指空间虽然多但不连续,申请不到大块。
线程要申请内存时,其实是向 ThreadCache
请求。ThreadCache
内部本质上是一个哈希桶结构,每个桶就是一个 _freelist
,比如第0个桶管8B、第1个管16B……到最大256KB。找内存时,就看对应大小的桶里有没有空闲块,有就用;没有就从更高一层的 CentralCache
拿。
由于要完整实现 ThreadCache
需要涉及到和 CentralCache
的交互,不好各自独立实现,所以我们先对 ThreadCache
进行初步实现。
04.1 基础结构设计
class ThreadCache
{public:// 申请和释放内存对象void *Allocate(size_t size);void Deallocate(void *ptr, size_t size);// 从中心缓存获取对象void *FetchFromCentralCache(size_t index, size_t size);// 自由链表过长时,回收内存至 central cachevoid ListTooLong(FreeList &list, size_t size);private:FreeList _freeLists[NFREELIST];
};// TLS thread local storage
static _declspec(thread) ThreadCache *pTLSThreadCache = nullptr;
在 ThreadCache
类中,FreeList
类就是我们管理空闲内存块的核心结构,正如前面所说的,它虽然叫FreeList
,但实际上是一个哈希桶,桶内不需要额外的链表节点,内存块自己就是节点。它的结构如下:
class FreeList
{public:void Push(void *obj);void PushRange(void *start, void *end, size_t n);void PopRange(void *&start, void *&end, size_t n);void *Pop();bool Empty();size_t &MaxSize();size_t Size();private:void *_freeList = nullptr;size_t _maxSize = 1;size_t _size = 0;
};
目前我们只需要知道的是,FreeList
类提供两个操作:Push
把块放回去,Pop
从链表取出一块。
void Push(void *obj)
{// 头插//*(void**)obj = _freeList;NextObj(obj) = _freeList;_freeList = obj;++_size;
}void PushRange(void *start, void *end, size_t n)
{NextObj(end) = _freeList;_freeList = start;_size += n;
}
为了让代码更清晰,我们封装了 ObjNext()
方法来获取或设置某个内存块的“下一个指针”位置,用 static
修饰,避免多次包含时报错。
static void *&NextObj(void *obj)
{return *(void **)obj;
}
04.2 哈希桶设计与对齐规则
我们知道哈希表每个桶对应一个 _freelist
,关键是:桶要设多少个?我们不能每8B就搞一个桶,还是太多。
于是我们参考 tcmalloc
的做法,按申请大小分多个区间:
- [1, 128] 对齐到8B
- [129, 1024] 对齐到16B
- [1025, 8K] 对齐到128B
- [8K+1, 64K] 对齐到1024B
- [64K+1, 256K] 对齐到8KB
这样设计下来,最多只需要 208个桶,空间浪费也被控制在10%左右。浪费最多的点通常是每个区间的起始,比如 1B、129B、1025B,这些对齐后会有一定冗余,但相对于分配效率,这点浪费可以接受。
基于这样的设计,ThreadCache
的逻辑结构就如下图所示:
04.3 SizeClass:对齐与桶映射工具类
为了计算每次申请要对齐到多少字节、落在哪个桶里,我们专门写一个 SizeClass
类,内部的方法如下所示:
class SizeClass
{public:static inline size_t _RoundUp(size_t bytes, size_t align){/*size_t alignSize;if (bytes % align != 0)alignSize = (bytes / align + 1) * align;elsealignSize = bytes;return alignSize;*/return ((bytes + align - 1) & ~(align - 1));}static inline size_t RoundUp(size_t size){if (size <= 128)return _RoundUp(size, 8);else if (size <= 1024)return _RoundUp(size, 16);else if (size <= 8 * 1024)return _RoundUp(size, 128);else if (size <= 64 * 1024)return _RoundUp(size, 1024);else if (size <= 256 * 1024)return _RoundUp(size, 8 * 1024);elsereturn _RoundUp(size, 1 << PAGE_SHIFT);}static inline size_t _Index(size_t bytes, size_t align_shift){/*if (bytes % align == 0)return bytes / align - 1;elsereturn bytes / align;*/return ((bytes + (1 << align_shift) - 1) >> align_shift) - 1;}// 计算映射的哪⼀个⾃由链表桶static inline size_t Index(size_t bytes){assert(bytes <= MAX_BYTES);// 每个区间有多少个链static int group_array[4] = {16, 56, 56, 56};if (bytes <= 128)return _Index(bytes, 3);else if (bytes <= 1024)return _Index(bytes - 128, 4) + group_array[0];else if (bytes <= 8 * 1024)return _Index(bytes - 1024, 7) + group_array[1] + group_array[0];else if (bytes <= 64 * 1024)return _Index(bytes - 8 * 1024, 10) + group_array[2] +group_array[1] + group_array[0];else if (bytes <= 256 * 1024)return _Index(bytes - 64 * 1024, 13) + group_array[3] +group_array[2] + group_array[1] + group_array[0];else{assert(false);return -1;}}
};
其中有两个供外部调用的静态方法:
RoundUp(size)
:给定原始大小,算出应该对齐到多少字节。Index(size)
:算出该大小对应哈希表中的哪个下标。
这里的 Index
方法会根据 size 所属的区间,结合对齐规则,精准计算它在桶数组中的位置。内部还会用到一个辅助方法 _Index()
,这个方法可以用位运算优化,效率更高。
04.4 ThreadCache::Allocate 的实现
ThreadCache::Allocate
是管理每个 ThreadCache
中内存分配的核心函数。它的主要任务是根据线程请求的内存大小从相应的哈希桶中获取空闲内存,或者在没有足够的空间时向CentralCache
请求新的内存块。在经过前面的铺垫之后下面来拆解它的实现思路。
- 计算对齐后的字节数和哈希桶索引。当线程请求一定大小的内存时,首先需要根据请求的字节数来确定实际分配的内存大小。这一步我们知道是通过
RoundUp(size)
和Index(size)
两个函数来实现。 - 查找哈希桶并分配内存。一旦计算出需要对齐后的字节数和桶索引,
ThreadCache::Allocate
就会尝试从哈希桶中对应的自由链表中获取内存块。这会涉及到两个过程:- 如果哈希桶中有空闲内存块(即该桶对应的自由链表非空),则调用
Pop()
方法从该链表中取出一个内存块返回给线程。 - 如果哈希桶中没有足够的内存块(即链表为空),则需要向
CentralCache
请求内存。这时,ThreadCache
会通过FetchFromCentralCache()
方法从中央缓存申请内存。
- 如果哈希桶中有空闲内存块(即该桶对应的自由链表非空),则调用
- 向中央缓存请求内存。如果线程请求的内存无法从当前的
ThreadCache
获取到,ThreadCache::Allocate
就会将请求转发到CentralCache
。
基于以上实现思路,下面给出ThreadCache::Allocate
的代码:
void* ThreadCache::Allocate(size_t size)
{assert(size <= MAX_BYTES);size_t alignSize = SizeClass::RoundUp(size);size_t index = SizeClass::Index(size);if (!_freeLists[index].Empty())return _freeLists[index].Pop();elsereturn FetchFromCentralCache(index, alignSize);
}
以上,我们便初步实现了 ThreadCache
。下面我们进入到 CentralCache
的实现。
05 Central Cache 初步实现
现在我们知道,当 ThreadCache
的空间不足时,会向 CentralCache
请求更多的空间。在ThreadCache
的实现中,曾留有一个接口 FetchFromCentralCache
,这个接口的具体实现会在我们讨论完CentralCache
的细节后完成。
05.1 Thread Cache 与 Central Cache 的关系
我们先来看下 CentralCache
的结构:
// 单例模式
class CentralCache
{public:static CentralCache* GetInstance(){return &_sInst;}// 获取⼀个非空的 spanSpan* GetOneSpan(SpanList& list, size_t size);// 从central cache获取一定数量的内存对象给thread cachesize_t FetchRangeObj(void*& start, void*& end, size_t batchNum, size_t size);// 将⼀定数量的对象释放到 Spanvoid ReleaseListToSpans(void* start, size_t size);private:SpanList _spanLists[NFREELIST];private:CentralCache(){}CentralCache(const CentralCache&) = delete;static CentralCache _sInst;
};
实际上,ThreadCache
和 CentralCache
有很多相似之处,比较明显的是 CentralCache
也采用了哈希结构来存储数据,并且同样是根据块的大小来进行映射。两者的映射规则一致带来了许多便利,比如当 ThreadCache
对应的哈希桶没有足够的空间时,可以直接向 CentralCache
的相同哈希桶请求空间。这样一来,两个系统中的哈希桶实现一一对应,查找操作也变得更加高效。
ThreadCache
和 CentralCache
最大的区别在于线程隔离和锁的使用。
由于每个线程都有自己的 ThreadCache
,所以线程向自己的 ThreadCache
申请空间时,是不需要加锁的,效率很高。但如果某个桶(比如 24B)已经空了,ThreadCache
就会从全局的 CentralCache
中申请内存。这个时候问题来了:CentralCache
是全局唯一的,并且所有线程共享一个桶(SpanList)。
举个例子,如果线程 t1
和 t2
同时从各自 ThreadCache
的 24B 桶申请内存,而这个桶恰好都空了,它们就会同时向 CentralCache
的 2 号桶发起请求。此时就会发生竞争,因此必须加锁来确保线程安全。
值得注意的是:不是每次访问 CentralCache 都会锁冲突,只有当多个线程竞争同一个桶时才会锁冲突。如果 t1
向 24B 的桶申请,t2
向 128B 的桶申请,它们访问的是 CentralCache
中的不同桶,使用的是不同的锁,不会产生冲突。
05.2 Span 结构与内存管理
除了锁机制的不同,ThreadCache
和 CentralCache
在管理内存方面也有所不同。在 ThreadCache
中,自由链表挂的是一个个小块空间,而在 CentralCache
中,自由链表挂的是 Span
结构体,这是一种管理大块内存的结构体,它以页为单位进行管理,每个 Span
可以包含多个页。其逻辑结构如下图所示:
Span
结构体的实现如下所示:
// 管理多个连续页的大块内存跨度结构
struct Span
{PAGE_ID _pageId = 0; // 大块内存的起始页号size_t _n = 0; // 页的数量// 双向链表Span* _next = nullptr;Span* _prev = nullptr;size_t _objSize = 0; // 切好的小对象的大小size_t _useCount = 0; // 切好的小块内存被分配给thread cache的数量void* _freeList = nullptr; // 切好的小块内存的自由链表bool _isUse = false; // 是否被使用
};
在 Span
结构体中,有一个 _n
成员变量,用来记录该 Span
管理了多少页(Page)内存。而具体需要多少页,是由 SizeClass::NumMovePage(size)
决定的,它会根据你当前要分配的小块内存 size
动态计算出合理的页数。其实现方式如下所示:
class SizeClass
{public:// 一次从中心缓存获取多少个static size_t NumMoveSize(size_t size){assert(size > 0);if (size == 0)return 0;// [2, 512],⼀次批量移动多少个对象的(慢启动)上限值// 小对象一次批量上限⾼// 小对象一次批量上限低int num = MAX_BYTES / size;if (num < 2)num = 2;if (num > 512)num = 512;return num;}// 计算一次向系统获取几个页// 单个对象 8B// ...// 单个对象 256KBstatic size_t NumMovePage(size_t size){size_t num = NumMoveSize(size);size_t npage = num * size;npage >>= PAGE_SHIFT;if (npage == 0)npage = 1;return npage;}};
整个计算过程是这样的:
- 先算出要搬多少个对象:
使用NumMoveSize(size)
,根据对象的大小,系统估计出一次批量搬运多少个比较合适。这个值是MAX_BYTES / size
,也就是说小对象就多搬点,大对象就少搬点,并且限定在 [2, 512] 之间。 - 算出总共需要多少字节:
比如要搬num
个对象,每个对象size
字节,总共num * size
字节。 - 换算成页数:
把总字节数除以页大小(PAGE_SHIFT
是页大小的对数,通常是 8KB),最终得到npage
。如果结果是 0(比如很小的对象),也会被强制设置成至少 1 页,保证不会返回空Span
。
下面我们举两个例子:
- 对象大小为 8 字节:
NumMoveSize(8) = 256
,因为1024 / 8 = 128
,在 [2, 512] 范围内;- 总字节数:256 × 8 = 2048 字节;
- 页数:2048 / 8192 = 0.25 → 向下取整为 0,但最终返回 1 页。
- 对象大小为 128KB:
NumMoveSize(131072) = 2
,因为1024 / 131072 < 2
,被强制取 2;- 总字节数:2 × 131072 = 256KB;
- 页数:256KB / 8KB = 32 → 返回 32 页。
由此可以看出:Span
并不是固定管理某个桶号对应的页数,而是动态地根据实际要搬运的小块大小去合理分配页数。这个机制既保证了空间利用率,也避免了内存碎片问题。
05.3 使用 SpanList 管理 Span
在了解了 Span
是如何按页单位管理大块内存之后,我们还需要搞清楚一个问题:这些 Span
是如何在 CentralCache
中被组织起来的?答案是通过 SpanList
。我们可以把它理解成一个管理不同大小 Span
的仓库,负责挂载所有当前空闲的、已经被切好块的 Span
。
SpanList
的实现本质上是一个双向循环链表,每一个桶号都有对应的一条链表来维护这些 Span
,例如 24B 对象的第 2 号桶,就会拥有一条专门维护 24B 对象的 SpanList
,里面挂着一个个页数不同、但都被切成 24B 小块的 Span
。这使得 CentralCache
的内部结构非常清晰:每种对象大小一个桶,每个桶一个链表,链表里挂的是装满了该类型对象的 Span
。
此外,为了解决前面我们提到的线程安全问题,SpanList
在内部维护了一把互斥锁 std::mutex _mtx
。我们已经知道,它是桶级别的锁,每个桶只保护自己的 SpanList
,而不影响其他桶。
了解了 SpanList
的这些特性,下面我们来看下它的实现:
class SpanList
{
public:SpanList(){_head = new Span;_head->_next = _head;_head->_prev = _head;}Span *Begin();Span *End();bool Empty();void PushFront(Span *span);Span *PopFront();void Insert(Span *pos, Span *newSpan);void Erase(Span *pos);public:std::mutex _mtx; // 桶锁
private:Span *_head;
};
05.4 FetchFromCentralCache 的实现
接下来我们来梳理一下 ThreadCache
和 CentralCache
之间的交互逻辑,也就是 FetchFromCentralCache
函数的实现。
首先我们要明确这个函数的作用是:当线程从 ThreadCache
申请内存时,如果对应的自由链表里已经没有空闲块了,就需要向 CentralCache
请求更多的内存。CentralCache
会从对应哈希桶下挂的 Span
中取出若干块内存,交还给 ThreadCache
使用。
那这里有一个关键问题:每次要从 CentralCache
拿多少块比较合适?拿太少效率低,拿太多又容易浪费。
为了解决这个问题,我们引入了一种“慢启动 + 增量反馈”的策略,这和网络里的拥塞控制有点类似。刚开始的时候,CentralCache
只给 ThreadCache
一小块内存,如果发现 ThreadCache
对这个大小的内存频繁请求,就逐步增加它每次拿的数量。比如第一次只给 1 块,第二次给 2 块,第三次给 3 块…… 以此类推。这个过程有上限,不能无限增加,以防内存浪费。
这个动态控制的核心在于:ThreadCache
里的每个 FreeList
都记录了一个 MaxSize
值,表示当前最多能申请多少块。 CentralCache
每次分配时,会取 MaxSize
和 SizeClass::NumMoveSize(alignSize)
中较小的值来作为本次的分配数量。如果这次的分配量正好等于 MaxSize
,那说明当前请求频率比较高,就将 MaxSize
加 1,为下一次留出更大的配额。这样就实现了逐步放宽的反馈机制。之后,CentralCache
会调用另一个函数 FetchRangeObj
,从对应的 Span
中取出一整段连续的内存块。这个函数返回实际取出的块数(可能比请求的少),并通过 start
和 end
两个输出指针返回这段内存的起止位置。
拿到这段空间后,如果只获得了 1 块,就直接返回给线程;如果获得了多块,就把前一块返回给线程,把剩下的 [ObjNext(start), end]
插入到当前线程的自由链表中,供后续分配使用。
整个流程清晰又高效,既能动态适应线程的内存请求,又尽量避免资源浪费,是一个很优雅的设计。
下面是 FetchFromCentralCache
函数的具体实现:
void *ThreadCache::FetchFromCentralCache(size_t index, size_t size)
{size_t batchNum = min(_freeLists[index].MaxSize(), SizeClass::NumMoveSize(size));if (_freeLists[index].MaxSize() == batchNum){_freeLists[index].MaxSize() += 1;}void *start = nullptr;void *end = nullptr;size_t actualNum = CentralCache::GetInstance()->FetchRangeObj(start, end, batchNum, size);assert(actualNum > 0);if (actualNum == 1){assert(start == end);return start;}else{_freeLists[index].PushRange(NextObj(start), end, actualNum - 1);return start;}
}
到目前为止,我们实际上只实现了空间的“申请”逻辑,释放还没涉及,比如 ThreadCache
如何将不用的块还给 CentralCache
,以及 CentralCache
如何将空闲 Span
归还给 PageCache
。为了让流程更容易理解,我们打算先把分配的部分理顺,后续再补上回收机制。
06 Page Cache 实现
PageCache
在整体架构上和 CentralCache
类似,核心都是通过哈希桶来管理 Span
。区别在于,在 CentralCache
中,哈希映射规则和 ThreadCache
一样,都是通过 size
来定位桶,而 PageCache
则是按照 Span
的页数来组织的:第 i
号桶中挂的全是管理 i
页的 Span
。
此外,由于 PageCache
的 Span
是按页数分桶的,所以其哈希结构更直接,不涉及对内部空间的再划分。也就是说,从 PageCache
拿到的 Span
是一整块的连续内存,具体的划分操作交由 CentralCache
去处理。
PageCache
中 Span
的最大管理页数是 128 页。如果一页是 8KB,那么 128 页就是 1MB 的空间,已经能满足最大 256KB 块的需求。当然也可以扩展更大的 span 管理能力,但当前实现以 128 页为界限。
简单介绍完 PageCache
和 CentralCache
的差异后,下面我们来看看 PageCache
的结构。
06.1 Page Cache 的基本结构
和 CentralCache
一样,PageCache
的核心是一个数组,数组的每个元素都是一个 SpanList
,总数为 129 个(多预留一个,方便从 1 号桶开始直接映射)。这个数组中的第 n 项就负责 n 页大小的 Span
。
另一点和 CentralCache
一样的是,PageCache
也需要加锁。但 CentralCache
是按桶加锁,而 PageCache
是整体加锁。这是因为 Span
的分裂和合并可能会影响多个桶之间的状态,只有整体加锁才能保证一致性。由于多个线程可能同时通过 CentralCache
向 PageCache
申请 Span
,所以这个全局锁是必须的。
PageCache
作为全局唯一组件,也被实现成了饿汉式单例,静态实例定义放在 .cpp
文件中,避免链接冲突。
class PageCache
{public:static PageCache* GetInstance(){return &_sInst;}// 获取从对象到 Span 的映射Span* MapObjectToSpan(void* obj);// 获取一个 k 页的 SpanSpan* NewSpan(size_t k);// 将空闲 Span归还至 page cache,并合并相邻 Spanvoid ReleaseSpanToPageCache(Span* span);public:std::mutex _pageMtx;private:SpanList _spanLists[NPAGES]; // NPAGES = 129,多开的那一个桶(第 0 号)是为了映射逻辑更自然,省去了 -1 的偏移操作。ObjectPool<Span> _spanPool;TCMalloc_PageMap1<32 - PAGE_SHIFT> _idSpanMap;static PageCache _sInst;PageCache(){}PageCache(const PageCache&) = delete;
};
06.2 Span 的分裂与合并机制
无论是在 ThreadCache
、CentralCache
还是 PageCache
中,内存的申请与释放流程始终是核心。PageCache
中的分裂与合并机制,正是为了灵活调配这些 Span
。
分裂逻辑:
当 CentralCache
通过 NewSpan()
向 PageCache
请求一个管理 k 页的 Span
时,PageCache
会先看对应桶(第 k 号)是否有 Span
。如果没有,就从更大页数的桶中找,看是否能找到一个可以切分的 Span
。比如想要一个 4 页的 Span
,如果只有 10 页的 Span
,那么就切成 4 页和 6 页的两个 Span
,将后者重新挂回合适的桶中。相关代码如下所示:
// 大于128页的直接向堆申请
if (k > NPAGES - 1)
{void* ptr = SystemAlloc(k);//Span* span = new Span;Span* span = _spanPool.New();span->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;span->_n = k;//_idSpanMap[span->_pageId] = span;_idSpanMap.set(span->_pageId, span);return span;
}for (size_t i = k + 1; i < NPAGES; i++)
{if (!_spanLists[i].Empty()){// 切分成一个K页的 Span 和一个 n-k 页的 Span// K页的返回给 central cache,n-k 页的挂到第 n-k 个桶中去Span* nSpan = _spanLists[i].PopFront();//Span* kSpan = new Span;Span* kSpan = _spanPool.New();// 在 nSpan 的头部切K页kSpan->_pageId = nSpan->_pageId;kSpan->_n = k;nSpan->_pageId += k;nSpan->_n -= k;// 挂到第 n-k 个桶中去_spanLists[nSpan->_n].PushFront(nSpan);// 存储 nSpan 的首尾页号和 nSpan 映射,便于 page cache 回收内存时进行合并查找//_idSpanMap[nSpan->_pageId] = nSpan;//_idSpanMap[nSpan->_pageId + nSpan->_n - 1] = nSpan;_idSpanMap.set(nSpan->_pageId, nSpan);_idSpanMap.set(nSpan->_pageId + nSpan->_n - 1, nSpan);// 建立 id 和 Span 的映射,方便 central cache 回收小块内存时查找对应的 Spanfor (PAGE_ID i = 0; i < kSpan->_n; ++i)_idSpanMap.set(kSpan->_pageId + i, kSpan);//_idSpanMap[kSpan->_pageId + i] = kSpan;return kSpan;}
如果从第 k 号桶一直查到 128 号桶都没有合适的 span,就只能通过系统调用(如 mmap
或 VirtualAlloc
)申请一个新的 128 页 Span
,然后再按需切分。相关代码如下所示:
// 走到这里说明后面的桶都没有 Span
// 找堆要一个128页的 Span
//Span* bigSpan = new Span;
Span* bigSpan = _spanPool.New();
void* ptr = SystemAlloc(NPAGES - 1);
bigSpan->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;
bigSpan->_n = NPAGES - 1;_spanLists[bigSpan->_n].PushFront(bigSpan);return NewSpan(k);
合并逻辑:
当 CentralCache
通过 ReleaseSpanToPageCache()
释放回一个 Span
后,PageCache
会尝试和相邻的空闲 Span
合并,减少碎片。比如一个页号为 100,管理 10 页的 Span
,其范围是 [100, 110)。如果发现第 99 页和第 110 页对应的 Span
都是空闲的,就把它们合并为一个从 99 开始、总共 12 页的 Span
。这个过程会向左右两边尽可能扩展,直到遇到不连续的页为止。相关代码如下所示:
// 对 Span 前后的页尝试进行合并,缓解内存碎片问题
while (1)
{PAGE_ID prevId = span->_pageId - 1;//auto ret = _idSpanMap.find(prevId);//前面的页没有,不进行合并//if (ret == _idSpanMap.end())// break;auto ret = (Span*)_idSpanMap.get(prevId);if (ret == nullptr)break;// 前面的页正在被使用,不进行合并Span* prevSpan = ret;if (prevSpan->_isUse == true)break;// 合并出超过128页的 Span,无法管理,不进行合并if (prevSpan->_n + span->_n > NPAGES - 1)break;span->_pageId = prevSpan->_pageId;span->_n += prevSpan->_n;_spanLists[prevSpan->_n].Erase(prevSpan);//delete prevSpan;_spanPool.Delete(prevSpan);
}// 向后合并
while (1)
{PAGE_ID nextId = span->_pageId + span->_n;//auto ret = _idSpanMap.find(nextId);后面的页没有,不进行合并//if (ret == _idSpanMap.end())// break;auto ret = (Span*)_idSpanMap.get(nextId);if (ret == nullptr)break;// 后面的页正在被使用,不进行合并Span* nextSpan = ret;if (nextSpan->_isUse == true)break;// 合并出超过128页的 Span,无法管理,不进行合并if (nextSpan->_n + span->_n > NPAGES - 1)break;span->_n += nextSpan->_n;_spanLists[nextSpan->_n].Erase(nextSpan);//delete nextSpan;_spanPool.Delete(nextSpan);
}
06.3 GetOneSpan 的获取逻辑
前面我们知道,当 CentralCache
中没有可用的 Span
时,会调用 GetOneSpan
向 PageCache
请求。这个函数首先检查 CentralCache
对应的桶中是否有可用的 Span
,如果没有就调用 PageCache
的 NewSpan(size_t k)
从 PageCache
取一个 k 页的 Span
。
这里的 k 需要根据实际请求的块大小 size
来计算,为此专门设计了一个“块大小到页数”的映射算法。例如申请 256KB 的块时,需要管理 64 页的 Span
。具体计算过程我们已经在前面讲 SizeClass::NumMovePage(size)
的时候已经介绍过了,所以不再赘述。
CentralCache
获取到 Span
后,会将它划分为若干个大小为 size
的块,然后通过指针串联成链表,挂在 Span
的 _freeList
上。这些块空间都是连续的,划分过程只需从起始地址出发,依次向后按 size
步进即可。
划分完成后,将该 Span
插入 CentralCache
对应的桶中,然后返回给 CentralCache
使用。
相关代码如下:
// 获取⼀个非空的 span
Span* CentralCache::GetOneSpan(SpanList& list, size_t size)
{// 查看当前的 spanlist 是否还有未分配对象的 SpanSpan* it = list.Begin();while (it != list.End()){if (it->_freeList != nullptr){return it;}elseit = it->_next;}// 走到这里说明没有空闲 Span 了,找 page cache 要// 先把 central cache 的桶锁解开,防止其他线程释放对象时阻塞list._mtx.unlock(); PageCache::GetInstance()->_pageMtx.lock();Span* span = PageCache::GetInstance()->NewSpan(SizeClass::NumMovePage(size));span->_isUse = true;span->_objSize = size;PageCache::GetInstance()->_pageMtx.unlock();// 计算 Span 大块内存的起始地址和大小// 以下操作不需要加锁,因为目前其他线程访问不到该Spanchar* start = (char*)(span->_pageId << PAGE_SHIFT);size_t bytes = span->_n << PAGE_SHIFT;char* end = start + bytes;// 把大块内存切成自由链表链接起来span->_freeList = start;start += size;void* tail = span->_freeList;int i = 1;while (start < end){++i;NextObj(tail) = start;tail = NextObj(tail);start += size;}NextObj(tail) = nullptr;// 将切好的 Span 挂到桶里面时需要加锁list._mtx.lock();list.PushFront(span);return span;
}
07 回收空间
在内存分配的过程中,不仅要高效地申请空间,合理地回收也同样关键。我们先来看看 ThreadCache
是如何回收空间的。
07.1 Thread Cache 的归还逻辑
当线程释放内存时,它会先将这些块对齐后挂回到对应大小的自由链表中。但如果某个链表里的块数量过多,就需要将部分块归还给 CentralCache
。具体的判断标准是:某个桶中的块数如果超过当前的最大申请批量(即 MaxSize
),就会把 MaxSize
个块还给 CentralCache
。相关代码如下所示:
void ThreadCache::Deallocate(void* ptr, size_t size)
{assert(size <= MAX_BYTES);assert(ptr);size_t index = SizeClass::Index(size);_freeLists[index].Push(ptr);if (_freeLists[index].Size() >= _freeLists[index].MaxSize()){ListTooLong(_freeLists[index], size);}
}void ThreadCache::ListTooLong(FreeList& list, size_t size)
{void* start = nullptr;void* end = nullptr;list.PopRange(start, end, list.MaxSize());CentralCache::GetInstance()->ReleaseListToSpans(start, size);
}
值得一提的是,这个 MaxSize
是动态变化的,从 1 开始递增,呈等差数列增长。随着线程不断请求同样大小的块,这个数列的总申请量自然会超过某次的 MaxSize
。因此,当这些块被释放回来时,很可能一次就超过当前允许的最大数量,触发归还机制。
实际的 tcmalloc
源码比这里展示的逻辑要复杂得多,除了块数,还会考虑如线程总管理空间不能超过 2MB 等其他因素。我们这里只是用一个简化版规则来理解这个过程。
07.2 Central Cache 的归还逻辑
当 CentralCache
收到 ThreadCache
归还的一段空间时,它并不能简单地认为这些块来自同一个 Span
。因为这些空间可能是多个 Span
的组合,归还时间也不同。
好在每块内存都有地址,我们可以通过 地址 >> 13
(即除以 8KB)来获取页号,然后通过预先建立的哈希表(页号 -> Span
映射)快速定位块所属的 Span
。这比逐个对比所有 Span
要高效得多,时间复杂度从 O(m*n) 降到 O(1)。
这张哈希表在分配 Span
给 CentralCache
时就会更新,并保存在 PageCache
中。这样后续 CentralCache
回收空间时,只需地址右移再查表,即可知道该块属于哪个 span。
CentralCache
在处理这些块时,会逐一将它们插入对应 Span
的 _freeList
,并更新其 _useCount
。一旦 _useCount
归零,说明该 Span
的所有块都已归还,这时可以考虑将它交还给 PageCache
,以便将其合并为更大的 Span
。
在归还前,需要先把该 Span
从 CentralCache
的桶中移除,并将其 _freeList
设为空。因为经过多次归还,这些块的顺序已经被打乱,已无法再作为一个有序链表使用,但这不影响 PageCache
之后的统一管理。
由于 Span
的合并涉及修改共享结构,为避免并发冲突,这一过程要加锁。同样,在归还 Span
之前也需要释放当前桶的锁,以免阻塞其他线程的操作。
相关代码如下:
void CentralCache::ReleaseListToSpans(void* start, size_t size)
{size_t index = SizeClass::Index(size);_spanLists[index]._mtx.lock();while (start){void* next = NextObj(start);Span* span = PageCache::GetInstance()->MapObjectToSpan(start);NextObj(start) = span->_freeList;span->_freeList = start;span->_useCount--;if (span->_useCount == 0){// 说明 Span 切分出去的所有小块内存都已归还// 此时该 Span 可以再向 page cache 归还并尝试相邻页的合并_spanLists[index].Erase(span);span->_freeList = nullptr;span->_next = nullptr;span->_prev = nullptr;// 释放 Span 给 page cache 时,使用 page cache 的锁即可// 暂时把桶锁解掉_spanLists[index]._mtx.unlock();PageCache::GetInstance()->_pageMtx.lock();PageCache::GetInstance()->ReleaseSpanToPageCache(span);PageCache::GetInstance()->_pageMtx.unlock();_spanLists[index]._mtx.lock();}start = next;}_spanLists[index]._mtx.unlock();
}
07.3 Page Cache 的归还逻辑
当 PageCache
接收到 CentralCache
归还的 Span
,它不会立刻挂到固定大小的桶中。因为有可能存在前后页相邻的其他空闲 Span
,这时我们就可以将它们合并,形成更大的内存块,提高空间利用率。合并逻辑和前面的分裂类似,也需要加锁来保证安全。
具体的合并过程我们已经在Span 的分裂与合并机制小节那里讲过了,所以这里就不再赘述,相关代码如下:
void PageCache::ReleaseSpanToPageCache(Span* span)
{// 大于128页的直接还给堆if (span->_n > NPAGES - 1){void* ptr = (void*)(span->_pageId << PAGE_SHIFT);SystemFree(ptr);//delete span;_spanPool.Delete(span);return;}// 对 Span 前后的页尝试进行合并,缓解内存碎片问题while (1){PAGE_ID prevId = span->_pageId - 1;//auto ret = _idSpanMap.find(prevId);//前面的页没有,不进行合并//if (ret == _idSpanMap.end())// break;auto ret = (Span*)_idSpanMap.get(prevId);if (ret == nullptr)break;// 前面的页正在被使用,不进行合并Span* prevSpan = ret;if (prevSpan->_isUse == true)break;// 合并出超过128页的 Span,无法管理,不进行合并if (prevSpan->_n + span->_n > NPAGES - 1)break;span->_pageId = prevSpan->_pageId;span->_n += prevSpan->_n;_spanLists[prevSpan->_n].Erase(prevSpan);//delete prevSpan;_spanPool.Delete(prevSpan);}// 向后合并while (1){PAGE_ID nextId = span->_pageId + span->_n;//auto ret = _idSpanMap.find(nextId);后面的页没有,不进行合并//if (ret == _idSpanMap.end())// break;auto ret = (Span*)_idSpanMap.get(nextId);if (ret == nullptr)break;// 后面的页正在被使用,不进行合并Span* nextSpan = ret;if (nextSpan->_isUse == true)break;// 合并出超过128页的 Span,无法管理,不进行合并if (nextSpan->_n + span->_n > NPAGES - 1)break;span->_n += nextSpan->_n;_spanLists[nextSpan->_n].Erase(nextSpan);//delete nextSpan;_spanPool.Delete(nextSpan);}_spanLists[span->_n].PushFront(span);span->_isUse = false;//_idSpanMap[span->_pageId] = span;//_idSpanMap[span->_pageId + span->_n - 1] = span;_idSpanMap.set(span->_pageId, span);_idSpanMap.set(span->_pageId + span->_n - 1, span);}
08 细节处理
到这里,整个内存分配与释放的主流程已经基本完成了,接下来要处理的是一些细节问题。虽然这些部分不像主流程那样硬核,但要想系统真正稳定高效运行,把这些小问题理清楚同样重要。
08.1 单次申请超过256KB时的处理
首先是单次申请超过 256KB 的情况。我们在之前的逻辑中默认 ThreadCache
不会处理超过 256KB 的请求,但在实际使用中这种需求是存在的。我们知道 PageCache
中一个 Span
最多可以管理 128 页的空间,即 1MB(128 × 8KB)。因此,如果申请的空间在 256KB 到 1MB 之间,就不再从 ThreadCache
请求,直接向 PageCache
申请即可。
另外,大于 256KB 的空间在申请时需要按页对齐。举个例子:申请 257KB,虽然只超出一点,但由于已经超过 256KB,因此要按页对齐,257KB 对应 32.125 页,就得向上取整,按 33 页来分配,总共是 264KB。这种对齐可能带来几 KB 的碎片,但因为使用周期短、很快就会释放,所以影响不大。
相关代码如下所示:
static inline size_t RoundUp(size_t size)
{if (size <= 128)return _RoundUp(size, 8);else if (size <= 1024)return _RoundUp(size, 16);else if (size <= 8 * 1024)return _RoundUp(size, 128);else if (size <= 64 * 1024)return _RoundUp(size, 1024);else if (size <= 256 * 1024)return _RoundUp(size, 8 * 1024);elsereturn _RoundUp(size, 1 << PAGE_SHIFT);// 单次申请空间大于256KB,直接按照页来对齐
}
对应地,释放过程也遵循类似原则:不再经过 ThreadCache
,而是直接交由 PageCache
处理。如果申请的空间超过了 128 页,那就是直接向操作系统释放。
为了支持这些逻辑,代码中在 ConcurrentAlloc
和 NewSpan
中加入了对大页数的处理,同时在 ReleaseSpanToPageCache
中新增了向操作系统释放空间的逻辑。你可以通过一些测试用例验证,比如申请 257KB(即 33 页)和 129 页的空间,前者会走已有逻辑,后者会直接触发新的逻辑,检查是否按预期走通即可。
相关代码如下所示:
// 大于128页的直接向堆申请
if (k > NPAGES - 1)
{void* ptr = SystemAlloc(k);//Span* span = new Span;Span* span = _spanPool.New();span->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;span->_n = k;//_idSpanMap[span->_pageId] = span;_idSpanMap.set(span->_pageId, span);return span;
}// 直接去堆上按页申请空间
inline static void *SystemAlloc(size_t kpage)
{
#ifdef _WIN32void *ptr = VirtualAlloc(0, kpage << 13, MEM_COMMIT | MEM_RESERVE,PAGE_READWRITE);
#else// Linux下brk mmap等
#endifif (ptr == nullptr)throw std::bad_alloc();return ptr;
}// ReleaseSpanToPageCache 部分代码
// 大于128页的直接还给堆
if (span->_n > NPAGES - 1)
{void* ptr = (void*)(span->_pageId << PAGE_SHIFT);SystemFree(ptr);//delete span;_spanPool.Delete(span);return;
}// SystemFree 实现
inline static void SystemFree(void *ptr)
{
#ifdef _WIN32VirtualFree(ptr, 0, MEM_RELEASE);
#else// sbrk unmmap等
#endif
}
08.2 使用定长内存池替代 new 和 malloc
项目完成后,为了更贴近实际使用,我们进一步优化了一点:让它尽量不依赖 malloc
。虽然这个简易内存池还不够强大,不能完全取代 malloc
,但如果要求没那么高,它已经是个不错的替代方案。要做到不使用 malloc
,就得把项目中用到 new
的地方改成使用我们自定义的定长内存池,比如线程级的 TLSThreadCache
和 PageCache
中大量使用的 Span
。
T* New()
{T* obj = nullptr;// 优先使用还回来的内存if (_freeList){void* next = *((void**)_freeList);obj = (T*)_freeList;_freeList = next;}else{// 如果剩余内存不够一个对象大小时,则重新开大块空间if (_remainBytes < sizeof(T)){_remainBytes = 128 * 1024;//_memory = (char*)malloc(_remainBytes);_memory = (char*)SystemAlloc(_remainBytes >> 13);if (_memory == nullptr)throw std::bad_alloc();}obj = (T*)_memory;size_t objSize = sizeof(T) < sizeof(void*) ? sizeof(void*) : sizeof(T);_memory += objSize;_remainBytes -= objSize;}// 定位new,显示调用T的构造函数初始化new(obj)T;return obj;
}
08.3 多线程下的内存池安全问题
这时候要注意一个线程安全问题:我们的定长内存池是静态的,在多线程环境下,多个线程共享这个池子,如果不加锁就可能在并发申请空间时出现空指针引用,造成程序崩溃。尤其是在第一次分配内存的时候,如果两个线程同时触发,那就很可能出现竞态。因此需要在分配过程中加一把互斥锁,保证线程安全。不过,为了性能考虑,我们只在创建 TLSThreadCache
的时候加锁,因为这个过程每个线程只会执行一次,而 Span
的创建在已有的 PageCache
锁保护下,是线程安全的。
// 通过 TLS,每个进程可无锁获取自己的ThreadCache对象
if (pTLSThreadCache == nullptr)
{static ObjectPool<ThreadCache> tcPool;//pTLSThreadCache = new ThreadCache;// 防止 Thread Cache 申请到空指针tcPool._poolMtx.lock();pTLSThreadCache = tcPool.New();tcPool._poolMtx.unlock();}
08.4 MapObjToSpan 的加锁处理
涉及查表的过程也得注意线程安全。因为我们用的是 unordered_map
来映射页号和 Span
,而这个容器不是线程安全的。如果在增删元素时刚好有其他线程在查,就可能读取到无效数据,甚至崩溃。因此在 MapObjectToSpan
中也需要加锁,这把锁直接复用 PageCache
的全局锁就可以了。
Span* PageCache::MapObjectToSpan(void* obj)
{PAGE_ID id = (PAGE_ID)obj >> PAGE_SHIFT;std::unique_lock<std::mutex> lock(_pageMtx); // 出了作用域自动解锁auto ret = (Span*)_idSpanMap.get(id);assert(ret != nullptr);return ret;}
09 使用基数树进行优化
09.1 初步结果对比
到这里,核心功能已经完成了,是时候看看我们写的 ConcurrentAlloc
和系统自带的 malloc
在性能上谁更胜一筹。我们专门写了一个 BenchMark.cpp
来进行对比测试:
#include "ConcurrentAlloc.h"// ntimes 一轮申请和释放内存的次数
// rounds 轮次
void BenchmarkMalloc(size_t ntimes, size_t nworks, size_t rounds)
{std::vector<std::thread>vthread(nworks);std::atomic<size_t> malloc_costtime = 0;std::atomic<size_t> free_costtime = 0;for (size_t k = 0; k < nworks; ++k){vthread[k] = std::thread([&, k]{std::vector<void*> v;v.reserve(ntimes);for (size_t j = 0; j < rounds; ++j){size_t begin1 = clock();for (size_t i = 0; i < ntimes; i++){//v.push_back(malloc(16));v.push_back(malloc((16 + i) % 8192 + 1));}size_t end1 = clock();size_t begin2 = clock();for (size_t i = 0; i < ntimes; i++){free(v[i]);}size_t end2 = clock();v.clear();malloc_costtime += (end1 - begin1);free_costtime += (end2 - begin2);}});}for (auto& t : vthread){t.join();}printf("%u个线程并发执行%u轮次,每轮次malloc %u次: 花费:%u ms\n",nworks, rounds, ntimes, malloc_costtime.load());printf("%u个线程并发执行%u轮次,每轮次free %u次: 花费:%u ms\n",nworks, rounds, ntimes, free_costtime.load());printf("%u个线程并发malloc&free %u次,总计花费:%u ms\n",nworks, nworks * rounds * ntimes, malloc_costtime.load() + free_costtime.load());
}// 单轮次申请释放次数 线程数 轮次
void BenchmarkConcurrentMalloc(size_t ntimes, size_t nworks, size_t rounds)
{std::vector<std::thread>vthread(nworks);std::atomic<size_t> malloc_costtime = 0;std::atomic<size_t> free_costtime = 0;for (size_t k = 0; k < nworks; ++k){vthread[k] = std::thread([&]() {std::vector<void*> v;v.reserve(ntimes);for (size_t j = 0; j < rounds; ++j){size_t begin1 = clock();for (size_t i = 0; i < ntimes; i++){//v.push_back(ConcurrentAlloc(16));v.push_back(ConcurrentAlloc((16 + i) % 8192 + 1));}size_t end1 = clock();size_t begin2 = clock();for (size_t i = 0; i < ntimes; i++){ConcurrentFree(v[i]);}size_t end2 = clock();v.clear();malloc_costtime += (end1 - begin1);free_costtime += (end2 - begin2);}});}for (auto& t : vthread){t.join();}printf("%u个线程并发执行%u轮次,每轮次concurrent alloc %u次: 花费:%u ms\n",nworks, rounds, ntimes, malloc_costtime.load());printf("%u个线程并发执行%u轮次,每轮次concurrent dealloc %u次: 花费:%u ms\n",nworks, rounds, ntimes, free_costtime.load());printf("%u个线程并发concurrent alloc&dealloc %u次,总计花费:%u ms\n",nworks, nworks * rounds * ntimes, malloc_costtime.load() + free_costtime.load());
}int main()
{size_t n = 10000;cout << "==========================================================" << endl;BenchmarkConcurrentMalloc(n, 4, 10);cout << endl << endl;BenchmarkMalloc(n, 4, 10);cout << "==========================================================" << endl;return 0;
}
这个测试会启动多个线程,让每个线程进行多轮次的内存申请和释放操作。测试包括两种场景:一种是每次申请固定大小的内存块,另一种是每次申请不同大小的内存块,模拟真实环境下更复杂的内存使用模式。
我们分别用 malloc/free
和 ConcurrentAlloc/ConcurrentFree
执行相同任务,并记录它们的耗时,最后将结果打印出来。
运行结果:
可以看到,虽然性能相比 malloc
有所提升,但还有没有进一步优化的空间呢?下面我们用性能分析工具来观察一下。
09.2 使用性能探查器分析性能瓶颈
不同平台都有自己的性能分析工具,我这里用的是 Visual Studio
自带的性能探查器。
打开性能分析工具后,勾选需要检测的项目,点击开始,它会自动收集函数调用频率和耗时信息。分析完成后,我们能看到一些关键指标,比如哪个函数最耗时。
在“热路径”里,可以看到函数的调用链,比如 std::thread::invoke
调用了 std::invoke
,后者又进一步调用 lambda
转换成的仿函数。这就像栈帧一样,直观地告诉你时间都花在哪儿了。
还有一个“单个工作最耗时函数”的视图,显示了某个函数总共执行了多久。比如我们这里就发现,最耗时的是 lock
,看来加锁的开销相当大。
能不能避免频繁加锁带来的开销?当然可以。比如 tcmalloc
就用了一种叫“基数树”的结构来处理这类问题,有效规避了频繁加锁对性能的影响。
09.3 引入基数树
为了优化前面提到的性能瓶颈,tcmalloc
使用了基数树结构来代替传统的 unordered_map
。原因在于,当数据量增大时,哈希查找和锁竞争的成本急剧上升,查得越慢,锁竞争越激烈,最终导致整体性能下滑。
**基数树(Radix Tree)**在 tcmalloc
中被设计为一到三层的结构,按需选择。其中一层的基数树最简单,本质上就是一个数组,通过页号直接作为下标定位到对应的 Span*
指针。这种结构在页号不多、位数不高的情况下,可以直接开出一个 2M 大小的数组,不仅查找速度快,而且不需要加锁。
class TCMalloc_PageMap1
{private:static const int LENGTH = 1 << BITS;void** array_;public:typedef uintptr_t Number;explicit TCMalloc_PageMap1(){//*array_ = reinterpret_cast<void**>((*allocator)(sizeof(void*) << BITS));size_t size = sizeof(void*) << BITS;size_t alignSize = SizeClass::_RoundUp(size, 1 << PAGE_SHIFT);array_ = (void**)SystemAlloc(alignSize >> PAGE_SHIFT);memset(array_, 0, sizeof(void*) << BITS);}// Return the current value for KEY. Returns NULL if not yet set,// or if k is out of range.void* get(Number k) const{if ((k >> BITS) > 0){return nullptr;}return array_[k];}// REQUIRES "k" is in range "[0,2^BITS-1]".// REQUIRES "k" has been ensured before.//// Sets the value 'v' for key 'k'.void set(Number k, void* v){array_[k] = v;}
};
如果页号位数更多,比如页号占了 19 位,那么这个数组长度就是 219,对应约2MB空间,这在 32 位平台上是完全可接受的。但到了64 位系统,页号的位数可能达到 51 位,直接映射将消耗 254 字节(约 16PB)内存,显然是不现实的,因此就需要分层。
两层基数树的结构类似于二级页表:用页号的前几位(比如5位)索引第一层数组,每个元素指向一个新的数组,再由页号的后几位(比如14位)索引第二层数组中对应的 Span*
。这样不仅实现了快速定位,也允许延迟分配内存,仅在实际映射某段页号时才创建对应的子数组,节省了空间。
三层结构则更进一步,适配 64 位系统的巨大地址空间。其原理和前面一样,只是多了一层索引,让整棵树可以稀疏展开,不需要在初始化时一次性开辟天文数字级别的内存,而是按需动态申请。
使用基数树的另一个优势是它的结构是静态的,即不会频繁调整或改变。读操作仅是通过数组下标获取指针,写操作只是设置某个数组元素指向新的 Span*
。这意味着,在大多数情况下,读取过程中不会发生结构变动,也就不会有线程安全问题。因此,我们可以在读取时放心地去掉锁,从而进一步提升性能。
实际优化中,我们将原本 PageCache
中使用的 unordered_map
替换为单层基数树,在所有需要设置 Span*
的地方使用 set()
,读取的地方使用 get()
,并根据返回值是否为 nullptr
来判断是否成功获取。原先用于处理迭代器和比较的逻辑,也相应地简化了。
//_idSpanMap[span->_pageId] = span;
//_idSpanMap[span->_pageId + span->_n - 1] = span;
_idSpanMap.set(span->_pageId, span);
_idSpanMap.set(span->_pageId + span->_n - 1, span);
最后,由于整个基数树的写入和读取逻辑在流程中是严格分离的,我们可以保证不会出现两个线程同时对同一个页号进行读写操作的情况,这就为彻底移除锁提供了基础条件,使得整体性能在数据量增长时也能保持良好表现。
09.4 优化后的性能对比
使用基数树进行优化后,下面我们再用性能探查工具观察一下优化后的结果:
可以看到,此时最耗时的操作不再是 lock
,而 malloc
和 free
作为我们被优化的对象理应排在了前列。
我们再看看 BenchMark.cpp
的运行结果:
可以看出,这次 ConcurrentAlloc/ConcurrentFree
的运行速度相比 malloc/free
快了不少。
到这里,这个小项目就算告一段落了。最后再强调一遍:本项目的目的并不是完全复刻 tcmalloc
的源码,而是学习大佬的思路,深入理解其核心机制,从而提升自己的能力。
毕竟真正的 tcmalloc
源码多达几万行,我这边最终也就写了不到两千行代码,只是挑选其中的关键逻辑做了实现与讲解。如果你对源码感兴趣,推荐直接去 GitHub 上阅读原版,收获肯定会更大。
相关文章:
【项目】构建高性能多线程内存池:简化版 tcmalloc 实现指南
00 引言 在高并发应用中,频繁的小块内存申请与释放不仅会带来性能瓶颈,还容易导致内存碎片问题。为此,内存池技术应运而生,而 tcmalloc(Thread-Caching Malloc)作为 Google 开源的高性能内存分配器&#x…...
C++23 Lambda 表达式上的属性:P2173R1 深度解析
文章目录 一、背景与动机(一)Lambda 表达式的发展历程(二)属性的重要性(三)P2173R1 提案的动机 二、语法与使用(一)属性的放置位置1. 普通 Lambda 表达式2. 泛型 Lambda 表达式3. 多…...
libaom 码率控制实验:从理论到实践的深度探索
libaom 码率控制模式介绍 在 libaom 中定义了四种码率控制模式,分别是 VBR、CBR、CQ、Q;枚举类型会被用在编码器配置结构体 aom_codec_enc_cfg 中,通过 rc_end_usage 字段来设置编码器的码率控制策略。具体应用范围如下: AOM_VBR…...
golang的slice扩容过程
Go 语言中的切片扩容机制是 Go 运行时的一个关键部分,它确保切片在动态增加元素时能够高效地管理内存。这个机制是在 Go 运行时内部实现的,涉及了内存分配、数据拷贝和容量调整。扩容的实现主要体现在 runtime.growslice 函数中。下面我们将深入分析 Go …...
MCP 集合网站
分享个超全 MCP 网站,以后找资源不用愁,不谢。 MCPServers | Model Context Protocol Implementation | MCPServers.cnMCPServers - Model Context Protocol Servers for AI model serving. The official platform for MCP, MCPServer, and Model Contex…...
C++: Initialization and References to const 初始化和常引用
cpp primer 5e, P97. 理解 这是一段很容易被忽略、 但是又非常重要的内容。 In 2.3.1 (p. 51) we noted that there are two exceptions to the rule that the type of a reference must match the type of the object to which it refers. The first exception is that we …...
ES通过API操作索引库
1. 导入restClient依赖 <dependency><groupId>org.elasticsearch.client</groupId><artifactId>elasticsearch-rest-high-level-client</artifactId><version>7.12.1</version></dependency> 2. 了解ES核心客户端API 核心区别…...
MySQL:存储函数和存储过程
系列文章目录 1.MySQL编程基础 2.程序控制流语句 3.存储过程 4.游标 5.嵌入式SQL 文章目录 系列文章目录前言一、程序控制流语句:二、存储函数: 1.存储函数的特点:2.存储函数的定义:3.调用存储函数 三、存储过程:…...
visual studio安装字体
以下是在 Windows 系统中将 Visual Studio 字体更换为 JetBrains 字体(如 JetBrains Mono)的完整指南,涵盖从下载安装到高级优化的全流程: 一、下载并安装 JetBrains 字体 获取字体文件 访问 JetBrains Mono 官方下载页面&#x…...
网络安全·第四天·扫描工具Nmap的运用
今天我们要介绍网络安全中常用的一种扫描工具Nmap,它被设计用来快速扫描大型网络,主要功能包括主机探测、端口扫描以及版本检测,小编将在下文详细介绍Nmap相应的命令。 Nmap的下载安装地址为:Nmap: the Network Mapper - Free Se…...
SSM考研助手管理系统
🍅点赞收藏关注 → 添加文档最下方联系方式咨询本源代码、数据库🍅 本人在Java毕业设计领域有多年的经验,陆续会更新更多优质的Java实战项目希望你能有所收获,少走一些弯路。🍅关注我不迷路🍅 项目视频 03…...
通道注意力机制|Channel Attention Neural Network
一、通道注意力机制 论文:ECA-Net: Efficient Channel Attention for Deep Convolutional Neural Networks 近年来,通道注意力机制在提高深度卷积神经网络CNN的性能方面显示出了巨大潜力。然而,大多数现有方法致力于开发更复杂的注意力模块…...
844. 比较含退格的字符串
给定 s 和 t 两个字符串,当它们分别被输入到空白的文本编辑器后,如果两者相等,返回 true 。# 代表退格字符。 注意:如果对空文本输入退格字符,文本继续为空。 示例 1: 输入:s "ab#c&quo…...
trl的安装与单GPU多GPU测试
文章目录 0 相关资料1 源码安装2 Qwen2.5-0.5B-Instruct 模型下载3 训练demo4 在多个 GPU/节点上进行训练总结 0 相关资料 https://github.com/huggingface/trl https://blog.csdn.net/weixin_42486623/article/details/134326187 TRL 是一个先进的库,专为训练后基…...
Java项目之基于ssm的学校小卖部收银系统(源码+文档)
项目简介 学校小卖部收银系统实现了以下功能: 学校小卖部收银系统的主要使用者分为: 管理员;管理员使用本系统涉到的功能主要有:主页,个人中心,用户管理,员工管理,商品分类管理&am…...
ES DSL 常用修改语句
字段值替换修改 修改sql update zyzkwjj set dhreplace(dh,"WS","WSS") where dh like %WS% update zyzkwjj set dh replace(dh, WS, DZ),ztm replace(ztm, WS, DZ),zrz replace(zrz, WS, DZ) where dh like %WS% or ztm like %WS% or zrz like %WS%…...
数据结构*集合框架顺序表-ArrayList
集合框架 常见的集合框架 什么是顺序表 顺序表是一种线性表数据结构,它借助一组连续的存储单元来依次存储线性表中的数据元素。一般情况下采用数组存储。 在数组上完成数据的增删查改。 自定义简易版的顺序表 代码展示: public interface IArray…...
(二)Graspnet在mujoco的仿真复现(操作记录)
目录 《复现的项目来源》 一、创建虚拟环境 二、下载manipulator_grasp项目 三、配置环境 1、基于graspnet-baseline项目进行开发 (1)下载 graspnet-baseline项目 (2)修改graspnet-baseline的requirements.txt,…...
黑马商城项目(一)MybatisPlus
一、快速入门 入门案例: 常见注解: 常见配置: 条件构造器(wrapper): 案例: Testvoid testUpdateByWrapper(){List<Long> ids List.of(1L,2L,3L);UpdateWrapper<User> userUpdateWrapper new UpdateWrapp…...
Linux系统编程 day2
系统调用 由操作系统实现并提供给外部应用程序的编程接口(API)。是应用程序同系统之间数据交换的桥梁。 文件IO 函数 open/close函数 头文件 #include<fcntl.h> #include<unistd.h>int open(const char*pathname , int flags) int open(const char*pathname , …...
告别繁琐,拥抱简洁:初识 Pytest 与环境搭建 (Pytest系列之一)
在 Python 自动化测试领域,Pytest以其简洁、灵活和强大的特性,越来越受到广大测试工程师和开发者的青睐。如果你还在为繁琐的测试框架而苦恼,或者希望提升你的 Python 测试效率,那么 Pytest 绝对值得你深入了解和使用。 本文将带…...
浏览器运行Pytorch无法启用显卡
注意 我启用高性能之后,结果蓝屏了。无语Thinkpad T14p!!! 仅供参考。 问题与方案 在浏览器里面用Jupyter编写python程序,进行网络模型训练。 里面导入了PyTorch,但是并没有启用显卡执行: 也…...
天元证券|8家汽车零部件上市公司一季度业绩预喜
近日,中国汽车工业协会发布的数据显示,今年一季度,我国汽车产量为756.1万辆,同比增长14.5%;汽车销量为747万辆,同比增长11.2%。 受益于前三个月汽车产、销量同比双增,上游产业链公司交出了一份可…...
C语言学习之sizeof函数和strlen函数
经过我们之前的学习,已经接触过很多次sizeof函数和strlen函数了,应用他们来求解数组大小等等。但是实际应用中两者的差别还是很大的,接下来我们就来了解一下吧。 sizeof函数 sizeof函数计算的是变量所占内存空间的大小的,单位是字…...
从 “经验养生” 到 “数据养生”:智慧养老如何终结 “伪科学”?
在养老养生领域,长期以来 “经验养生” 占据主导地位,各种未经科学验证的说法和做法流传甚广,其中不乏 “伪科学” 内容,给老年人的健康和生活带来诸多误导。 随着智慧养老的兴起,“数据养生” 模式逐渐崭露头角&…...
什么是跨域问题以及其解决方案
一、什么是跨域? 1.1. 什么是同源策略?1.2. 同源策略限制以下几种行为 二、常见的跨域场景三、9种跨域解决方案 1. JSONP跨域 1.1. 原生JS实现1.2. jquery Ajax实现1.3. Vue axios实现 2. CORS 跨域资源共享 (前端不需要做任何改变࿰…...
Go之Slice和数组:深入理解底层设计与最佳实践
在Go语言中,数组(Array)和切片(Slice)是两种看似相似却本质不同的数据结构。本文将深入剖析它们的底层实现机制,并结合实际代码示例,帮助开发者掌握核心差异和使用场景。 一、基础概念ÿ…...
快速部署大模型 Openwebui + Ollama + deepSeek-R1模型
背景 本文主要快速部署一个带有web可交互界面的大模型的应用,主要用于开发测试节点,其中涉及到的三个组件为 open-webui Ollama deepSeek开放平台 首先 Ollama 是一个开源的本地化大模型部署工具,提供与OpenAI兼容的Api接口,可以快速的运…...
RF connect SDK 修改蓝牙address的方法
目录 概述 1 静态设置蓝牙地址(编译时配置) 1.1 通过 prj.conf 文件设置 1.2 通过 overlay 文件设置 2 动态修改蓝牙地址(运行时修改) 2.1 使用 bt_addr_le_t 结构 2.2 使用 bt_set_id_addr(适用于 Public Add…...
MuJoCo(Multi-Joint Dynamics with Contact)机器人仿真器存在的问题
MuJoCo物理引擎计算接触力的核心思路,是通过数学优化的方式同时满足多个物理约束,而不是简单地为每个碰撞点单独计算作用力。它的工作流程可以理解为几个阶段的紧密配合。首先,仿真器会快速检测所有可能发生接触的物体表面,筛选出…...
IDEA远程Debug调试
最近开发一个功能,因为环境问题,本地无法正常将多个微服务都启动成功。 另外接手了一个新活, 逻辑比较复杂,需要去研究一下测试一下原来的功能逻辑。方便找到新任务功能点的切入点。这才了解到Idea远程debug调试的功能。 本文章…...
爱普生有源晶振SG2016VHN在网络服务器中的应用
在数字化浪潮席卷全球的当下,网络服务器作为数据存储、处理与传输的核心枢纽,其性能的稳定与高效直接关系到整个网络生态的正常运转。从企业内部的数据管理,到互联网服务提供商的大规模数据运算,网络服务器需要应对海量数据的高速…...
【差分隐私相关概念】瑞丽差分隐私(RDP)命题2
分步解析与答案 1. c-稳定变换的定义 c-稳定变换是一种将群体数据集(如数据库集合)的相邻性映射到个体数据集(如单条记录变化)的变换。具体来说,若变换 g : D ′ → D g: \mathcal{D} \to \mathcal{D} g:D′→D 是 …...
大前端基础学习
一、cs架构和bs架构 c:客户端, b:浏览器(无需安装,无需更新,可跨平台)√ s:server服务端,帮我们保 存信息,传递信息 二、 altshift向下键向下复制一行 …...
Spring Batch 专题系列(四):配置与调度 Spring Batch 作业
1. 引言 在上一篇文章中,我们详细探讨了 Spring Batch 的核心组件(Job、Step、Chunk、ItemReader、ItemProcessor、ItemWriter),并通过示例展示了它们的协作方式。掌握了这些组件后,接下来需要了解如何灵活配置 Spring Batch 作业,并通过调度机制控制作业的执行时机。本…...
Android 项目配置文件解释
Android 项目配置文件解释 目录 Android 项目配置文件解释1. `plugins` 块2. `android` 块3. `dependencies` 块为什么需要 JDK 和 Kotlin1. plugins 块 plugins {id com.android.applicationid org.jetbrains.kotlin.android }id com.android.application:应用 Android 应用…...
uniapp 自定义tabbar
v3 写法 <template><view class"" v-if"Data.imgurl"><view class"tabbars" ref"tabbars" id"tabbars"><view class"flex jsa"><view class"tabbarc_li" click"go(/p…...
C++高级2 智能指针
智能指针介绍 裸指针 int * p new int(10); *p 30; delete p; 必须要手动释放 智能指针 保证能做到资源的自动释放,利用栈上的对象出作用域自动析构的特征,来做到资源的自动释放的 实现一个简单的智能指针 class Cptr { public:Cptr(T* ptr …...
【FPGA】——DDS信号发生器设计
目录 一 、IP核的使用 (1)RAM IP核 (2)FIFO IP核 二、DDS信号发生器设计 (1)代码 (2)仿真波形 一 、IP核的使用 IP核:ASIC或FPGA中预先设计好具有某种功能的电路模块,参数可修改…...
mysql按条件三表并联查询
下面为你呈现一个 MySQL 按条件三表并联查询的示例。假定有三个表:students、courses 和 enrollments,它们的结构和关联如下: students 表:包含学生的基本信息,有 student_id 和 student_name 等字段。courses 表&…...
centos 7 docker创建的postgres数据库状态检查
一 、打开finalshell 连接主机 二、检查docker状态,以下信息表示数据库准备好连接了 注意:当finalshell可以访问主机,但没有准备信息时,重启centos系统试试 docker logs postgres 三、如果是传统安装 基于 systemd 的系统(如 CentOS 7 及以上、Ubuntu 16.04 及以…...
基于EasyX库开发的球球大作战游戏
目录 球球大作战 一、开发环境 二、流程图预览 三、代码逻辑 1、初始化时间 2、设置开始界面大小 3、设置开始界面 4、让玩家选择速度 5、设置玩家小球、人机小球、食物的属性 6、一次性把图绘制到界面里 7、进入死循环 8、移动玩家小球 9、移动人机 10、食物刷新…...
ES和MySQL概念对比
基本概念 ES和MySQL都属于数据库,不过各有各的特性,大致使用方法与MySQL类似并无区别。 MySQL:擅长事务持有ACID的特性,确保数据的一致性和安全。 ES:持有倒排索引,适合海量数据搜索和分析。 ES和MySQL如何…...
Kotlin 与 Jetpack Compose 参数设计完全指南
作为 Kotlin 和 Jetpack Compose 开发者,合理的参数设计能显著提升代码的可读性和易用性。本文将系统整理各类参数规则,帮助您编写更优雅的 API。 一、基础参数规则 1. 方法参数 // 基础定义 fun 方法名(必需参数: 类型, 可选参数: 类型 默认值): 返…...
dea如何使用git
在 IntelliJ IDEA 中使用 Git 的详细步骤如下,分为配置、基础操作和高级功能,适合新手快速上手: 一、配置 Git 安装 Git 下载并安装 Git,安装时勾选“Add to PATH”。验证安装:终端输入 git --version 显示版本…...
git -- 版本控制介绍(分布式系统),git介绍(对待数据的方式,本地执行,保证完整性,只添加数据,git文件/项目的三种状态,基本的工作流程)
目录 版本控制 介绍 分布式版本控制系统 git 介绍 与多个远程仓库协作 对待数据的方式 本地执行操作 保证完整性 只添加数据 三种状态 工作区 暂存区 Git 目录 基本的git工作流程 版本控制 介绍 一种记录一个或多个文件内容变化的系统,它可以让你在未来…...
嵌入式软件OTA升级,有哪几种Flash划分方式?
第一次接触嵌入式软件OTA升级的时候,我整个人也是懵的。Flash划分?什么鬼?不是直接把新程序烧进去就完事了吗? 结果一上手才发现,这玩意没那么简单,尤其是Flash怎么分,如果Flash划分没弄好&…...
游戏引擎学习第226天
引言,计划 我们目前的目标是开始构建“元游戏”结构。所谓元游戏,指的是不直接属于核心玩法本身,但又是游戏体验不可或缺的一部分,比如主菜单、标题画面、存档选择、选项设置、过场动画等。我们正在慢慢将这些系统结构搭建起来。…...
通过python实现bilibili缓存视频转为mp4格式
需要提前下好ffmpeg import os import fnmatch import subprocess Bilibili缓存的视频,*280.m4s结尾的是音频文件,*050.m4s结尾的是视频,删除16进制下前9个0,即为正常音/视频 使用os.walk模块,遍历每一个目录…...
Windows 图形显示驱动开发-WDDM 1.2功能—无显示器系统支持
一、架构设计与启动流程 1.1无显示器系统启动与全流程 graph TDA[UEFI固件] -->|FADT.VGA_NOT_PRESENT1| B[Windows Boot Manager]B -->|加载winload.efi| C[内核初始化]C -->|检测显示设备| D{存在GPU?}D -->|是| E[加载WDDM驱动]D -->|否| F[激活MSBDD虚拟…...