线程安全的无锁内存池:自由函数在多线程中的行为不正确

Thread-safe lockless memory pool: free function does not behave correctly in multi-thread

我有一个相同大小缓冲区的线程安全分配器的简单实现。 在内部,实现是一个非常简单的互锁单链表,它利用未分配缓冲区中未使用的 space 来维护单链表。

还写了一些测试,在单线程模式下测试代码 - 一切似乎都很好。 设法将问题隔离到 Free 函数,但我似乎找不到它。

我不得不提一下,我 运行 使用 Microsoft's Interlocked Singly Linked Lists 使用完全相同的代码进行了一些测试,显然它有效,但我仍然想找出我的实现有什么问题. 甚至尝试反汇编代码并应用类似的内在函数,但它没有帮助(还要注意,我不需要跟踪列表条目的数量,所以这就是为什么我不需要互锁功能来交换双寄存器-size 元素,如 InterlockedCompareExchange128 for x64)

这是分配器的代码:

#ifndef _POOLNOLOCK_HPP_
#define _POOLNOLOCK_HPP_

#include <windows.h>

template<size_t TSizeOfElem>
class PoolNoLock {
public:
    PoolNoLock(size_t N) :
        n(N),
        arr(new ELEMENT[n])
    {
        for (size_t i = 0; (n - 1) > i; ++i)
        {
            arr[i].next = &arr[i + 1];
        }
        arr[n - 1].next = nullptr;

        for (size_t i = 0; n > i; ++i)
        {
            arr[i].allocated = false;
        }
    }

    ~PoolNoLock() { delete[] arr; }

    void *Alloc()
    {
        ELEMENT *allocBuff;

        do
        {
            allocBuff = ptrFree;
            if (!allocBuff)
            {
                return nullptr;
            }
        } while (allocBuff != InterlockedCompareExchangePointer(
            reinterpret_cast<void *volatile *>(&ptrFree),
            allocBuff->next,
            allocBuff
        ));

        if (allocBuff->allocated)
        {
            __debugbreak(); //will break here
        }

        allocBuff->allocated = true;

        return &allocBuff->buff;
    }

    void Free(void *Address)
    {
        ELEMENT *const freeBuff = reinterpret_cast<ELEMENT *>(Address);

        if (!freeBuff->allocated)
        {
            __debugbreak();
        }

        freeBuff->allocated = false;

        ELEMENT *cmpFree = ptrFree;
        do
        {
            freeBuff->next = cmpFree;

            ELEMENT *const xchgFree =
                reinterpret_cast<ELEMENT *>(InterlockedCompareExchangePointer(
                    reinterpret_cast<void *volatile *>(&ptrFree),
                    freeBuff,
                    cmpFree
                ));

            if (xchgFree == cmpFree)
            {
                break;
            }

            cmpFree = xchgFree;
        } while (true);
    }

private:
    typedef struct _ELEMENT {
        union {
            _ELEMENT *next;
            unsigned char buff[TSizeOfElem];
        };
        bool allocated; //debug info
    }ELEMENT;

    const size_t n;
    ELEMENT *const arr; //array of list elements

    ELEMENT *volatile ptrFree = &arr[0]; //head of "singly" linked list
};

#endif // _POOLNOLOCK_HPP_

这是我用来对class进行压力测试的代码:

  1. 64 是对象的最大数量,WaitForMultipleObjects 可以等待
  2. 需要线程中的等待来帮助实现尽可能多的线程正在访问资源的场景
  3. 生成的线程数正好等于分配器中的元素数,这就是 alloc-only 测试有效的原因
#include "PoolNoLock.hpp"
#include <vector>
#include <map>
#include <iostream>

static constexpr size_t N_THREAD = 64;
static constexpr size_t N_TEST_RUN = 4;
static constexpr size_t N_ALLOC_FREE = 1024;

struct ThreadParam {
    PoolNoLock<sizeof(size_t)> *allocator;
    const HANDLE &hStartEvent;
    void *addressAlloc = nullptr;

