Thread-safe lockless memory pool: Free function does not behave correctly in multithreaded use
I have a simple implementation of a thread-safe allocator of same-sized buffers.
Internally, the implementation is a very simple interlocked singly linked list that reuses the unused space of unallocated buffers to maintain the list.
I also wrote some tests that exercise the code in single-threaded mode - everything seems fine there.
I managed to isolate the problem to the Free function, but I can't seem to find it.
I should mention that I ran some tests with exactly the same code using Microsoft's Interlocked Singly Linked Lists, and that apparently works, but I would still like to find out what is wrong with my own implementation.
I even tried disassembling that code and applying similar intrinsics, but it didn't help (also note that I don't need to keep track of the number of list entries, which is why I don't need an interlocked function that exchanges a double-register-sized element, such as InterlockedCompareExchange128 on x64).
Here is the code of the allocator:
#ifndef _POOLNOLOCK_HPP_
#define _POOLNOLOCK_HPP_
#include <windows.h>
template<size_t TSizeOfElem>
class PoolNoLock {
public:
PoolNoLock(size_t N) :
n(N),
arr(new ELEMENT[n])
{
for (size_t i = 0; (n - 1) > i; ++i)
{
arr[i].next = &arr[i + 1];
}
arr[n - 1].next = nullptr;
for (size_t i = 0; n > i; ++i)
{
arr[i].allocated = false;
}
}
~PoolNoLock() { delete[] arr; }
void *Alloc()
{
ELEMENT *allocBuff;
do
{
allocBuff = ptrFree;
if (!allocBuff)
{
return nullptr;
}
} while (allocBuff != InterlockedCompareExchangePointer(
reinterpret_cast<void *volatile *>(&ptrFree),
allocBuff->next,
allocBuff
));
if (allocBuff->allocated)
{
__debugbreak(); //will break here
}
allocBuff->allocated = true;
return &allocBuff->buff;
}
void Free(void *Address)
{
ELEMENT *const freeBuff = reinterpret_cast<ELEMENT *>(Address);
if (!freeBuff->allocated)
{
__debugbreak();
}
freeBuff->allocated = false;
ELEMENT *cmpFree = ptrFree;
do
{
freeBuff->next = cmpFree;
ELEMENT *const xchgFree =
reinterpret_cast<ELEMENT *>(InterlockedCompareExchangePointer(
reinterpret_cast<void *volatile *>(&ptrFree),
freeBuff,
cmpFree
));
if (xchgFree == cmpFree)
{
break;
}
cmpFree = xchgFree;
} while (true);
}
private:
typedef struct _ELEMENT {
union {
_ELEMENT *next;
unsigned char buff[TSizeOfElem];
};
bool allocated; //debug info
}ELEMENT;
const size_t n;
ELEMENT *const arr; //array of list elements
ELEMENT *volatile ptrFree = &arr[0]; //head of "singly" linked list
};
#endif // _POOLNOLOCK_HPP_
Here is the code I use to stress test the class:
- 64 is the maximum number of objects WaitForMultipleObjects can wait on
- the wait inside the threads is needed to help reach a scenario where as many threads as possible are accessing the resource at the same time
- the number of spawned threads is exactly equal to the number of elements in the allocator, which is why the alloc-only test works
#include "PoolNoLock.hpp"
#include <vector>
#include <map>
#include <iostream>
static constexpr size_t N_THREAD = 64;
static constexpr size_t N_TEST_RUN = 4;
static constexpr size_t N_ALLOC_FREE = 1024;
struct ThreadParam {
PoolNoLock<sizeof(size_t)> *allocator;
const HANDLE &hStartEvent;
void *addressAlloc = nullptr;
ThreadParam(PoolNoLock<sizeof(size_t)> *Allocator, const HANDLE &StartEvent) :
allocator(Allocator),
hStartEvent(StartEvent)
{};
};
template<bool TAllocOnly>
class Test {
public:
~Test()
{
CloseHandle(hStartEvent);
}
bool RunSingle(PoolNoLock<sizeof(size_t)> *Allocator)
{
std::vector<ThreadParam> params(N_THREAD, ThreadParam(Allocator, hStartEvent));
if (TRUE != ResetEvent(hStartEvent))
{
return false;
}
for (size_t i = 0; N_THREAD != i; ++i)
{
handles[i] = CreateThread(nullptr,
0,
reinterpret_cast<PTHREAD_START_ROUTINE>(threadProc),
&params[i],
CREATE_SUSPENDED,
&tids[i]);
if (!handles[i])
{
return false;
}
}
for (HANDLE handle : handles)
{
if (1 != ResumeThread(handle))
{
return false;
}
}
if (TRUE != SetEvent(hStartEvent))
{
return false;
}
if ((WAIT_OBJECT_0 + N_THREAD - 1) < WaitForMultipleObjects(N_THREAD, handles, TRUE, INFINITE))
{
return false;
}
for (size_t i = 0; N_THREAD != i; ++i)
{
if (WAIT_OBJECT_0 != WaitForSingleObject(handles[i], 0))
{
return false;
}
DWORD exitCode;
if (TRUE != GetExitCodeThread(handles[i], &exitCode))
{
return false;
}
if (0 != exitCode)
{
return false;
}
if (TRUE != CloseHandle(handles[i]))
{
return false;
}
}
if (TAllocOnly)
{
std::map<void *, DWORD> threadAllocations;
for (size_t i = 0; N_THREAD != i; ++i)
{
if (!params[i].addressAlloc)
{
return false;
}
if (threadAllocations.end() == threadAllocations.find(params[i].addressAlloc))
{
return false;
}
std::pair<std::map<void *, DWORD>::iterator, bool> res =
threadAllocations.insert(std::make_pair(params[i].addressAlloc, tids[i]));
if (!res.second)
{
return false;
}
Allocator->Free(params[i].addressAlloc);
}
if (N_THREAD != threadAllocations.size())
{
return false;
}
}
return false;
}
bool RunMultiple()
{
for (size_t i = 0; N_TEST_RUN != i; ++i)
{
PoolNoLock<sizeof(size_t)> allocator(N_THREAD);
RunSingle(&allocator);
}
return true;
}
private:
const HANDLE hStartEvent = CreateEventW(nullptr, TRUE, FALSE, nullptr);
HANDLE handles[N_THREAD] = { nullptr };
DWORD tids[N_THREAD] = { 0 };
static DWORD WINAPI ThreadProcAllocOnly(_In_ ThreadParam *Param)
{
if (WAIT_OBJECT_0 != WaitForSingleObject(Param->hStartEvent, INFINITE))
{
return 2;
}
Param->addressAlloc = Param->allocator->Alloc();
if (!Param->addressAlloc)
{
return 3;
}
return 0;
}
static DWORD WINAPI ThreadProcAllocFree(_In_ ThreadParam *Param)
{
if (WAIT_OBJECT_0 != WaitForSingleObject(Param->hStartEvent, INFINITE))
{
return 2;
}
for (size_t i = 0; N_ALLOC_FREE != i; ++i)
{
void *ptrTest = Param->allocator->Alloc();
if (!ptrTest)
{
return 3;
}
Param->allocator->Free(ptrTest);
}
return 0;
}
const LPTHREAD_START_ROUTINE threadProc =
TAllocOnly
? reinterpret_cast<LPTHREAD_START_ROUTINE>(ThreadProcAllocOnly)
: reinterpret_cast<LPTHREAD_START_ROUTINE>(ThreadProcAllocFree);
};
int main()
{
Test<true> testAllocOnly0;
Test<false> TestAllocFree0;
if (!testAllocOnly0.RunMultiple()) //this test will succeed
{
std::cout << "Test failed" << std::endl;
return 1;
}
std::cout << "Alloc-ONLY tests succeeded" << std::endl;
if (!TestAllocFree0.RunMultiple()) //this test will fail
{
std::cout << "Test failed" << std::endl;
return 1;
}
std::cout << "Alloc/free tests succeeded" << std::endl;
std::cout << "All tests succeeded" << std::endl;
return 0;
}
Your bug is in the Alloc() routine. More precisely, in
InterlockedCompareExchangePointer(
reinterpret_cast<void *volatile *>(&ptrFree),
allocBuff->next, // <-- !!!
allocBuff)
There are two operations here: first the CPU reads allocBuff->next through the allocBuff pointer, and only then does it attempt the CAS on ptrFree. These two operations are not atomic - the thread can be interrupted between them. By the time you use allocBuff->next, allocBuff may already have been allocated by another thread and next overwritten with garbage (an invalid pointer, for example).
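Spelled out as two explicit steps (a sketch only, using the ELEMENT type from your own code), the window looks like this:

ELEMENT *allocBuff = ptrFree;                      // step 1: read the current head
ELEMENT *next = allocBuff->next;                   // step 2: read the head's next pointer
// <-- another thread can run here: it pops allocBuff, hands it to a user who
//     overwrites next (it shares a union with buff), then frees allocBuff again,
//     so ptrFree == allocBuff once more but next now holds user data
InterlockedCompareExchangePointer(                 // step 3: the CAS still succeeds,
    reinterpret_cast<void *volatile *>(&ptrFree),  // because it only compares the head,
    next,                                          // and it publishes the stale value
    allocBuff);                                    // as the new ptrFree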
So let there be two threads, T#1 and T#2:
- T#1 reads allocBuff from ptrFree
- T#2 reads allocBuff from ptrFree
- T#2 wins its CAS and returns allocBuff to the user
- T#2 (the user) overwrites the contents of allocBuff->next with user data, say -1
- T#1 reads next = allocBuff->next and gets that user data (-1)
- T#2 frees allocBuff, pushing it back onto ptrFree
- T#1's CAS succeeds, because ptrFree points to allocBuff again; ptrFree now points to -1
- T#2 reads -1 from ptrFree
- T#2 tries to dereference -1 - crash
Even a single element (a) on the stack and two threads are enough to demonstrate this. Take the same example, with the head (F) and one element (a) on the stack. Initial state:
F -> a -> 0
- T#1 reads a from F
- T#2 reads a from F
- T#2 reads 0 from a
- T#2 writes 0 to F and returns a to the user: F -> 0
- T#2 (the user) writes -1 into a: a = -1
- T#1 reads -1 from a
- T#2 frees a: F -> a -> 0
- T#1 writes -1 to F and returns a to the user: F -> -1
- T#2 reads -1 from F
- T#2 tries to dereference -1
Another race is possible as well. Say the list is
F -> a -> b -> c
and you want to pop a and set F to b. But before you do, another thread pops a first, so now
F -> b -> c
then pops b:
F -> c
and then pushes/frees a:
F -> a -> c
Because F points to a again, your CAS succeeds, and you end up building the chain
F -> b -> garbage
even though b is actually in use.
In any case, your implementation is far from optimal. You don't need a template here: TSizeOfElem and n only need to be known during initialization and never again after that. A stress test doesn't need many threads; what matters is a delay at the critical point:
void NotifyAllocated(PSINGLE_LIST_ENTRY allocBuff)
{
allocBuff->Next = (PSINGLE_LIST_ENTRY)INVALID_HANDLE_VALUE; // allocated = true;
WCHAR sz[16], txt[64];
swprintf_s(sz, _countof(sz), L"%x", GetCurrentThreadId());
swprintf_s(txt, _countof(txt), L"alocated = %p", allocBuff);
MessageBoxW(0, txt, sz, MB_ICONINFORMATION); // simulate delay !
}
void NotifyCheck(PVOID buf, PCWSTR fmt)
{
WCHAR sz[16], txt[64];
swprintf_s(sz, _countof(sz), L"%x", GetCurrentThreadId());
swprintf_s(txt, _countof(txt), fmt, buf);
MessageBoxW(0, txt, sz, MB_ICONWARNING); // simulate delay !
}
class PoolNoLock
{
PVOID _arr = 0; //array of list elements
PSINGLE_LIST_ENTRY _ptrFree = 0; //head of "singly" linked list
public:
bool Init(size_t N, size_t SizeOfElem)
{
if (N)
{
if (SizeOfElem < sizeof(SINGLE_LIST_ENTRY))
{
SizeOfElem = sizeof(SINGLE_LIST_ENTRY);
}
union {
PUCHAR buf;
PSINGLE_LIST_ENTRY Next;
};
if (buf = new UCHAR[N * SizeOfElem])
{
_arr = buf;
PSINGLE_LIST_ENTRY ptrFree = 0;
do
{
Next->Next = ptrFree;
ptrFree = Next;
} while (buf += SizeOfElem, --N);
_ptrFree = ptrFree;
}
return true;
}
return false;
}
~PoolNoLock()
{
if (PVOID buf = _arr)
{
delete[] buf;
}
}
void *Alloc()
{
PSINGLE_LIST_ENTRY allocBuff, ptrFree = _ptrFree;
for (;;)
{
allocBuff = ptrFree;
if (!allocBuff)
{
return 0;
}
NotifyCheck(allocBuff, L"try: %p");
// access allocBuff->Next !!
PSINGLE_LIST_ENTRY Next = allocBuff->Next;
NotifyCheck(Next, L"next: %p");
ptrFree = (PSINGLE_LIST_ENTRY)InterlockedCompareExchangePointer((void**)&_ptrFree, Next, allocBuff);
//ptrFree = (PSINGLE_LIST_ENTRY)InterlockedCompareExchangePointer((void**)&_ptrFree, allocBuff->Next, allocBuff);
if (ptrFree == allocBuff)
{
NotifyAllocated(allocBuff);
return allocBuff;
}
}
}
void Free(void *Address)
{
PSINGLE_LIST_ENTRY ptrFree = _ptrFree, newFree;
for ( ; ; ptrFree = newFree)
{
reinterpret_cast<PSINGLE_LIST_ENTRY>(Address)->Next = ptrFree;
newFree = (PSINGLE_LIST_ENTRY)InterlockedCompareExchangePointer((void**)&_ptrFree, Address, ptrFree);
if (newFree == ptrFree)
{
return ;
}
}
}
};
ULONG ThreadTest(PoolNoLock* p)
{
ULONG n = 2;
do
{
WCHAR sz[16], txt[32];
swprintf_s(sz, _countof(sz), L"%x", GetCurrentThreadId());
swprintf_s(txt, _countof(txt), L"loop %x", n);
MessageBoxW(0, txt, sz, MB_OK);
if (void* buf = p->Alloc())
{
p->Free(buf);
}
} while (--n);
return 0;
}
void DemoTest()
{
PoolNoLock p;
if (p.Init(1, sizeof(PVOID)))
{
ULONG n = 2;
do
{
CloseHandle(CreateThread(0, 0, (PTHREAD_START_ROUTINE)ThreadTest, &p, 0, 0));
} while (--n);
}
MessageBoxW(0, 0, L"Wait", MB_OK);
}
This is the same as your code, just optimized. The bug is the same - in
ptrFree = (PSINGLE_LIST_ENTRY)InterlockedCompareExchangePointer(
(void**)&_ptrFree, allocBuff->Next, allocBuff);
For testing, and for better understanding, it is better written as
PSINGLE_LIST_ENTRY Next = allocBuff->Next;
// delay !!
ptrFree = (PSINGLE_LIST_ENTRY)InterlockedCompareExchangePointer(
(void**)&_ptrFree, Next, allocBuff);
SLIST_HEADER is used to solve exactly this problem - it is effectively the combination of a pointer to the top of the stack plus an operation count. Below is a correct implementation (for when you do not use SLIST_HEADER and its API directly):
class PoolNoLock
{
PVOID _arr = 0; //array of list elements
struct alignas(16) U // InterlockedCompareExchange128 requires the destination to be 16-byte aligned
{
PSINGLE_LIST_ENTRY ptr; //head of "singly" linked list
ULONG_PTR allocCount;
void operator = (U& v)
{
ptr = v.ptr;
allocCount = v.allocCount;
}
U(U* v)
{
ptr = v->ptr->Next;
allocCount = v->allocCount + 1;
}
U(PSINGLE_LIST_ENTRY ptr, ULONG_PTR allocCount) : ptr(ptr), allocCount(allocCount)
{
}
U() : ptr(0), allocCount(0)
{
}
} u;
//++ debug
LONG _allocMiss = 0;
LONG _freeMiss = 0;
//-- debug
public:
bool Init(size_t N, size_t SizeOfElem)
{
if (N)
{
if (SizeOfElem < sizeof(SINGLE_LIST_ENTRY))
{
SizeOfElem = sizeof(SINGLE_LIST_ENTRY);
}
union {
PUCHAR buf;
PSINGLE_LIST_ENTRY Next;
};
if (buf = new UCHAR[N * SizeOfElem])
{
_arr = buf;
PSINGLE_LIST_ENTRY ptrFree = 0;
do
{
Next->Next = ptrFree;
ptrFree = Next;
} while (buf += SizeOfElem, --N);
u.ptr = ptrFree;
u.allocCount = 0;
}
return true;
}
return false;
}
~PoolNoLock()
{
if (PVOID buf = _arr)
{
delete[] buf;
}
}
void *Alloc()
{
for (;;)
{
U allocBuff = u;
if (!allocBuff.ptr)
{
return 0;
}
U Next(&allocBuff);
if (InterlockedCompareExchange128((LONG64*)&u, Next.allocCount, (LONG64)Next.ptr, (LONG64*)&allocBuff))
{
// for debug only
allocBuff.ptr->Next = (PSINGLE_LIST_ENTRY)INVALID_HANDLE_VALUE;
return allocBuff.ptr;
}
// for debug only
InterlockedIncrementNoFence(&_allocMiss);
}
}
void Free(void *Address)
{
for ( ; ; )
{
U ptrFree = u;
U a(reinterpret_cast<PSINGLE_LIST_ENTRY>(Address), ptrFree.allocCount);
reinterpret_cast<PSINGLE_LIST_ENTRY>(Address)->Next = ptrFree.ptr;
if (InterlockedCompareExchange128((LONG64*)&u, a.allocCount, (LONG64)a.ptr, (LONG64*)&ptrFree))
{
return ;
}
// for debug only
InterlockedIncrementNoFence(&_freeMiss);
}
}
};
ULONG ThreadTest(PoolNoLock* p)
{
ULONG n = 0x10000;
do
{
if (void* buf = p->Alloc())
{
p->Free(buf);
}
} while (--n);
return 0;
}
void DemoTest()
{
PoolNoLock p;
if (p.Init(16, sizeof(PVOID)))
{
ULONG n = 8;
do
{
CloseHandle(CreateThread(0, 0, (PTHREAD_START_ROUTINE)ThreadTest, &p, 0, 0));
} while (--n);
}
MessageBoxW(0, 0, L"Wait", MB_OK);
}
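For completeness: if you simply use the documented SList API (InitializeSListHead / InterlockedPushEntrySList / InterlockedPopEntrySList) - which you already confirmed works - the whole pool reduces to a few calls. A minimal sketch follows; the PoolSList name, the _aligned_malloc allocation and the size rounding are illustrative choices of mine, not code from either version above:

#include <windows.h>
#include <malloc.h>

class PoolSList
{
    PSLIST_HEADER _head = nullptr; // lock-free LIFO head (pointer + sequence count packed together)
    void *_arr = nullptr;          // backing storage for all elements
public:
    bool Init(size_t N, size_t SizeOfElem)
    {
        if (!N) return false;
        if (SizeOfElem < sizeof(SLIST_ENTRY)) SizeOfElem = sizeof(SLIST_ENTRY);
        // every entry must sit on a MEMORY_ALLOCATION_ALIGNMENT boundary, so round the stride up
        SizeOfElem = (SizeOfElem + MEMORY_ALLOCATION_ALIGNMENT - 1) & ~(size_t)(MEMORY_ALLOCATION_ALIGNMENT - 1);
        _head = (PSLIST_HEADER)_aligned_malloc(sizeof(SLIST_HEADER), MEMORY_ALLOCATION_ALIGNMENT);
        _arr = _aligned_malloc(N * SizeOfElem, MEMORY_ALLOCATION_ALIGNMENT);
        if (!_head || !_arr) return false;
        InitializeSListHead(_head);
        PUCHAR p = (PUCHAR)_arr;
        do
        {
            InterlockedPushEntrySList(_head, (PSLIST_ENTRY)p);
        } while (p += SizeOfElem, --N);
        return true;
    }
    ~PoolSList()
    {
        if (_arr) _aligned_free(_arr);
        if (_head) _aligned_free(_head);
    }
    void *Alloc()
    {
        return InterlockedPopEntrySList(_head); // returns nullptr when the pool is exhausted
    }
    void Free(void *Address)
    {
        InterlockedPushEntrySList(_head, (PSLIST_ENTRY)Address);
    }
};

The sequence count packed into SLIST_HEADER is what defeats the ABA interleavings described above, in the same way the allocCount field does in the hand-rolled version.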