前言: (Memory Pool)是一种内存分配方式,又被称为固定大小区块规划(fixed-size-blocks allocation)。通常我们习惯直接使用new、malloc等API申请分配内存,这样做的缺点在于:由于所申请内存块的大小不定,当频繁使用时会造成大量的内存碎片并进而降低性能。
秋招可以写进简历的6个实战项目:
当前项目是实现一个高并发的内存池,他的原型是google的一个开源项目tcmalloc,tcmalloc全称Thread-Caching Malloc,即线程缓存的malloc,实现了高效的多线程内存管理,用于替代系统的内存分配相关的函数(malloc、free)。
这个项目的要求的知识储备?
这个项目会用到C/C++、数据结构(链表、哈希桶)、操作系统内存管理、单例模式、多线程、互斥锁等等方面的知识。
一、基础概念 1.1什么是内存池 所谓池化技术,就是程序先向系统申请过量的资源,然后自己管理,当程序中需要申请内存时,不是直接向操作系统申请,而是直接从内存池中获取,释放内存时也不是将内存返回给操作系统,而是返回内存池中。
因为每次申请该资源都有较大的开销,这样提前申请好了,使用时就会非常快捷,能够大大提高程序运行效率。在计算机中有很多使用这种池技术的地方,例如线程池、连接池等。
动态内存申请malloc
C++中动态申请内存都是通过malloc去申请的,但实际上我们并不是直接去堆中获取内存的,而malloc就是一个内存池。
malloc() 相当于向系统 “批发” 了一块较大的内存空间,然后“零售” 给程序使用,当全部使用完或者程序有大量内存需求时,再根据需求向操作系统申请内存。
1.2内存池 内存池是指程序预先从操作系统申请一块足够大内存,此后,当程序中需要申请内存的时候,不是直接向操作系统申请,而是直接从内存池中获取;同理,当程序释放内存的时候,并不真正将内存返回给操作系统,而是返回内存池。当程序退出(或者特定时间)时,内存池才将之前申请的内存真正释放。
内存池主要解决的问题
内存池解决
那么什么是内存碎片呢?
外部碎片: 一些空闲的连续内存区域太小,这些内存空间不连续,以至于合计的内存足够,但是不能满足一些的内存分配申请需求
内部碎片: 由于一些对齐的需求,导致分配出去的空间中一些内存无法被利用。
1.3malloc C/C++中我们要动态申请内存都是通过malloc去申请内存,但是我们要知道,实际我们不是直接去堆获取内存的。而malloc就是一个内存池。malloc() 相当于向操作系统“批发”了一块较大的内存空间,然后“零售”给程序用。当全部“售完”或程序有大量的内存需求时,再根据实际需求向操作系统“进货”。
malloc的实现方式有很多种,一般不同编译器平台用的都是不同的。比如windows的vs系列用的微软自己写的一套,linux gcc用的glibc中的ptmalloc。
原理图 二、定长内存池设计 2.1概述 作为程序员(C/C++)我们知道申请内存使用的是malloc,malloc其实就是一个通用的大众货,什么场景下都可以用,但是什么场景下都可以用就意味着什么场景下都不会有很高的性能 ,下面我们就先来设计一个定长内存池。
2.2设计思路 开辟内存:
申请内存:
释放内存:
代码位置: 高并发内存池项目中的ObjectPool.h 文件。
2.3内容讲解 注意:核心三变量
char* _memory = nullptr; // 指向大块内存的指针 size_t _remainBytes = 0; // 大块内存分割过程中剩余的字节数 void* _freeList = nullptr; // 还回来的内存块用自由链接表的头指针链接起来
剩余内存字节数必须大于我们要申请的类型内存空间,回收内存块,我们只需要借助内存块头四个或八个字节(平台不同,指针字节数不同)储存下个内存块的地址。
链表链接内存块方式:数据结构头插思想
存储方式
*(void**)obj = _freeList;
程序结束申请的内存自动回收(内存池,我们不需要关心;回收问题)
实现思路:先从freeList(回收链表)中查找是否还有内存块
有就返回内存块地址,没有— 接下来从 _memory(这个大的内存块)中取;如果内存不足,再申请一大块内存。
2.4代码实现 .h文件
#pragma once #include <iostream> #include <vector> #include <time.h> using std::cout; using std::endl; #ifdef _WIN32 #include<windows.h> #else #endif // 直接去堆上按页申请空间 inline static void* SystemAlloc(size_t kpage) { #ifdef _WIN32 void* ptr = VirtualAlloc(0, kpage << 13, MEM_COMMIT | MEM_RESERVE, PAGE_READWRITE); #else // linux下brk mmap等 #endif if (ptr == nullptr) throw std::bad_alloc(); return ptr; } template <class T> class myObject { public: T* New() { // 优先把还回来内存块对象,再次重复利用 T* obj = nullptr; if (_freeList) { // 头删 void* next= *(void**)_freeList; obj = (T*)_freeList; _freeList = next; } else { // 剩余内存不够一个对象大小时,则重新开大块空间 if (_remainBytes < sizeof(T)) { _remainBytes = 1024 * 128; //_memory = (char*)malloc(_remainBytes); _memory = (char*)SystemAlloc(_remainBytes >> 13); // 按页申请内存 if (_memory == nullptr) { throw std::bad_alloc(); } } obj = (T*)_memory; size_t objSize = sizeof(T) < sizeof(void*) ? sizeof(void*) : sizeof(T); //申请内存空间必须大于或等于一个指针的空间 _memory += objSize; _remainBytes -= objSize; } // 定位new,显示调用T的构造函数初始化 new(obj)T; return obj; } void Delete(T* obj) { // 显示调用析构函数清理对象 obj->~T(); // 头插 *(void**)obj = _freeList; // 取前四或八个字节 _freeList = obj; } private: char* _memory = nullptr; // 指向大块内存的指针 size_t _remainBytes = 0; // 大块内存分割过程中剩余的字节数 void* _freeList = nullptr; // 还回来的内存块用自由链接表的头指针链接起来 };
【文章福利 】小编推荐自己的Linux内核技术交流群:【865977150 】整理了一些个人觉得比较好的学习书籍、视频资料共享在群文件里面,有需要的可以自行添加哦!!!2.5效率(malloc与定长内存池) 测试代码:.cpp文件
struct TreeNode { int _val; TreeNode* _left; TreeNode* _right; TreeNode() :_val(0) , _left(nullptr) , _right(nullptr) {} }; void TestObjectPool() { // 申请释放的轮次 const size_t Rounds = 100; // 每轮申请释放多少次 const size_t N = 1000000; std::vector<TreeNode*> v1; v1.reserve(N); size_t begin1 = clock(); for (size_t j = 0; j < Rounds; ++j) { for (int i = 0; i < N; ++i) { v1.push_back(new TreeNode); } for (int i = 0; i < N; ++i) { delete v1[i]; } v1.clear(); } size_t end1 = clock(); std::vector<TreeNode*> v2; v2.reserve(N); myObject<TreeNode> TNPool; size_t begin2 = clock(); for (size_t j = 0; j < Rounds; ++j) { for (int i = 0; i < N; ++i) { v2.push_back(TNPool.New()); } for (int i = 0; i < N; ++i) { TNPool.Delete(v2[i]); } v2.clear(); } size_t end2 = clock(); cout << "new cost time:" << end1 - begin1 << endl; cout << "object pool cost time:" << end2 - begin2 << endl; }
测试结果图:
结论:明显内存池效率更快。
三、高并发内存池整体设计框架 现代很多的开发环境都是多核多线程,在申请内存的场景下,必然存在激烈的锁竞争问题。malloc本身其实已经很优秀,那么我们项目的原型tcmalloc就是在多线程高并发的场景下更胜一筹,所以这次我们实现的内存池需要解决以下几方面的问题。
1.性能问题。
2.多线程环境下,锁竞争问题。
3.内存碎片问题。
3.1三层设计思路 第一层:thread cache(线程缓存):
每个线程独享一个thread cache,用于小于256k的内存分配情况,线程从这里申请内存不需要加锁(因为其他线程无法访问当前线程的 thread cache,没有竞争关系)
第二层:central cache(中心缓存):
所有线程共享一个central cache,thread cache 是按需从central cache中获取对象,central cache在合适的时候会收回thread cache中的对象,避免一个线程占用太多资源。
central cache 是所有线程共享的,所以存在竞争关系,需要加锁;这里使用的锁是桶锁,并且因为只有thread cache没有内存时才会申请内存,所以这里竞争不会太激烈。
第三层:page cache(页缓存):
页缓存存储的内存是以页为单位进行存储的及分配的。central cache没有内存时,则会从page cache中申请一定数量的page,并切割成定长大小的小内存块。
当一个span的几个跨度页的对象都回收后,page cache会回收central cache满足条件的span对象,并且合并相邻的页,组成更大的页,缓解内存碎片的问题。
3.2thread cache 设计思路(线程缓存) thread cache是哈希桶结构,每个桶是一个按桶位置映射大小的内存块对象的自由链表。每个线程都会有一个thread cache对象,这样每个线程在这里获取对象和释放对象时是无锁的。
线程申请内存:
线程申请内存时,会有不同规格的内存申请(4字节、5字节等),根据范围划定不同的自由链表,设计多个自由链表管理不同规格的内存小块。实质就相当于使用多个定长内存池的自由链表,每个内存小块采用向上对齐原则(可能会出现多申请内存的情况,这就是内碎片)
例如:
线程对齐规则:
整体控制在最多10%左右的内碎片浪费,总计设计208个桶,[0,15]个桶,每个桶的对齐数相差8字节(最高128字节对齐),[16,71]个桶,每个桶的对齐数相差16字节(最高1024字节对齐)以此类推。
注意:_freeLists是一个数组,每个元素都是自由链表类型(即存储自由链表的头结点)
线程释放内存
thread cache代码框架
本质:就相当于一个数组,每个数组挂个桶;而每个桶就类似于上面那个定长内存池一样。
相关数据:
static const size_t MAX_BYTES = 256 * 1024; // 一页8kb static const size_t NFREELIST = 208; // 总共桶的个数
(ThreadCache.h文件)
// thread cache本质是由一个哈希映射的对象自由链表构成 class ThreadCache { public: // 申请和释放内存对象 void* Allocate(size_t size); void Deallocate(void* ptr, size_t size); // 从中心缓存获取对象 void* FetchFromCentralCache(size_t index, size_t size); private: FreeList _freeLists[NFREELIST]; }; // TLS thread local storage static _declspec(thread) ThreadCache* pTLSThreadCache = nullptr;
RoundUp函数:表示内存对齐数(例如:对齐数为8,size是7或者9对齐后实际分配内存大小为8或16)
Index函数:根据size大小计算出_freeLists数组下标位置(也就是桶的位置)
Empty函数:判断_freeLists数组下标位置挂接内存是否为空
Pop函数:_freeLists数组下标位置内存块头删取出来并返回该内存块地址
Push函数:_freeLists数组下标位置内存块头插挂接该内存块
(ThreadCache.cpp文件)
// 申请内存对象, 空间先到_freeLists链表中去取,如果没有去中心缓存获取空间 void* ThreadCache::Allocate(size_t size) { assert(size <= MAX_BYTES); size_t alignSize = SizeClass::RoundUp(size); size_t index = SizeClass::Index(size); if (!_freeLists[index].Empty()) { return _freeLists[index].Pop(); } else { return FetchFromCentralCache(index, alignSize); } } // 释放内存对象,空间挂起到_freeLists链表中去 void ThreadCache::Deallocate(void* ptr, size_t size) { assert(ptr); assert(size <= MAX_BYTES); // 找对映射的自由链表桶,对象插入进入 size_t index = SizeClass::Index(size); _freeLists[index].Push(ptr); }
(common.h文件)
static void*& NextObj(void* obj) // 前四个或八个字节(下一个结点的地址) { return *(void**)obj; } // 管理小对象的自由链表 class FreeList { public: void Push(void* obj) { // 头插 NextObj(obj) = _freeList; _freeList = obj; } void* Pop() { // 头删 void* obj = _freeList; _freeList = NextObj(obj); return obj; } bool Empty() { return _freeList == nullptr; } private: void* _freeList = nullptr; };
自由链表的哈希桶跟对象大小的映射关系
(common.h文件)
// 计算对象大小的对齐映射规则 class SizeClass { public: // 整体控制在最多10%左右的内碎片浪费 // [1,128] 8byte对齐 freelist[0,16) // [128+1,1024] 16byte对齐 freelist[16,72) // [1024+1,8*1024] 128byte对齐 freelist[72,128) // [8*1024+1,64*1024] 1024byte对齐 freelist[128,184) // [64*1024+1,256*1024] 8*1024byte对齐 freelist[184,208) static inline size_t _RoundUp(size_t bytes, size_t alignNum) // 算出桶的大小(即size的对齐数) { return ((bytes + alignNum - 1) & ~(alignNum - 1)); } // 对齐大小计算 static inline size_t RoundUp(size_t size) { // 确定每个区间的对齐数 if (size <= 128) { return _RoundUp(size, 8); } else if (size <= 1024) { return _RoundUp(size, 16); } else if (size <= 8 * 1024) { return _RoundUp(size, 128); } else if (size <= 64 * 1024) { return _RoundUp(size, 1024); } else if (size <= 256 * 1024) { return _RoundUp(size, 8 * 1024); } else { assert(false); return -1; } } static inline size_t _Index(size_t bytes, size_t align_shift) { return ((bytes + (1 << align_shift) - 1) >> align_shift) - 1; // 桶下标从0开始计数 } // 计算映射的哪一个自由链表桶 static inline size_t Index(size_t bytes) { assert(bytes <= MAX_BYTES); // 每个区间有多少个链 static int group_array[4] = { 16, 56, 56, 56 }; if (bytes <= 128) { return _Index(bytes, 3); } else if (bytes <= 1024) { return _Index(bytes - 128, 4) + group_array[0]; } else if (bytes <= 8 * 1024) { return _Index(bytes - 1024, 7) + group_array[1] + group_array[0]; } else if (bytes <= 64 * 1024) { return _Index(bytes - 8 * 1024, 10) + group_array[2] + group_array[1] + group_array[0]; } else if (bytes <= 256 * 1024) { return _Index(bytes - 64 * 1024, 13) + group_array[3] + group_array[2] + group_array[1] + group_array[0]; } else { assert(false); } return -1; } };
3.3线程缓存无锁设计(TLS) (ConcurrentAlloc.h文件)
#pragma once #include "Common.h" #include "ThreadCache.h" static void* ConcurrentAlloc(size_t size) { // 通过TLS 每个线程无锁的获取自己的专属的ThreadCache对象 if (pTLSThreadCache == nullptr) { pTLSThreadCache = new ThreadCache; } cout << std::this_thread::get_id() << ":" << pTLSThreadCache << endl; return pTLSThreadCache->Allocate(size); } static void ConcurrentFree(void* ptr, size_t size) { assert(pTLSThreadCache); pTLSThreadCache->Deallocate(ptr, size); }
TLS:thread local storage 线程本地存储(linux和Windows下有各自的TLS)
TLS是一种变量的存储方式,这个变量在它所在的线程内是全局可访问的,但是不能被其他线程访问,这样就保证了线程的独立性。
静态TLS使用方法:
_declspec(thread) DWORD data=0;
声明了一个 _declspec(thread) 类型的变量,会为每一个线程创建一个单独的拷贝。
原理:
//每个线程刚创建时,就会获得这个指针,每个线程的指针是不同的。 static _declspec(thread) ThreadCache* pTLSThreadCache = nullptr; //static 修饰,可以保证变量只在当前文件可见。
简单测试一下:
void Alloc1() { for (size_t i = 0; i < 5; ++i) { void* ptr = ConcurrentAlloc(16); } } void Alloc2() { for (size_t i = 0; i < 5; ++i) { void* ptr = ConcurrentAlloc(17); } } void TLSTest() { std::thread t1(Alloc1); t1.join(); std::thread t2(Alloc2); t2.join(); } int main() { //TestObjectPool(); TLSTest(); return 0; }
3.4central cache设计思路(中心缓存) central cache也是一个哈希桶结构,他的哈希桶的映射关系跟thread cache是一样的。不同的是他的每个哈希桶位置挂是SpanList链表结构,不过每个映射桶下面的span中的大内存块被按映射关系切成了一个个小内存块对象挂在span的自由链表中。
注意:
内存申请:
当thread cache中没有内存时,就会批量向central cache申请一些内存对象,从对应的span中取出小块内存对象给thread cache,这个过程是需要加锁的(加桶锁)
这里批量获取对象的数量使用了类似网络tcp协议拥塞控制的慢开始算法。
如果所有的span都为空了(即central cache使用完了),则将空的span链在一起,向page cache申请一个span对象,span对象中是一些以页为单位的内存,需要切成对应的小块内存对象,并链接起来,挂到span中。
central cache中每一个span都有一个use_count,分配一个对象给thread cache,就加加。
内存释放:
当thread_cache将内存释放回central cache中的时,释放回来就减减use_count。
当use_count减到0时则表示所有对象都回到了span,则将span释放回page cache,page cache中会对前后相邻的空闲页进行合并。
CentralCache 代码框架:
(CentralCache.h文件)
#pragma once #include "common.h" // 单例模式---饿汉模式 class CentralCache { public: static CentralCache* GetInstance() { return &_sInst; } // 获取一个非空的span Span* GetOneSpan(SpanList& list, size_t byte_size); // list: 表示_spanLists中span的_freeList连续挂接空间的数量 // 从中心缓存获取一定数量的对象给thread cache size_t FetchRangeObj(void*& start, void*& end, size_t batchNum, size_t size); // size:内存空间大小;batchNum:内存空间个数 // start 和 end 表示内存空间地址起始到终末范围 private: SpanList _spanLists[NFREELIST]; private: CentralCache() {}; CentralCache(const CentralCache& t) = delete; static CentralCache _sInst; // 声明 };
GetOneSpan函数:需要到PageCache模块去讲解(CentralCache.cpp文件)
CentralCache CentralCache::_sInst; //定义 // 获取一个非空的span Span* CentralCache::GetOneSpan(SpanList& list, size_t byte_size) { // ... return nullptr; } // 从中心缓存获取一定数量的对象给thread cache size_t CentralCache::FetchRangeObj(void*& start, void*& end, size_t batchNum, size_t size) { // 每个桶加锁 size_t index = SizeClass::Index(size); _spanLists[index]._mtx.lock(); Span* span = GetOneSpan(_spanLists[index], size); assert(span); assert(span->_freeList); // 从span中获取batchNum个对象 // 如果不够batchNum个,有多少拿多少 start = span->_freeList; end = start; size_t i = 0, actualNum = 1; while (i < batchNum - 1 && NextObj(end) != nullptr) // 取连续内存块,当batchNum超过原有时,按拥有的数量给 { end = NextObj(end); ++i; ++actualNum; } span->_freeList = NextObj(end); // 剩余空间仍挂在Span中的_freeList去 NextObj(end) = nullptr; // 最后end记录下一个空间地址置空 span->_useCount += actualNum; _spanLists[index]._mtx.unlock(); // 解锁 return actualNum; }
以页为单位的大内存管理span的定义及spanlist定义(common.h文件)
// Span管理一个跨度的大块内存 // 管理以页为单位的大块内存 struct Span { PAGE_ID _pageId = 0; // 页号 size_t n = 0; // 页数 // 双向链表的结构 Span* next = nullptr; Span* prev = nullptr; size_t _useCount = 0; // 使用计数,切好的小块内存分配给ThreadCache void* _freeList = nullptr; // 大块内存切小链接起来,这样回收回来的内存也方便链接 }; // 带头双向循环链表 class SpanList { public: SpanList() { _head = new Span; _head->next = _head; _head->prev = _head; } Span* Begin() { return _head->next; } Span* End() { return _head; } void Insert(Span* pos, Span* newSpan) { // 头插 Span* prev = pos->prev; prev->next = newSpan; newSpan->prev = prev; newSpan->next = pos; pos->prev = newSpan; } void Erase(Span* pos) { assert(pos != _head); Span* prev = pos->prev; Span* next = pos->next; prev->next = next; next->prev = prev; } private: Span* _head; public: std::mutex _mtx; // 桶锁: 数组_spanLists中每一个位置的桶都有一把锁 };
慢开始反馈调节算法
补全hreadCache.h中FetchFromCentralCache函数只声明未实现
完善common.h文件
1)FreeList中成员变量和函数:
void PushRange(void* start, void* end) { // 一连串空间存储 NextObj(end) = _freeList; _freeList = start; }
MaxSize函数调节页数:
2)类SizeClass的静态成员函数
// 一次从中心缓存获取多少个 static size_t NumMoveSize(size_t size) { assert(size > 0); // [2, 512],一次批量移动多少个对象的(慢启动)上限值 // 小对象一次批量上限高 // 小对象一次批量上限低 int num = MAX_BYTES / size; if (num < 2) num = 2; if (num > 512) num = 512; return num; }
(ThreadCache.cpp文件)
// 慢开始反馈调节算法 // 1、最开始不会一次向central cache一次批量要太多,因为要太多了可能用不完 // 2、如果你不要这个size大小内存需求,那么batchNum就会不断增长,直到上限 // 3、size越大,一次向central cache要的batchNum就越小 // 4、size越小,一次向central cache要的batchNum就越大 void* ThreadCache::FetchFromCentralCache(size_t index, size_t size) { // 慢开始反馈调节法 不会开始要太多内存 size_t batchNum = std::min(_freeLists[index].MaxSize(), SizeClass::NumMoveSize(size)); if (batchNum == _freeLists[index].MaxSize()) { ++_freeLists[index].MaxSize(); } void* start = nullptr; void* end = nullptr; size_t actualNum = CentralCache::GetInstance()->FetchRangeObj(start, end, batchNum, size); // 获取连续实际内存的数目 assert(actualNum >= 1); // 至少一个 if (actualNum == 1) { assert(start == end); } else { _freeLists[index].PushRange(NextObj(start), end); } return start; }
3.5page cache设计思路(页缓存) page cache中也是哈希桶结构,但是每个节点存储的都是span,因为页缓存是所有线程共享的,只需要定义一个对象,所以这里将 page cache 设计为单例模式(这里采用饿汉模式的设计方法)。
注意:page cache需要加整体锁(因为是所有线程共享的)
申请内存:
当central cache向page cache申请内存时,page cache先检查对应位置有没有span,如果没有则向更大页寻找一个span,如果找到则分裂成两个。
例如:申请的是4page,4page后面没有挂span,则向后面寻找更大的span,假设在10page位置找到一个span,则将10page span分裂为一个4page span和一个6page span,把4page的span返回给central cache,把6page的span挂载到对应的6号桶中去。
如果找到128 page都没有合适的span,则向系统使用mmap、brk或者是VirtualAlloc等方式申请一个128page 的span挂在自由链表中,再重复1中的过程。
第一次申请内存时,page cache是空的,经过上面的过程后会向堆申请一个128page的span,然后重复上面的过程,将128page的span进行分裂和重新挂载。
释放内存:
如果central cache释放回一个span,则依次寻找span的前后page id的span,看是否可以合并,如果合并继续向前寻找。这样就可以将切小的内存合并收缩成大的span,减少内存碎片。
PageCache 代码框架:
page cache是一个以页为单位的span自由链表
为了保证全局只有唯一的page cache,这个类被设计成了单例模式。
注意线程安全,需要锁(此处我们采用整体锁)
相关数据:
static const size_t MAX_BYTES = 256 * 1024; // 一页8kb static const size_t NFREELIST = 208; // central and thread总共桶的个数 static const size_t NPAGES = 129; // page总共桶的个数; 数组下标从1开始总共128页 static const size_t PAGE_SHIFT = 13; // 一页字节数2^13
(PageCache.h文件)
#pragma once #include "common.h" class PageCache { public: static PageCache* GetInstance() { return &_sInst; } // 获取一个k页的span Span* NewSpan(size_t k); std::mutex _pageMtx; private: SpanList _spanLists[NPAGES]; std::unordered_map<PAGE_ID, Span*> _idSpanMap; // 页号于span映射;便于回收内存 PageCache() {}; PageCache(const PageCache& t) = delete; static PageCache _sInst; };
完善一下common.h文件:类SpanList
void PushFront(Span* span) // 头插 { Insert(Begin(), span); } Span* PopFront() // 头删 { Span* front = _head->_next; Erase(front); return front; } bool Empty() { return _head->_next == _head; }
(PageCache.cpp文件)
注意:地址除以一页的字节数就是页号(Page中_pageId)Page中_n 表示几页(_n<<13表示这块内存的大小)
// 获取一个k页的span Span* PageCache::NewSpan(size_t k) { assert(k > 0 && k < NPAGES); // 先检查第k个桶里面有没有span if (!_spanLists[k].Empty()) { return _spanLists[k].PopFront(); // 有就直接pop头结点并返回 } // 检查一下后面的桶里面有没有span, 如果有可以把他它进行切分 for (size_t i = k + 1; i < NPAGES; ++i) { if (!_spanLists[i].Empty()) { // 进行切分 Span* nSpan = _spanLists[i].PopFront(); // i:位置取出它头结点 Span* kSpan = new Span; // 创k一个结点 kSpan->_pageId = nSpan->_pageId; // 页的起始位置 nSpan->_pageId += k; // 分出k页;n页位置向后走(相当与一大块内存取出头一小块内存) kSpan->_n = k; // 得到k页 nSpan->_n -= k; // 剩余空间; 挂接到nSpan->_n即(i-k)页的位置 _spanLists[nSpan->_n].PushFront(nSpan); // 存储nSpan的首位页号跟nSpan映射,方便page cache回收内存时 //进行的合并查找 _idSpanMap[nSpan->_pageId] = nSpan; _idSpanMap[nSpan->_pageId + nSpan->_n - 1] = nSpan; // 例如:20 ~24(5页) // 建立id和span的映射; 方便(centralcache)回收小块内存 for (size_t i = 0; i < kSpan->_n; ++i) { _idSpanMap[kSpan->_pageId + i] = kSpan; } return kSpan; } } //走到这个位置就说明后面没有大页的span了 //这时就去找堆要一个128页(最大页---NPAGES-1)的span Span* bigSpan = new Span; void* ptr = SystemAlloc(NPAGES - 1); bigSpan->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT; // 地址除以13;就是页号 bigSpan->_n = NPAGES - 1; // 内存挂接到相应位置 _spanLists[bigSpan->_n].PushFront(bigSpan); // 函数递归调用自己;减少代码冗余(上面的空间切分)在时间稍微影响,基本上没影响 return NewSpan(k); }
windows和Linux下如何直接向堆申请页为单位的大块内存:
VirtualAlloc brk和mmap
我们按一页8k(8*1024)也就是2^13(1<<13) (common.h文件)
#ifdef _WIN32 #include<windows.h> #else // linux #endif // 直接去堆上按页申请空间 inline static void* SystemAlloc(size_t kpage) { #ifdef _WIN32 void* ptr = VirtualAlloc(0, kpage << 13, MEM_COMMIT | MEM_RESERVE, PAGE_READWRITE); #else // linux下brk mmap等 #endif if (ptr == nullptr) throw std::bad_alloc(); return ptr; }
获取一个非空的span
完善一下common.h文件
// 计算一次向系统获取几个页 // 单个对象 8byte // ... // 单个对象 256KB static size_t NumMovePage(size_t size) { size_t num = NumMoveSize(size); // 要几页 size_t npage = num * size; //总字节数 npage >>= PAGE_SHIFT; // 计算多少页,一页8k(2^13);则PAGE_SHIFT为13 if (npage == 0) npage = 1; return npage; }
Span* CentralCache::GetOneSpan(SpanList& list, size_t byte_size) { // 先从桶的span中找内存 Span* it = list.Begin(); while (it != list.End()) { if (it->_freeList != nullptr) { return it; } it = it->_next; } // 先把central 桶锁解除:这样如果某他线程释放内存对象回来,不会阻塞 list._mtx.unlock(); // 访问page时 需要加锁 PageCache::GetInstance()->_pageMtx.lock(); // 没有空闲的span去PageCache中要 Span* span = PageCache::GetInstance()->NewSpan(SizeClass::NumMovePage(byte_size)); // 获取一个k页的span PageCache::GetInstance()->_pageMtx.unlock(); //对获取span进行切分,不需要加锁,因为其他线程拿不到这个span char* start = (char*)(span->_pageId << PAGE_SHIFT); // k页的span的起始地址 size_t bytes = span->_n << PAGE_SHIFT; // 内存字节数 char* end = start + bytes; // 把大块内存切成自由链表链表 // 1. 先切一块作头,然后尾插(这样保证物理空间顺序与逻辑顺序保持一致) span->_freeList = start; start += byte_size; void* tail = span->_freeList; while (start < end) { NextObj(tail) = start; tail = NextObj(tail); start += byte_size; } NextObj(tail)=nullptr; // 挂节的时候恢复cengtral的锁 list._mtx.lock(); list.PushFront(span); return span; }
五、完善整体项目释放流程 5.1thread cache ThreadCache.h文件中的ThreadCache类
增加一个类函数ListTooLong声明
注意:
PopRange:common文件中FreeList类的函数 CentralCache.h文件中的ReleaseListToSpans函数
ListTooLong函数定义实现
void ThreadCache::ListTooLong(FreeList& list, size_t size) { void* start = nullptr; void* end = nullptr; list.PopRange(start, end, list.MaxSize()); //取消挂接。 // 将一定数量的对象释放到span跨度(CentralCache) CentralCache::GetInstance()->ReleaseListToSpans(start, size); } void PopRange(void*& start, void*& end, size_t n) { assert(n <= _size); // 取走连续的链表空间 start = _freeList; // 起始地址 end = start; // 末尾地址 for (size_t i = 0; i < n - 1; ++i) { end = NextObj(end); } _freeList = NextObj(end); // 剩余空间继续挂接 NextObj(end) = nullptr; _size -= n; }
5.2central cache 释放回来时 use_count-=1。当use_count减到0时则表示所有对象都回到了span,则将span释放回page cache,page cache中会对前后相邻的空闲页进行合并。
// 将一定数量的对象释放到span跨度(CentralCache) void CentralCache::ReleaseListToSpans(void* start, size_t size) { size_t index = SizeClass::Index(size); // 确定桶的位置 _spanLists[index]._mtx.lock(); // 上桶锁 while (start) { void* next = NextObj(start); Span* span = PageCache::GetInstance()->MapObjectToSpan(start); // 通过映射找到span的位置 NextObj(start) = span->_freeList; // 头插 span->_freeList = start; span->_useCount -= 1; // 内存块借出去个数 if (span->_useCount == 0) // 说明小块内存全回来了; 这个span就可以回收合并了 { _spanLists[index].Erase(span); span->_freeList = nullptr; span->_next = nullptr; span->_prev = nullptr; //释放span给page cache时,使用page cache的锁就可以了 //这时把桶锁解掉 _spanLists[index]._mtx.unlock(); // 解锁 PageCache::GetInstance()->_pageMtx.lock(); PageCache::GetInstance()->ReleaseSpanToPageCache(span); // 释放空闲span回到Pagecache,并合并相邻的span PageCache::GetInstance()->_pageMtx.unlock(); _spanLists[index]._mtx.lock(); // 上桶锁 } start = next; } _spanLists[index]._mtx.unlock(); // 解锁 }
// 获取从对象到span的映射 Span* PageCache::MapObjectToSpan(void* obj) { PAGE_ID id = ((PAGE_ID)obj >> PAGE_SHIFT); std::unique_lock<std::mutex> lock(_pageMtx); //RAII风格的锁,把pagecache的锁给这个类。 auto ret = _idSpanMap.find(id); if (ret != _idSpanMap.end()) { return ret->second; } else { assert(false); return nullptr; } return nullptr; }
5.3page cache 如果central cache释放回一个span,则依次寻找span的前后page id的没有在使用的空闲span,看是否可以合并,如果合并继续向前寻找。这样就可以将切小的内存合并收缩成大的span,减少内存碎片。
// 释放空闲span回到Pagecache,并合并相邻的span void PageCache::ReleaseSpanToPageCache(Span* span) { // 向前合并 while (true) { // 对span前的页尝试进行合并.缓解内存碎片(外碎片)问题 PAGE_ID prevId = span->_pageId - 1; // 前面一页 auto ret = _idSpanMap.find(prevId); // 查找是否存在 // 查找前后的相邻页尝试合并 if (ret == _idSpanMap.end()) { break; //前面的页号没有;不合并了 } Span* prevSpan = ret->second; if (prevSpan->_isUse == true) { break; // span正在使用;不能合并 } if (prevSpan->_n + span->_n > NPAGES - 1) { break; // 内存超过_spanLists数组最大存储; 超过桶的存储能力(没办法管理);不能存储了 } // 可以往前合并了 span->_pageId = prevSpan->_pageId; // 起始地址往前 span->_n += prevSpan->_n; // 页数增加了 _spanLists[prevSpan->_n].Erase(prevSpan); // 合并后,取消挂节;防止野指针出现 delete prevSpan; // 合并后;处理掉前面相邻的span } // 向后合并 while (true) { // 对span后的页尝试进行合并.缓解内存碎片(外碎片)问题 PAGE_ID backId = span->_pageId + span->_n; // 后面span的一页 auto ret = _idSpanMap.find(backId); // 查找是否存在 // 查找前后的相邻页尝试合并 if (ret == _idSpanMap.end()) { break; //后面的页号没有;不合并了 } Span* backSpan = ret->second; if (backSpan->_isUse == true) { break; // span正在使用;不能合并 } if (backSpan->_n + span->_n > NPAGES - 1) { break; // 内存超过_spanLists数组最大存储; 超过桶的存储能力(没办法管理);不能存储了 } // 可以往后合并了 span->_n += backSpan->_n; // 页数增加了 _spanLists[backSpan->_n].Erase(backSpan); // 合并后,取消挂节;防止野指针出现 delete backSpan; // 合并后;处理掉后面相邻的span } // 挂节到PageCache _spanLists[span->_n].PushFront(span); span->_isUse = false; // 记录page中头和尾内存的映射 _idSpanMap[span->_pageId] = span; _idSpanMap[span->_pageId + span->_n - 1] = span; }
用定长内存池替代new和delete提高效率
我们之前每申请/释放一个Span结点就需要new/delete一个
解决方案:把之前写的定长内存池利用上;定义在pagecache中声明。
高并发内存池-大于256KB的大块内存申请与释放问题
1)概述
申请流程
ConcurrentAlloc函数完善
static void* ConcurrentAlloc(size_t size) { if (size > MAX_BYTES) // 申请内存大于32页直接去PageCache申请内存。 { size_t alignSize = SizeClass::RoundUp(size); // 内存大小对齐数 size_t kPage = alignSize >> PAGE_SHIFT; // 算出页数 Span* span = PageCache::GetInstance()->NewSpan(kPage); span->_objSize = size; void* ptr = (void*)(span->_pageId << PAGE_SHIFT); // 页内存的起始地址 return ptr; } else { // 通过TLS 每个线程无锁的获取自己的专属的ThreadCache对象 if (pTLSThreadCache == nullptr) { static myObject< ThreadCache> tcPool; //pTLSThreadCache = new ThreadCache; pTLSThreadCache = tcPool.New(); } //cout << std::this_thread::get_id() << ":" << pTLSThreadCache << endl; return pTLSThreadCache->Allocate(size); } }
2)NewSpan函数完善
// 获取一个k页的span Span* PageCache::NewSpan(size_t k) { assert(k > 0); if (k > NPAGES - 1) // 大于128页去堆上申请 { void* ptr = SystemAlloc(k); //Span* span = new Span; Span* span = _spanPool.New(); // 定长内存池,代替new;提高效率 span->_pageId = ((PAGE_ID)ptr >> PAGE_SHIFT); span->_n = k; _idSpanMap[span->_pageId] = span; // 起始页号和span映射 return span; } // 先检查第k个桶里面有没有span if (!_spanLists[k].Empty()) { Span* kSpan= _spanLists[k].PopFront(); // 建立id和span的映射; 方便回收小块内存 for (size_t i = 0; i < kSpan->_n; ++i) { _idSpanMap[kSpan->_pageId + i] = kSpan; } return kSpan; } // 检查一下后面的桶里面有没有span, 如果有可以把他它进行切分 for (size_t i = k + 1; i < NPAGES; ++i) { if (!_spanLists[i].Empty()) { // 进行切分 Span* nSpan = _spanLists[i].PopFront(); // i:位置取出它头结点 //Span* kSpan = new Span; // 创k一个结点 Span* kSpan = _spanPool.New(); // 定长内存池,代替new;提高效率 kSpan->_pageId = nSpan->_pageId; // 页的起始位置 nSpan->_pageId += k; // 分出k页;n页位置向后走(相当与一大块内存取出头一小块内存) kSpan->_n = k; // 得到k页 nSpan->_n -= k; // 剩余空间; 挂接到nSpan->_n即(i-k)页的位置 _spanLists[nSpan->_n].PushFront(nSpan); // 存储nSpan的首位页号跟nSpan映射,方便page cache回收内存时 //进行的合并查找 _idSpanMap[nSpan->_pageId] = nSpan; _idSpanMap[nSpan->_pageId + nSpan->_n - 1] = nSpan; // 例如:20 ~24(5页) // 建立id和span的映射; 方便回收小块内存 for (size_t i = 0; i < kSpan->_n; ++i) { _idSpanMap[kSpan->_pageId + i] = kSpan; } return kSpan; } } //走到这个位置就说明后面没有大页的span了 //这时就去找堆要一个128页(最大页---NPAGES-1)的span //Span* bigSpan = new Span; Span* bigSpan = _spanPool.New(); // 定长内存池,代替new;提高效率 void* ptr = SystemAlloc(NPAGES - 1); bigSpan->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT; // 地址除以13;就是页号 bigSpan->_n = NPAGES - 1; // 内存挂接到相应位置 _spanLists[bigSpan->_n].PushFront(bigSpan); // 函数递归调用自己;减少代码冗余(上面的空间切分)在时间稍微影响,基本上没影响 return NewSpan(k); }
3)释放流程
ConcurrentFree函数完善
我们要给一个内存块函数自己自动推演内存大小;并进行内存释放和回收。
解决方案:
a. 通过hash映射法(size与span)
b. span结构中增加一个成员变量size
我们采用b方案;在GetOneSpan函数内来添加大小
static void ConcurrentFree(void* ptr) { Span* span = PageCache::GetInstance()->MapObjectToSpan(ptr); // 通过地址,找到span的映射位置 size_t size = span->_objSize; if (size > MAX_BYTES) { PageCache::GetInstance()->_pageMtx.lock(); PageCache::GetInstance()->ReleaseSpanToPageCache(span); // 释放空闲span回到Pagecache,并合并相邻的span PageCache::GetInstance()->_pageMtx.unlock(); } else { assert(pTLSThreadCache); pTLSThreadCache->Deallocate(ptr, size); } }
ReleaseSpanToPageCache函数完善
// 释放空闲span回到Pagecache,并合并相邻的span void PageCache::ReleaseSpanToPageCache(Span* span) { if (span->_n > NPAGES - 1) // 大于128 页直接向堆释放掉 { void* ptr = (void*)(span->_pageId << PAGE_SHIFT); //通过页号起始,算出起始页的地址 SystemFree(ptr); // 释放 // delete span; _spanPool.Delete(span); // 定长内存池代替delete;提高效率 return; } // 向前合并 while (true) { // 对span前的页尝试进行合并.缓解内存碎片(外碎片)问题 PAGE_ID prevId = span->_pageId - 1; // 前面一页 auto ret = _idSpanMap.find(prevId); // 查找是否存在 // 查找前后的相邻页尝试合并 if (ret == _idSpanMap.end()) { break; //前面的页号没有;不合并了 } Span* prevSpan = ret->second; if (prevSpan->_isUse == true) { break; // span正在使用;不能合并 } if (prevSpan->_n + span->_n > NPAGES - 1) { break; // 内存超过_spanLists数组最大存储; 超过桶的存储能力(没办法管理);不能存储了 } // 可以往前合并了 span->_pageId = prevSpan->_pageId; // 起始地址往前 span->_n += prevSpan->_n; // 页数增加了 _spanLists[prevSpan->_n].Erase(prevSpan); // 合并后,取消挂节;防止野指针出现 //delete prevSpan; // 合并后;处理掉前面相邻的span _spanPool.Delete(prevSpan); // 定长内存池代替delete;提高效率 } // 向后合并 while (true) { // 对span后的页尝试进行合并.缓解内存碎片(外碎片)问题 PAGE_ID backId = span->_pageId + span->_n; // 后面span的一页 auto ret = _idSpanMap.find(backId); // 查找是否存在 // 查找前后的相邻页尝试合并 if (ret == _idSpanMap.end()) { break; //后面的页号没有;不合并了 } Span* backSpan = ret->second; if (backSpan->_isUse == true) { break; // span正在使用;不能合并 } if (backSpan->_n + span->_n > NPAGES - 1) { break; // 内存超过_spanLists数组最大存储; 超过桶的存储能力(没办法管理);不能存储了 } // 可以往后合并了 span->_n += backSpan->_n; // 页数增加了 _spanLists[backSpan->_n].Erase(backSpan); // 合并后,取消挂节;防止野指针出现 //delete backSpan; // 合并后;处理掉后面相邻的span _spanPool.Delete(backSpan); // 定长内存池代替delete;提高效率 } // 挂节到PageCache _spanLists[span->_n].PushFront(span); span->_isUse = false; // 记录page中头和尾内存的映射 _idSpanMap[span->_pageId] = span; _idSpanMap[span->_pageId + span->_n - 1] = span; }
高并发内存池-针对性能瓶颈使用基数树进行优化
1)概述
通过性能检测,我们明显发现;内存池性能很差???
通过性能检测;我们发现我们映射中的lock(加锁)消耗性能(映射加锁是为了保证多线程安全)
三层空间
不同平台下,存储页号需要位数不同
一层 本质是id与span*的映射
使用tcmalloc源码中实现基数树进行优化
利用hash映射的部分全部改成基树(读写分离)
#pragma once #include"Common.h" // Single-level array template <int BITS> class TCMalloc_PageMap1 { private: static const int LENGTH = 1 << BITS; void** array_; public: typedef uintptr_t Number; //explicit TCMalloc_PageMap1(void* (*allocator)(size_t)) { explicit TCMalloc_PageMap1() { //array_ = reinterpret_cast<void**>((*allocator)(sizeof(void*) << BITS)); size_t size = sizeof(void*) << BITS; size_t alignSize = SizeClass::_RoundUp(size, 1 << PAGE_SHIFT); array_ = (void**)SystemAlloc(alignSize >> PAGE_SHIFT); memset(array_, 0, sizeof(void*) << BITS); } // Return the current value for KEY. Returns NULL if not yet set, // or if k is out of range. void* get(Number k) const { if ((k >> BITS) > 0) { return NULL; } return array_[k]; } // REQUIRES "k" is in range "[0,2^BITS-1]". // REQUIRES "k" has been ensured before. // // Sets the value 'v' for key 'k'. void set(Number k, void* v) { array_[k] = v; } }; // Two-level radix tree template <int BITS> class TCMalloc_PageMap2 { private: // Put 32 entries in the root and (2^BITS)/32 entries in each leaf. static const int ROOT_BITS = 5; static const int ROOT_LENGTH = 1 << ROOT_BITS; static const int LEAF_BITS = BITS - ROOT_BITS; static const int LEAF_LENGTH = 1 << LEAF_BITS; // Leaf node struct Leaf { void* values[LEAF_LENGTH]; }; Leaf* root_[ROOT_LENGTH]; // Pointers to 32 child nodes void* (*allocator_)(size_t); // Memory allocator public: typedef uintptr_t Number; //explicit TCMalloc_PageMap2(void* (*allocator)(size_t)) { explicit TCMalloc_PageMap2() { //allocator_ = allocator; memset(root_, 0, sizeof(root_)); PreallocateMoreMemory(); } void* get(Number k) const { const Number i1 = k >> LEAF_BITS; const Number i2 = k & (LEAF_LENGTH - 1); if ((k >> BITS) > 0 || root_[i1] == NULL) { return NULL; } return root_[i1]->values[i2]; } void set(Number k, void* v) { const Number i1 = k >> LEAF_BITS; const Number i2 = k & (LEAF_LENGTH - 1); assert(i1 < ROOT_LENGTH); root_[i1]->values[i2] = v; } bool Ensure(Number start, size_t n) { for (Number key = start; key <= start + n - 1;) { const Number i1 = key >> LEAF_BITS; // Check for overflow if (i1 >= ROOT_LENGTH) return false; // Make 2nd level node if necessary if (root_[i1] == NULL) { //Leaf* leaf = reinterpret_cast<Leaf*>((*allocator_)(sizeof(Leaf))); //if (leaf == NULL) return false; static myObject<Leaf> leafPool; Leaf* leaf = (Leaf*)leafPool.New(); memset(leaf, 0, sizeof(*leaf)); root_[i1] = leaf; } // Advance key past whatever is covered by this leaf node key = ((key >> LEAF_BITS) + 1) << LEAF_BITS; } return true; } void PreallocateMoreMemory() { // Allocate enough to keep track of all possible pages Ensure(0, 1 << BITS); } }; // Three-level radix tree template <int BITS> class TCMalloc_PageMap3 { private: // How many bits should we consume at each interior level static const int INTERIOR_BITS = (BITS + 2) / 3; // Round-up static const int INTERIOR_LENGTH = 1 << INTERIOR_BITS; // How many bits should we consume at leaf level static const int LEAF_BITS = BITS - 2 * INTERIOR_BITS; static const int LEAF_LENGTH = 1 << LEAF_BITS; // Interior node struct Node { Node* ptrs[INTERIOR_LENGTH]; }; // Leaf node struct Leaf { void* values[LEAF_LENGTH]; }; Node* root_; // Root of radix tree void* (*allocator_)(size_t); // Memory allocator Node* NewNode() { Node* result = reinterpret_cast<Node*>((*allocator_)(sizeof(Node))); if (result != NULL) { memset(result, 0, sizeof(*result)); } return result; } public: typedef uintptr_t Number; explicit TCMalloc_PageMap3(void* (*allocator)(size_t)) { allocator_ = allocator; root_ = NewNode(); } void* get(Number k) const { const Number i1 = k >> (LEAF_BITS + INTERIOR_BITS); const Number i2 = (k >> LEAF_BITS) & (INTERIOR_LENGTH - 1); const Number i3 = k & (LEAF_LENGTH - 1); if ((k >> BITS) > 0 || root_->ptrs[i1] == NULL || root_->ptrs[i1]->ptrs[i2] == NULL) { return NULL; } return reinterpret_cast<Leaf*>(root_->ptrs[i1]->ptrs[i2])->values[i3]; } void set(Number k, void* v) { ASSERT(k >> BITS == 0); const Number i1 = k >> (LEAF_BITS + INTERIOR_BITS); const Number i2 = (k >> LEAF_BITS) & (INTERIOR_LENGTH - 1); const Number i3 = k & (LEAF_LENGTH - 1); reinterpret_cast<Leaf*>(root_->ptrs[i1]->ptrs[i2])->values[i3] = v; } bool Ensure(Number start, size_t n) { for (Number key = start; key <= start + n - 1;) { const Number i1 = key >> (LEAF_BITS + INTERIOR_BITS); const Number i2 = (key >> LEAF_BITS) & (INTERIOR_LENGTH - 1); // Check for overflow if (i1 >= INTERIOR_LENGTH || i2 >= INTERIOR_LENGTH) return false; // Make 2nd level node if necessary if (root_->ptrs[i1] == NULL) { Node* n = NewNode(); if (n == NULL) return false; root_->ptrs[i1] = n; } // Make leaf node if necessary if (root_->ptrs[i1]->ptrs[i2] == NULL) { Leaf* leaf = reinterpret_cast<Leaf*>((*allocator_)(sizeof(Leaf))); if (leaf == NULL) return false; memset(leaf, 0, sizeof(*leaf)); root_->ptrs[i1]->ptrs[i2] = reinterpret_cast<Node*>(leaf); } // Advance key past whatever is covered by this leaf node key = ((key >> LEAF_BITS) + 1) << LEAF_BITS; } return true; } void PreallocateMoreMemory() { } };
基树与hash对比
1、只有在这两个函数中回去建立id和span的映射,也就是说回去写 2、基数树,写之前会提前开好空间,写数据过程中,不会动结构。 3、读写是分离的。线程1对一个位置读写的时候,线程2不可能对这个位置读写
5.4框架代码和代码总结 common.h共享文件
#pragma once #include <iostream> #include <vector> #include <thread> #include <algorithm> #include <mutex> #include <unordered_map> #include <time.h> #include <assert.h> using std::cout; using std::endl; static const size_t MAX_BYTES = 256 * 1024; // 一页8kb; 32页 static const size_t NFREELIST = 208; // central and thread总共桶的个数 static const size_t NPAGES = 129; // page总共桶的个数; 数组下标从1开始总共128页 static const size_t PAGE_SHIFT = 13; // 一页字节数2^13 // 不同平台下页数不同;可能导致存页数变量范围不够 #ifdef _WIN64 typedef unsigned long long PAGE_ID; #elif _WIN32 typedef size_t PAGE_ID; #else // Linux #endif #ifdef _WIN32 #include<windows.h> #else // linux #endif // 直接去堆上按页申请空间 inline static void* SystemAlloc(size_t kpage) { #ifdef _WIN32 void* ptr = VirtualAlloc(0, kpage << 13, MEM_COMMIT | MEM_RESERVE, PAGE_READWRITE); #else // linux下brk mmap等 #endif if (ptr == nullptr) throw std::bad_alloc(); return ptr; } // 释放内存 inline static void SystemFree(void* ptr) { #ifdef _WIN32 VirtualFree(ptr, 0, MEM_RELEASE); #else // sbrk unmmap等 #endif } static void*& NextObj(void* obj) // 前四个或八个字节(下一个结点的地址) { return *(void**)obj; } // 管理小对象的自由链表 class FreeList { public: void Push(void* obj) { // 头插 NextObj(obj) = _freeList; _freeList = obj; ++_size; } void* Pop() { assert(_freeList); // 头删 void* obj = _freeList; _freeList = NextObj(obj); --_size; return obj; } void PushRange(void* start, void* end, size_t n) { // 一连串空间存储 NextObj(end) = _freeList; _freeList = start; _size += n; } void PopRange(void*& start, void*& end, size_t n) { assert(n <= _size); // 取走连续的链表空间 start = _freeList; // 起始地址 end = start; // 末尾地址 for (size_t i = 0; i < n - 1; ++i) { end = NextObj(end); } _freeList = NextObj(end); // 剩余空间继续挂接 NextObj(end) = nullptr; _size -= n; } bool Empty() { return _freeList == nullptr; } size_t& MaxSize() { return _maxSize; } size_t Size() { return _size; } private: void* _freeList = nullptr; size_t _maxSize = 1; // 表示向CentrealCache要空间数量的上限 size_t _size = 0; // 记录(_freeList)挂接内存的个数 }; // 计算对象大小的对齐映射规则 struct SizeClass { // 整体控制在最多10%左右的内碎片浪费 // [1,128] 8byte对齐 freelist[0,16) // [128+1,1024] 16byte对齐 freelist[16,72) // [1024+1,8*1024] 128byte对齐 freelist[72,128) // [8*1024+1,64*1024] 1024byte对齐 freelist[128,184) // [64*1024+1,256*1024] 8*1024byte对齐 freelist[184,208) static inline size_t _RoundUp(size_t bytes, size_t alignNum) // 算出桶的大小(即size的对齐数) { return ((bytes + alignNum - 1) & ~(alignNum - 1)); } // 对齐大小计算 static inline size_t RoundUp(size_t size) { // 确定每个区间的对齐数 if (size <= 128) { return _RoundUp(size, 8); } else if (size <= 1024) { return _RoundUp(size, 16); } else if (size <= 8 * 1024) { return _RoundUp(size, 128); } else if (size <= 64 * 1024) { return _RoundUp(size, 1024); } else if (size <= 256 * 1024) { return _RoundUp(size, 8 * 1024); } else { return _RoundUp(size, 8 * 1024); } } static inline size_t _Index(size_t bytes, size_t align_shift) { return ((bytes + (1 << align_shift) - 1) >> align_shift) - 1; // 桶下标从0开始计数 } // 计算映射的哪一个自由链表桶 static inline size_t Index(size_t bytes) { assert(bytes <= MAX_BYTES); // 每个区间有多少个链 static int group_array[4] = { 16, 56, 56, 56 }; if (bytes <= 128) { return _Index(bytes, 3); } else if (bytes <= 1024) { return _Index(bytes - 128, 4) + group_array[0]; } else if (bytes <= 8 * 1024) { return _Index(bytes - 1024, 7) + group_array[1] + group_array[0]; } else if (bytes <= 64 * 1024) { return _Index(bytes - 8 * 1024, 10) + group_array[2] + group_array[1] + group_array[0]; } else if (bytes <= 256 * 1024) { return _Index(bytes - 64 * 1024, 13) + group_array[3] + group_array[2] + group_array[1] + group_array[0]; } else { assert(false); } return -1; } // 一次从中心缓存获取多少个 static size_t NumMoveSize(size_t size) { assert(size > 0); // [2, 512],一次批量移动多少个对象的(慢启动)上限值 // 小对象一次批量上限高 // 小对象一次批量上限低 int num = MAX_BYTES / size; if (num < 2) num = 2; if (num > 512) num = 512; return num; } // 计算一次向系统获取几个页 // 单个对象 8byte // ... // 单个对象 256KB static size_t NumMovePage(size_t size) { size_t num = NumMoveSize(size); // 要几页 size_t npage = num * size; //总字节数 npage >>= PAGE_SHIFT; // 计算多少页,一页8k(2^13);则PAGE_SHIFT为13 if (npage == 0) npage = 1; return npage; } }; // Span管理一个跨度的大块内存 // 管理以页为单位的大块内存 struct Span { PAGE_ID _pageId = 0; // 页号 size_t _n = 0; // 页数 // 双向链表的结构 Span* _next = nullptr; Span* _prev = nullptr; size_t _objSize = 0; // 切好的小对象大小 size_t _useCount = 0; // 使用计数,切好的小块内存分配给ThreadCache void* _freeList = nullptr; // 大块内存切小链接起来,这样回收回来的内存也方便链接 bool _isUse = false; // 是否在被使用 }; // 带头双向循环链表 class SpanList { public: SpanList() { _head = new Span; _head->_next = _head; _head->_prev = _head; } Span* Begin() { return _head->_next; } Span* End() { return _head; } bool Empty() { return _head->_next == _head; } void PushFront(Span* span) { Insert(Begin(), span); } Span* PopFront() { Span* front = _head->_next; Erase(front); return front; } void Insert(Span* pos, Span* newSpan) { assert(pos); assert(newSpan); // 头插 Span* prev = pos->_prev; prev->_next = newSpan; newSpan->_prev = prev; newSpan->_next = pos; pos->_prev = newSpan; } void Erase(Span* pos) { assert(pos); assert(pos != _head); Span* prev = pos->_prev; Span* next = pos->_next; prev->_next = next; next->_prev = prev; } private: Span* _head; public: std::mutex _mtx; // 桶锁: 数组_spanLists中每一个位置的桶都有一把锁 };
六、高并发内存池三大框架代码 6.1threadcache框架(声明) #pragma once #include "common.h" // thread cache本质是由一个哈希映射的对象自由链表构成 class ThreadCache { public: // 申请和释放内存对象 void* Allocate(size_t size); void Deallocate(void* ptr, size_t size); // 从中心缓存获取对象 void* FetchFromCentralCache(size_t index, size_t size); // 释放对象时,链表过长时,回收内存回到中心缓存 void ListTooLong(FreeList& list, size_t size); private: FreeList _freeLists[NFREELIST]; }; // TLS thread local storage static _declspec(thread) ThreadCache* pTLSThreadCache = nullptr;
6.2CentralCache框架(声明) #pragma once #include "common.h" // 单例模式---饿汉模式 class CentralCache { public: static CentralCache* GetInstance() { return &_sInst; } // 获取一个非空的span Span* GetOneSpan(SpanList& list, size_t byte_size); // list: 表示_spanLists中span的_freeList连续挂接空间的数量 // 从中心缓存获取一定数量的对象给thread cache size_t FetchRangeObj(void*& start, void*& end, size_t batchNum, size_t size); // size:内存空间大小;batchNum:内存空间个数 // start 和 end 表示内存空间地址起始到终末范围 // 将一定数量的对象释放到span跨度 void ReleaseListToSpans(void* start, size_t size); private: SpanList _spanLists[NFREELIST]; private: CentralCache() {}; CentralCache(const CentralCache& t) = delete; static CentralCache _sInst; };
6.3pagecache框架(声明) #pragma once #include "common.h" #include "ObjectPool.h" #include "PageMap.h" class PageCache { public: static PageCache* GetInstance() { return &_sInst; } // 获取一个k页的span Span* NewSpan(size_t k); // 获取从对象到span的映射 Span* MapObjectToSpan(void* obj); // 释放空闲span回到Pagecache,并合并相邻的span void ReleaseSpanToPageCache(Span* span); std::mutex _pageMtx; private: SpanList _spanLists[NPAGES]; PageCache() {}; PageCache(const PageCache& t) = delete; myObject<Span> _spanPool; //std::unordered_map<PAGE_ID, Span*> _idSpanMap; TCMalloc_PageMap2<32 - PAGE_SHIFT>_idSpanMap; // 基树 static PageCache _sInst; };
6.4threadcache框架(定义) #pragma once #include "ThreadCache.h" #include "CentralCache.h" // 慢开始反馈调节算法 // 1、最开始不会一次向central cache一次批量要太多,因为要太多了可能用不完 // 2、如果你不要这个size大小内存需求,那么batchNum就会不断增长,直到上限 // 3、size越大,一次向central cache要的batchNum就越小 // 4、size越小,一次向central cache要的batchNum就越大 void* ThreadCache::FetchFromCentralCache(size_t index, size_t size) { // 慢开始反馈调节法 不会开始要太多内存 size_t batchNum = min(_freeLists[index].MaxSize(), SizeClass::NumMoveSize(size)); if (batchNum == _freeLists[index].MaxSize()) { _freeLists[index].MaxSize() += 1; } void* start = nullptr; void* end = nullptr; size_t actualNum = CentralCache::GetInstance()->FetchRangeObj(start, end, batchNum, size); // 获取连续实际内存的数目 assert(actualNum >= 1); // 至少一个 if (actualNum == 1) { assert(start == end); } else { _freeLists[index].PushRange(NextObj(start), end, actualNum - 1); } return start; } // 申请内存对象, 空间先到_freeLists链表中去取,如果没有去中心缓存获取空间 void* ThreadCache::Allocate(size_t size) { assert(size <= MAX_BYTES); size_t alignSize = SizeClass::RoundUp(size); size_t index = SizeClass::Index(size); if (!_freeLists[index].Empty()) { return _freeLists[index].Pop(); } else { return FetchFromCentralCache(index, alignSize); } } // 释放内存对象,空间挂起到_freeLists链表中去 void ThreadCache::Deallocate(void* ptr, size_t size) { assert(ptr); assert(size <= MAX_BYTES); // 找对映射的自由链表桶,对象插入进入 size_t index = SizeClass::Index(size); _freeLists[index].Push(ptr); //当链表长度大于一次批量申请的内存时就开始还一段list给central cache if (_freeLists[index].Size() >= _freeLists[index].MaxSize()) { // 释放对象时,链表过长时,回收内存回到中心缓存 ListTooLong(_freeLists[index], size); } } void ThreadCache::ListTooLong(FreeList& list, size_t size) { void* start = nullptr; void* end = nullptr; list.PopRange(start, end, list.MaxSize()); //取消挂接。 // 将一定数量的对象释放到span跨度(CentralCache) CentralCache::GetInstance()->ReleaseListToSpans(start, size); }
6.5CentralCache框架(定义) #pragma once #include"CentralCache.h" #include"PageCache.h" CentralCache CentralCache::_sInst; //定义 // 获取一个非空的span Span* CentralCache::GetOneSpan(SpanList& list, size_t byte_size) { // 先从桶的span中找内存(查看当前的spanlist中是否有还有未分配对象的span) Span* it = list.Begin(); while (it != list.End()) { if (it->_freeList != nullptr) { return it; } else { it = it->_next; } } // 先把central 桶锁解除:这样如果某他线程释放内存对象回来,不会阻塞 list._mtx.unlock(); // 访问page时 需要加锁 PageCache::GetInstance()->_pageMtx.lock(); // 没有空闲的span去PageCache中要 Span* span = PageCache::GetInstance()->NewSpan(SizeClass::NumMovePage(byte_size)); // 获取一个k页的span span->_isUse = true; // 内存span已经使用了 span->_objSize = byte_size; // 切分内存大小记录下来;便于改良回收 PageCache::GetInstance()->_pageMtx.unlock(); //对获取span进行切分,不需要加锁,因为其他线程拿不到这个span char* start = (char*)(span->_pageId << PAGE_SHIFT); // k页的span的起始地址 size_t bytes = span->_n << PAGE_SHIFT; // 内存字节数 char* end = start + bytes; // 把大块内存切成自由链表链表 // 1. 先切一块作头,然后尾插(这样保证物理空间顺序与逻辑顺序保持一致) span->_freeList = start; start += byte_size; void* tail = span->_freeList; while (start < end) { NextObj(tail) = start; tail = NextObj(tail); start += byte_size; } NextObj(tail) = nullptr; // 最后一个内存块下个内存地址置空 // 挂节的时候恢复cengtral的锁 list._mtx.lock(); list.PushFront(span); return span; } // 从中心缓存获取一定数量的对象给thread cache size_t CentralCache::FetchRangeObj(void*& start, void*& end, size_t batchNum, size_t size) { // 每个桶加锁 size_t index = SizeClass::Index(size); _spanLists[index]._mtx.lock(); Span* span = GetOneSpan(_spanLists[index], size); assert(span); assert(span->_freeList); // 从span中获取batchNum个对象 // 如果不够batchNum个,有多少拿多少 start = span->_freeList; end = start; size_t i = 0, actualNum = 1; while (i < batchNum - 1 && NextObj(end) != nullptr) // 取连续内存块,当batchNum超过原有时,按拥有的数量给 { end = NextObj(end); ++i; ++actualNum; } span->_freeList = NextObj(end); // 剩余空间仍挂在Span中的_freeList去 NextObj(end) = nullptr; // 最后end记录下一个空间地址置空 span->_useCount += actualNum; _spanLists[index]._mtx.unlock(); // 解锁 return actualNum; } void CentralCache::ReleaseListToSpans(void* start, size_t size) { size_t index = SizeClass::Index(size); // 确定桶的位置 _spanLists[index]._mtx.lock(); // 上桶锁 while (start) { void* next = NextObj(start); Span* span = PageCache::GetInstance()->MapObjectToSpan(start); // 通过映射找到span的位置 NextObj(start) = span->_freeList; // 头插 span->_freeList = start; span->_useCount -= 1; // 内存块借出去个数 if (span->_useCount == 0) // 说明小块内存全回来了; 这个span就可以回收合并了 { _spanLists[index].Erase(span); span->_freeList = nullptr; span->_next = nullptr; span->_prev = nullptr; //释放span给page cache时,使用page cache的锁就可以了 //这时把桶锁解掉 _spanLists[index]._mtx.unlock(); // 解锁 PageCache::GetInstance()->_pageMtx.lock(); PageCache::GetInstance()->ReleaseSpanToPageCache(span); // 释放空闲span回到Pagecache,并合并相邻的span PageCache::GetInstance()->_pageMtx.unlock(); _spanLists[index]._mtx.lock(); // 上桶锁 } start = next; } _spanLists[index]._mtx.unlock(); // 解锁 }
6.6pagecache框架(定义) #pragma once #include "PageCache.h" PageCache PageCache::_sInst; // 获取一个k页的span Span* PageCache::NewSpan(size_t k) { assert(k > 0); if (k > NPAGES - 1) // 大于128页去堆上申请 { void* ptr = SystemAlloc(k); //Span* span = new Span; Span* span = _spanPool.New(); // 定长内存池,代替new;提高效率 span->_pageId = ((PAGE_ID)ptr >> PAGE_SHIFT); span->_n = k; //_idSpanMap[span->_pageId] = span; // 起始页号和span映射 _idSpanMap.set(span->_pageId, span); return span; } // 先检查第k个桶里面有没有span if (!_spanLists[k].Empty()) { Span* kSpan= _spanLists[k].PopFront(); // 建立id和span的映射; 方便回收小块内存 for (size_t i = 0; i < kSpan->_n; ++i) { //_idSpanMap[kSpan->_pageId + i] = kSpan; _idSpanMap.set(kSpan->_pageId + i, kSpan); } return kSpan; } // 检查一下后面的桶里面有没有span, 如果有可以把他它进行切分 for (size_t i = k + 1; i < NPAGES; ++i) { if (!_spanLists[i].Empty()) { // 进行切分 Span* nSpan = _spanLists[i].PopFront(); // i:位置取出它头结点 //Span* kSpan = new Span; // 创k一个结点 Span* kSpan = _spanPool.New(); // 定长内存池,代替new;提高效率 kSpan->_pageId = nSpan->_pageId; // 页的起始位置 nSpan->_pageId += k; // 分出k页;n页位置向后走(相当与一大块内存取出头一小块内存) kSpan->_n = k; // 得到k页 nSpan->_n -= k; // 剩余空间; 挂接到nSpan->_n即(i-k)页的位置 _spanLists[nSpan->_n].PushFront(nSpan); // 存储nSpan的首位页号跟nSpan映射,方便page cache回收内存时 //进行的合并查找 //_idSpanMap[nSpan->_pageId] = nSpan; _idSpanMap.set(nSpan->_pageId, nSpan); //_idSpanMap[nSpan->_pageId + nSpan->_n - 1] = nSpan; // 例如:20 ~24(5页) _idSpanMap.set(nSpan->_pageId + nSpan->_n - 1, nSpan); // 建立id和span的映射; 方便回收小块内存 for (size_t i = 0; i < kSpan->_n; ++i) { //_idSpanMap[kSpan->_pageId + i] = kSpan; _idSpanMap.set(kSpan->_pageId + i, kSpan); } return kSpan; } } //走到这个位置就说明后面没有大页的span了 //这时就去找堆要一个128页(最大页---NPAGES-1)的span //Span* bigSpan = new Span; Span* bigSpan = _spanPool.New(); // 定长内存池,代替new;提高效率 void* ptr = SystemAlloc(NPAGES - 1); bigSpan->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT; // 地址除以13;就是页号 bigSpan->_n = NPAGES - 1; // 内存挂接到相应位置 _spanLists[bigSpan->_n].PushFront(bigSpan); // 函数递归调用自己;减少代码冗余(上面的空间切分)在时间稍微影响,基本上没影响 return NewSpan(k); } // 获取从对象到span的映射 Span* PageCache::MapObjectToSpan(void* obj) { PAGE_ID id = ((PAGE_ID)obj >> PAGE_SHIFT); //std::unique_lock<std::mutex> lock(_pageMtx); //RAII风格的锁,把pagecache的锁给这个类。 //auto ret = _idSpanMap.find(id); //if (ret != _idSpanMap.end()) //{ // return ret->second; //} //else //{ // assert(false); // return nullptr; //} Span* ret = (Span*)_idSpanMap.get(id); assert(ret != nullptr); return ret; } // 释放空闲span回到Pagecache,并合并相邻的span void PageCache::ReleaseSpanToPageCache(Span* span) { if (span->_n > NPAGES - 1) // 大于128 页直接向堆释放掉 { void* ptr = (void*)(span->_pageId << PAGE_SHIFT); //通过页号起始,算出起始页的地址 SystemFree(ptr); // 释放 // delete span; _spanPool.Delete(span); // 定长内存池代替delete;提高效率 return; } // 向前合并 while (true) { // 对span前的页尝试进行合并.缓解内存碎片(外碎片)问题 PAGE_ID prevId = span->_pageId - 1; // 前面一页 //auto ret = _idSpanMap.find(prevId); // 查找是否存在 // 查找前后的相邻页尝试合并 //if (ret == _idSpanMap.end()) //{ // break; //前面的页号没有;不合并了 //} Span* ret = (Span*)_idSpanMap.get(prevId); if (ret == nullptr) { break; //前面的页号没有;不合并了 } //Span* prevSpan = ret->second; Span* prevSpan = ret; if (prevSpan->_isUse == true) { break; // span正在使用;不能合并 } if (prevSpan->_n + span->_n > NPAGES - 1) { break; // 内存超过_spanLists数组最大存储; 超过桶的存储能力(没办法管理);不能存储了 } // 可以往前合并了 span->_pageId = prevSpan->_pageId; // 起始地址往前 span->_n += prevSpan->_n; // 页数增加了 _spanLists[prevSpan->_n].Erase(prevSpan); // 合并后,取消挂节;防止野指针出现 //delete prevSpan; // 合并后;处理掉前面相邻的span _spanPool.Delete(prevSpan); // 定长内存池代替delete;提高效率 } // 向后合并 while (true) { // 对span后的页尝试进行合并.缓解内存碎片(外碎片)问题 PAGE_ID backId = span->_pageId + span->_n; // 后面span的一页 //auto ret = _idSpanMap.find(backId); // 查找是否存在 // 查找前后的相邻页尝试合并 //if (ret == _idSpanMap.end()) //{ // break; //后面的页号没有;不合并了 //} Span* ret = (Span*)_idSpanMap.get(backId); if (ret == nullptr) { break; //后面的页号没有;不合并了 } Span* backSpan = ret; if (backSpan->_isUse == true) { break; // span正在使用;不能合并 } if (backSpan->_n + span->_n > NPAGES - 1) { break; // 内存超过_spanLists数组最大存储; 超过桶的存储能力(没办法管理);不能存储了 } // 可以往后合并了 span->_n += backSpan->_n; // 页数增加了 _spanLists[backSpan->_n].Erase(backSpan); // 合并后,取消挂节;防止野指针出现 //delete backSpan; // 合并后;处理掉后面相邻的span _spanPool.Delete(backSpan); // 定长内存池代替delete;提高效率 } // 挂节到PageCache _spanLists[span->_n].PushFront(span); span->_isUse = false; // 记录page中头和尾内存的映射 //_idSpanMap[span->_pageId] = span; //_idSpanMap[span->_pageId + span->_n - 1] = span; _idSpanMap.set(span->_pageId, span); _idSpanMap.set(span->_pageId + span->_n - 1, span); }
定长内存池
(ObjectPool.h文件) 作用替代高并发内存池中的new和delete
#pragma once #include"common.h" template <class T> class myObject { public: T* New() { // 优先把还回来内存块对象,再次重复利用 T* obj = nullptr; if (_freeList) { // 头删 void* next= *(void**)_freeList; obj = (T*)_freeList; _freeList = next; } else { // 剩余内存不够一个对象大小时,则重新开大块空间 if (_remainBytes < sizeof(T)) { _remainBytes = 1024 * 128; //_memory = (char*)malloc(_remainBytes); _memory = (char*)SystemAlloc(_remainBytes >> 13); // 按页申请内存 if (_memory == nullptr) { throw std::bad_alloc(); } } obj = (T*)_memory; size_t objSize = sizeof(T) < sizeof(void*) ? sizeof(void*) : sizeof(T); //申请内存空间必须大于或等于一个指针的空间 _memory += objSize; _remainBytes -= objSize; } // 定位new,显示调用T的构造函数初始化 new(obj)T; return obj; } void Delete(T* obj) { // 显示调用析构函数清理对象 obj->~T(); // 头插 *(void**)obj = _freeList; // 取前四或八个字节 _freeList = obj; } private: char* _memory = nullptr; // 指向大块内存的指针 size_t _remainBytes = 0; // 大块内存分割过程中剩余的字节数 void* _freeList = nullptr; // 还回来的内存块用自由链接表的头指针链接起来 };
线程局部存储 (ConcurrentAlloc.h)文件
#pragma once #include "Common.h" #include "ThreadCache.h" #include "PageCache.h" #include "ObjectPool.h" static void* ConcurrentAlloc(size_t size) { if (size > MAX_BYTES) // 申请内存大于32页直接去PageCache申请内存。 { size_t alignSize = SizeClass::RoundUp(size); // 内存大小对齐数 size_t kPage = alignSize >> PAGE_SHIFT; // 算出页数 Span* span = PageCache::GetInstance()->NewSpan(kPage); span->_objSize = size; void* ptr = (void*)(span->_pageId << PAGE_SHIFT); // 页内存的起始地址 return ptr; } else { // 通过TLS 每个线程无锁的获取自己的专属的ThreadCache对象 if (pTLSThreadCache == nullptr) { static myObject< ThreadCache> tcPool; //pTLSThreadCache = new ThreadCache; pTLSThreadCache = tcPool.New(); } //cout << std::this_thread::get_id() << ":" << pTLSThreadCache << endl; return pTLSThreadCache->Allocate(size); } } static void ConcurrentFree(void* ptr) { Span* span = PageCache::GetInstance()->MapObjectToSpan(ptr); // 通过地址,找到span的映射位置 size_t size = span->_objSize; if (size > MAX_BYTES) { PageCache::GetInstance()->_pageMtx.lock(); PageCache::GetInstance()->ReleaseSpanToPageCache(span); // 释放空闲span回到Pagecache,并合并相邻的span PageCache::GetInstance()->_pageMtx.unlock(); } else { assert(pTLSThreadCache); pTLSThreadCache->Deallocate(ptr, size); } }
测试性能(系统的malloc与我们写的高并发内存池) (Benchmark.cpp文件) 注意:
测试代码中出现了lamda
测试代码借鉴某大佬的(^ v ^)
#include "ConcurrentAlloc.h" // nworks:创建线程的次数 // rounds:经历多少轮次 // ntimes:一轮申请释放的次数 void BenchmarkMalloc(size_t ntimes, size_t nworks, size_t rounds) { std::vector<std::thread> vthread(nworks); std::atomic<size_t> malloc_costtime = 0; std::atomic<size_t> free_costtime = 0; for (size_t k = 0; k < nworks; ++k) { vthread[k] = std::thread([&, k]() { std::vector<void*> v; v.reserve(ntimes); for (size_t j = 0; j < rounds; ++j) { size_t begin1 = clock(); for (size_t i = 0; i < ntimes; i++) { //v.push_back(malloc(16)); v.push_back(malloc((16 + i) % 8192 + 1)); } size_t end1 = clock(); size_t begin2 = clock(); for (size_t i = 0; i < ntimes; i++) { free(v[i]); } size_t end2 = clock(); v.clear(); malloc_costtime += (end1 - begin1); free_costtime += (end2 - begin2); } }); } for (auto& t : vthread) { t.join(); } cout << nworks << "个线程并发执行" << rounds << "轮次,每轮次malloc" << ntimes << "次: 花费:" << malloc_costtime << "ms" << endl; cout << nworks << "个线程并发执行" << rounds << "轮次,每轮次malloc" << ntimes << "次: 花费:" << free_costtime << "ms" << endl; cout << nworks << "个线程并发malloc & free" << nworks * rounds * ntimes << "次,总计花费:" << malloc_costtime + free_costtime << "ms" << endl; } // 单轮次申请释放次数 线程数 轮次 void BenchmarkConcurrentMalloc(size_t ntimes, size_t nworks, size_t rounds) { std::vector<std::thread> vthread(nworks); std::atomic<size_t> malloc_costtime = 0; std::atomic<size_t> free_costtime = 0; for (size_t k = 0; k < nworks; ++k) { vthread[k] = std::thread([&]() { std::vector<void*> v; v.reserve(ntimes); for (size_t j = 0; j < rounds; ++j) { size_t begin1 = clock(); for (size_t i = 0; i < ntimes; i++) { //v.push_back(ConcurrentAlloc(16)); v.push_back(ConcurrentAlloc((16 + i) % 8192 + 1)); } size_t end1 = clock(); size_t begin2 = clock(); for (size_t i = 0; i < ntimes; i++) { ConcurrentFree(v[i]); } size_t end2 = clock(); v.clear(); malloc_costtime += (end1 - begin1); free_costtime += (end2 - begin2); } }); } for (auto& t : vthread) { t.join(); } cout << nworks << "个线程并发执行" << rounds << "轮次,每轮次malloc" << ntimes << "次: 花费:" << malloc_costtime << "ms" << endl; cout << nworks << "个线程并发执行" << rounds << "轮次,每轮次malloc" << ntimes << "次: 花费:" << free_costtime << "ms" << endl; cout << nworks << "个线程并发malloc & free" << nworks * rounds * ntimes << "次,总计花费:" << malloc_costtime + free_costtime << "ms" << endl; } int main() { size_t n = 1000; cout << "==========================================================" << endl; BenchmarkConcurrentMalloc(n, 4, 10); // 自己写的内存池 cout << endl << endl; BenchmarkMalloc(n, 4, 10); // 系统malloc cout << "==========================================================" << endl; return 0; }
七、性能测试结果图 明显效率大大滴提高了
当前项目实现的不足
不同平台替换方式不同。 基于unix的系统上的glibc,使用了weak alias的方式替换。具体来说是因为这些入口函数都被定义成了weak symbols,再加上gcc支持
alias attribute,所以替换就变成了这种通用形式。