    ThreadParam(PoolNoLock<sizeof(size_t)> *Allocator, const HANDLE &StartEvent) :
        allocator(Allocator),
        hStartEvent(StartEvent)
    {};
};

template<bool TAllocOnly>
class Test {
public:
    ~Test()
    {
        CloseHandle(hStartEvent);
    }

    bool RunSingle(PoolNoLock<sizeof(size_t)> *Allocator)
    {
        std::vector<ThreadParam> params(N_THREAD, ThreadParam(Allocator, hStartEvent));

        if (TRUE != ResetEvent(hStartEvent))
        {
            return false;
        }

        for (size_t i = 0; N_THREAD != i; ++i)
        {
            handles[i] = CreateThread(nullptr,
                0,
                reinterpret_cast<PTHREAD_START_ROUTINE>(threadProc),
                &params[i],
                CREATE_SUSPENDED,
                &tids[i]);
            if (!handles[i])
            {
                return false;
            }
        }

        for (HANDLE handle : handles)
        {
            if (1 != ResumeThread(handle))
            {
                return false;
            }
        }

        if (TRUE != SetEvent(hStartEvent))
        {
            return false;
        }

        if ((WAIT_OBJECT_0 + N_THREAD - 1) < WaitForMultipleObjects(N_THREAD, handles, TRUE, INFINITE))
        {
            return false;
        }

        for (size_t i = 0; N_THREAD != i; ++i)
        {
            if (WAIT_OBJECT_0 != WaitForSingleObject(handles[i], 0))
            {
                return false;
            }

            DWORD exitCode;
            if (TRUE != GetExitCodeThread(handles[i], &exitCode))
            {
                return false;
            }

            if (0 != exitCode)
            {
                return false;
            }

            if (TRUE != CloseHandle(handles[i]))
            {
                return false;
            }
        }

        if (TAllocOnly)
        {
            std::map<void *, DWORD> threadAllocations;

            for (size_t i = 0; N_THREAD != i; ++i)
            {
                if (!params[i].addressAlloc)
                {
                    return false;
                }

                if (threadAllocations.end() == threadAllocations.find(params[i].addressAlloc))
                {
                    return false;
                }

                std::pair<std::map<void *, DWORD>::iterator, bool> res =
                    threadAllocations.insert(std::make_pair(params[i].addressAlloc, tids[i]));

                if (!res.second)
                {
                    return false;
                }

                Allocator->Free(params[i].addressAlloc);
            }

            if (N_THREAD != threadAllocations.size())
            {
                return false;
            }
        }

        return false;
    }

    bool RunMultiple()
    {
        for (size_t i = 0; N_TEST_RUN != i; ++i)
        {
            PoolNoLock<sizeof(size_t)> allocator(N_THREAD);
            RunSingle(&allocator);
        }

        return true;
    }

private:
    const HANDLE hStartEvent = CreateEventW(nullptr, TRUE, FALSE, nullptr);

    HANDLE handles[N_THREAD] = { nullptr };
    DWORD tids[N_THREAD] = { 0 };

    static DWORD WINAPI ThreadProcAllocOnly(_In_ ThreadParam *Param)
    {
        if (WAIT_OBJECT_0 != WaitForSingleObject(Param->hStartEvent, INFINITE))
        {
            return 2;
        }

        Param->addressAlloc = Param->allocator->Alloc();
        if (!Param->addressAlloc)
        {
            return 3;
        }

        return 0;
    }

    static DWORD WINAPI ThreadProcAllocFree(_In_ ThreadParam *Param)
    {
        if (WAIT_OBJECT_0 != WaitForSingleObject(Param->hStartEvent, INFINITE))
        {
            return 2;
        }

        for (size_t i = 0; N_ALLOC_FREE != i; ++i)
        {
            void *ptrTest = Param->allocator->Alloc();
            if (!ptrTest)
            {
                return 3;
            }

            Param->allocator->Free(ptrTest);
        }

        return 0;
    }

    const LPTHREAD_START_ROUTINE threadProc =
        TAllocOnly
        ? reinterpret_cast<LPTHREAD_START_ROUTINE>(ThreadProcAllocOnly)
        : reinterpret_cast<LPTHREAD_START_ROUTINE>(ThreadProcAllocFree);
};

int main()
{
    Test<true> testAllocOnly0;
    Test<false> TestAllocFree0;

    if (!testAllocOnly0.RunMultiple()) //this test will succeed
    {
        std::cout << "Test failed" << std::endl;
        return 1;
    }
    std::cout << "Alloc-ONLY tests succeeded" << std::endl;

    if (!TestAllocFree0.RunMultiple()) //this test will fail
    {
        std::cout << "Test failed" << std::endl;
        return 1;
    }
    std::cout << "Alloc/free tests succeeded" << std::endl;

    std::cout << "All tests succeeded" << std::endl;

    return 0;
}

你在 Alloc() 程序中的错误。更符合

InterlockedCompareExchangePointer(
            reinterpret_cast<void *volatile *>(&ptrFree),
            allocBuff->next, // <-- !!!
            allocBuff)

这里有 2 个操作:首先 cpu 从 allocBuff 指针读取 allocBuff->next 然后在 ptrFree 上尝试 CAS 但是这个2 个操作不是原子的,可以在它之间中断。在您尝试使用 allocBuff->next 时 - allocBuff 可能已经被另一个线程分配并且 next 被覆盖到垃圾(例如无效指针)。

所以让存在 2 个线程:T#1 和 T#2

  • T#1 从 ptrFree
  • 读取 allocBuff
  • T#2 从 ptrFree
  • 读取 allocBuff
  • T#2 return allocBuff 给用户
  • T#2 将 allocBuff->next 的上下文覆盖为用户数据,设为 -1。
  • T#1 读取 next = allocBuff->next 并获得了一些用户数据 (-1)
  • T#2 free/push allocBuff 回到 ptrFree
  • T#1 在 CAS 中正常,因为 ptrFree 再次指向 allocBuffptrFree 点 现在-1
  • T#2 从 ptrFree
  • 读取 -1
  • T#2 尝试读取 -1
  • 崩溃

这里甚至只有堆栈中的单个元素 (a) 和 2 个线程足以进行演示。让同样的例子: 其中头(F)和元素(a)在堆栈中。初始状态: F -> a -> 0

  • T#1 从 F
  • 读取 a
  • T#2 从 F
  • 读取 a
  • T#2 从 a
  • 读取 0
  • T#2 将 0 写入 F 并将 return a 写入用户:F -> 0
  • T#2 将 -1 写入 a: a = -1;
  • T#1 从 a
  • 读取 -1
  • T#2 免费 a: F -> a -> 0
  • T#1 将 -1 写入 F 和 return a给用户:F -> -1
  • T#2 从 F
  • 读取 -1
  • T#2 尝试读取 -1

可能还有另一场比赛

F-a-b-c

并且你想要 pop a 并将 F 分配给 b。但在你这样做之前,另一个线程首先弹出 a,所以现在

F-b-c

然后弹出 b:

F-c

和push/freea:

F-a-c.

因为 F 再次指向 a CAS 会正常,你可以创建 chain

F-b-垃圾桶

因为b现在真的在用


无论如何,您的实施远非最佳。你在这里不需要模板,TSizeOfElemn 只需要在初始化过程中知道,不再需要。压测需要的线程不多,关键点延迟

void NotifyAllocated(PSINGLE_LIST_ENTRY allocBuff)
{
    allocBuff->Next = (PSINGLE_LIST_ENTRY)INVALID_HANDLE_VALUE; // allocated = true;
    
    WCHAR sz[16], txt[64];
    swprintf_s(sz, _countof(sz), L"%x", GetCurrentThreadId());
    swprintf_s(txt, _countof(txt), L"alocated = %p", allocBuff);
    
    MessageBoxW(0, txt, sz, MB_ICONINFORMATION); // simulate delay !
}

void NotifyCheck(PVOID buf, PCWSTR fmt)
{
    WCHAR sz[16], txt[64];
    swprintf_s(sz, _countof(sz), L"%x", GetCurrentThreadId());
    swprintf_s(txt, _countof(txt), fmt, buf);
    MessageBoxW(0, txt, sz, MB_ICONWARNING); // simulate delay !
}

class PoolNoLock 
{
    PVOID _arr = 0;                     //array of list elements
    PSINGLE_LIST_ENTRY _ptrFree = 0;    //head of "singly" linked list

public:
    bool Init(size_t N, size_t SizeOfElem)
    {
        if (N)
        {
            if (SizeOfElem < sizeof(SINGLE_LIST_ENTRY))
            {
                SizeOfElem = sizeof(SINGLE_LIST_ENTRY);
            }

            union {
                PUCHAR buf;
                PSINGLE_LIST_ENTRY Next;
            };

            if (buf = new UCHAR[N * SizeOfElem])
            {
                _arr = buf;

                PSINGLE_LIST_ENTRY ptrFree = 0;

                do 
                {
                    Next->Next = ptrFree;
                    ptrFree = Next;
                } while (buf += SizeOfElem, --N);

                _ptrFree = ptrFree;
            }

            return true;
        }

        return false;
    }

    ~PoolNoLock() 
    { 
        if (PVOID buf = _arr)
        {
            delete[] buf; 
        }
    }

    void *Alloc()
    {
        PSINGLE_LIST_ENTRY allocBuff, ptrFree = _ptrFree;

        for (;;)
        {
            allocBuff = ptrFree;

            if (!allocBuff)
            {
                return 0;
            }

            NotifyCheck(allocBuff, L"try: %p");

            // access allocBuff->Next !!
            PSINGLE_LIST_ENTRY Next = allocBuff->Next;

            NotifyCheck(Next, L"next: %p");

            ptrFree = (PSINGLE_LIST_ENTRY)InterlockedCompareExchangePointer((void**)&_ptrFree, Next, allocBuff);
            //ptrFree = (PSINGLE_LIST_ENTRY)InterlockedCompareExchangePointer((void**)&_ptrFree, allocBuff->Next, allocBuff);

            if (ptrFree == allocBuff)
            {
                NotifyAllocated(allocBuff);
                return allocBuff;
            }
        }
    }

    void Free(void *Address)
    {
        PSINGLE_LIST_ENTRY ptrFree = _ptrFree, newFree;

        for ( ; ; ptrFree = newFree)
        {
            reinterpret_cast<PSINGLE_LIST_ENTRY>(Address)->Next = ptrFree;

            newFree = (PSINGLE_LIST_ENTRY)InterlockedCompareExchangePointer((void**)&_ptrFree, Address, ptrFree);

            if (newFree == ptrFree)
            {
                return ;
            }
        }
    }
};

ULONG ThreadTest(PoolNoLock* p)
{
    ULONG n = 2;
    do 
    {
        WCHAR sz[16], txt[32];
        swprintf_s(sz, _countof(sz), L"%x", GetCurrentThreadId());
        swprintf_s(txt, _countof(txt), L"loop %x", n);
        MessageBoxW(0, txt, sz, MB_OK);
        if (void* buf = p->Alloc())
        {
            p->Free(buf);
        }
    } while (--n);

    return 0;
}

void DemoTest()
{
    PoolNoLock p;
    if (p.Init(1, sizeof(PVOID)))
    {
        ULONG n = 2;
        do 
        {
            CloseHandle(CreateThread(0, 0, (PTHREAD_START_ROUTINE)ThreadTest, &p, 0, 0));
        } while (--n);
    }

    MessageBoxW(0, 0, L"Wait", MB_OK);
}

这与您的代码相同,只是进行了优化。错误相同 - 在

ptrFree = (PSINGLE_LIST_ENTRY)InterlockedCompareExchangePointer(
        (void**)&_ptrFree, allocBuff->Next, allocBuff);

为了测试和更好地理解 - 最好写成

PSINGLE_LIST_ENTRY Next = allocBuff->Next;

// delay !!

ptrFree = (PSINGLE_LIST_ENTRY)InterlockedCompareExchangePointer(
    (void**)&_ptrFree, Next, allocBuff);

用于解决此问题 SLIST_HEADER 使用 - 这实际上是指向堆栈顶部的指针 + 操作计数的组合。接下来是正确的实现(如果不是直接使用 SLIST_HEADER 和 api)

class PoolNoLock 
{
    PVOID _arr = 0;                     //array of list elements

    struct U  
    {
        PSINGLE_LIST_ENTRY ptr; //head of "singly" linked list
        ULONG_PTR allocCount;

        void operator = (U& v)
        {
            ptr = v.ptr;
            allocCount = v.allocCount;
        }

        U(U* v)
        {
            ptr = v->ptr->Next;
            allocCount = v->allocCount + 1;
        }

        U(PSINGLE_LIST_ENTRY ptr, ULONG_PTR allocCount) : ptr(ptr), allocCount(allocCount)
        {
        }

        U() : ptr(0), allocCount(0)
        {
        }

    } u;

    //++ debug
    LONG _allocMiss = 0;
    LONG _freeMiss = 0;
    //-- debug

public:
    bool Init(size_t N, size_t SizeOfElem)
    {
        if (N)
        {
            if (SizeOfElem < sizeof(SINGLE_LIST_ENTRY))
            {
                SizeOfElem = sizeof(SINGLE_LIST_ENTRY);
            }

            union {
                PUCHAR buf;
                PSINGLE_LIST_ENTRY Next;
            };

            if (buf = new UCHAR[N * SizeOfElem])
            {
                _arr = buf;

                PSINGLE_LIST_ENTRY ptrFree = 0;

                do 
                {
                    Next->Next = ptrFree;
                    ptrFree = Next;
                } while (buf += SizeOfElem, --N);

                u.ptr = ptrFree;
                u.allocCount = 0;
            }

            return true;
        }

        return false;
    }

    ~PoolNoLock() 
    { 
        if (PVOID buf = _arr)
        {
            delete[] buf; 
        }
    }

    void *Alloc()
    {
        for (;;)
        {
            U allocBuff = u;

            if (!allocBuff.ptr)
            {
                return 0;
            }

            U Next(&allocBuff);

            if (InterlockedCompareExchange128((LONG64*)&u, Next.allocCount, (LONG64)Next.ptr, (LONG64*)&allocBuff))
            {
                // for debug only
                allocBuff.ptr->Next = (PSINGLE_LIST_ENTRY)INVALID_HANDLE_VALUE;

                return allocBuff.ptr;
            } 

            // for debug only
            InterlockedIncrementNoFence(&_allocMiss);
        }
    }

    void Free(void *Address)
    {
        for ( ; ; )
        {
            U ptrFree = u;
            U a(reinterpret_cast<PSINGLE_LIST_ENTRY>(Address), ptrFree.allocCount);
            
            reinterpret_cast<PSINGLE_LIST_ENTRY>(Address)->Next = ptrFree.ptr;

            if (InterlockedCompareExchange128((LONG64*)&u, a.allocCount, (LONG64)a.ptr, (LONG64*)&ptrFree))
            {
                return ;
            }

            // for debug only
            InterlockedIncrementNoFence(&_freeMiss);
        }
    }
};

ULONG ThreadTest(PoolNoLock* p)
{
    ULONG n = 0x10000;
    do 
    {
        if (void* buf = p->Alloc())
        {
            p->Free(buf);
        }
    } while (--n);

    return 0;
}

void DemoTest()
{
    PoolNoLock p;
    if (p.Init(16, sizeof(PVOID)))
    {
        ULONG n = 8;
        do 
        {
            CloseHandle(CreateThread(0, 0, (PTHREAD_START_ROUTINE)ThreadTest, &p, 0, 0));
        } while (--n);
    }

    MessageBoxW(0, 0, L"Wait", MB_OK);
}