Using Windows fibers in a simple way, but unexplainable bugs occur

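I was using Windows fibers to implement my own task scheduler when some strange crashes and undefined behavior started to occur. To keep things simple, I started a new project and wrote a small program that does the following: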

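  1. The main thread creates a bunch of fibers, then starts two threads
  2. The main thread waits until you kill the program
  3. Each worker thread converts itself into a fiber
  4. Each worker thread tries to find a free fiber, then switches to this new free fiber
  5. Once a thread has switched to a new fiber, it pushes its previous fiber into the free fiber container
  6. Each worker thread goes back to step 4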

If you are not familiar with the fiber concept, this talk is a good start.

Data

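Each thread has its own ThreadData data structure, which stores its previous and current fiber instances and its thread index. I tried several ways to retrieve the ThreadData structure during execution; they appear as the three GetThreadData() variants in the code below.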

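Problem

When a fiber is entered for the first time (see the FiberFunc function), the thread using this fiber must push its previous fiber into the free fiber container. But sometimes the previous fiber is null, which should be impossible: before switching to a new fiber, the thread sets its previous fiber value to its current fiber value (and sets its current fiber value to the new fiber).

So if a thread enters a brand-new fiber with its previous fiber set to null, that would mean it comes from nowhere (which makes no sense).

The only explanations I can see for a ThreadData having its previous fiber set to null when entering a brand-new fiber are that another thread set it to null, or that the compiler reordered instructions behind the scenes.

I checked the assembly, and the compiler does not seem to be responsible.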

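There are several bugs I cannot explain:

  1. If I use the first GetThreadData() function to retrieve the ThreadData structure, I can get an instance whose index differs from the thread-local index (these indices are set when the thread starts). This makes the program assert (assert(threadData->index == localThreadIndex)).

  2. If I use any of the other functions to retrieve the ThreadData structure, the program asserts in the FiberFunc function because the previous fiber value is null (assert(threadData->previousFiber)).

Do you know why this code doesn't work? I have spent countless hours trying to figure out what is wrong, but I can't see my mistake.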

Specification

OS: Windows 10

IDE: Visual Studio 2015 and Visual Studio 2017

Compiler: VC++

Configuration: Release

Note that the bug does not occur in the Debug configuration.

Code

You may need to run it several times before the assert fires.

#include "Windows.h"
#include <vector>
#include <thread>
#include <mutex>
#include <cassert>
#include <iostream>
#include <atomic>

struct Fiber
{
    void* handle;
};

struct ThreadData
{
    Fiber*  previousFiber{ nullptr };
    Fiber*  currentFiber{ nullptr };
    Fiber   fiber{ };
    unsigned int index{};
};

//Threads
std::vector<std::pair<std::thread::id, unsigned int>> threadsinfo{};

//threads data container
ThreadData  threadsData[8];

//Fibers
std::mutex  fibersLock{};
std::vector<Fiber> fibers{};
std::vector<Fiber*> freeFibers{};

thread_local unsigned int localThreadIndex{};
thread_local Fiber* debug_localTheadLastFiber{};
thread_local ThreadData* localThreadData{};

using WindowsThread = HANDLE;
std::vector<WindowsThread> threads{};

//This is the first way to retrieve the current thread's ThreadData structure using thread_id
//ThreadData* GetThreadData()
//{
//  std::thread::id threadId( std::this_thread::get_id());
//  for (auto const& pair : threadsinfo)
//  {
//      if (pair.first == threadId)
//      {
//          return &threadsData[pair.second];
//      }
//  }
//
//  //It is not possible to assert
//  assert(false);
//  return nullptr;
//}

//This is the second way to retrieve the current thread's ThreadData structure using thread local storage
//ThreadData* GetThreadData()
//{
//  return &threadsData[localThreadIndex];
//}


//This is the third way to retrieve the current thread's ThreadData structure using thread local storage
ThreadData* GetThreadData()
{
    return localThreadData;
}


//Try to pop a free fiber from the container, thread safe due to mutex usage
bool  TryPopFreeFiber(Fiber*& fiber)
{
    std::lock_guard<std::mutex> guard(fibersLock);
    if (freeFibers.empty()) { return false; }
    fiber = freeFibers.back();
    assert(fiber);
    assert(fiber->handle);
    freeFibers.pop_back();
    return true;
}


//Try to push a free fiber to the container, thread safe due to mutex usage
bool PushFreeFiber(Fiber* fiber)
{
    std::lock_guard<std::mutex> guard(fibersLock);
    freeFibers.push_back(fiber);
    return true;
}


//the __declspec(noinline) is used to inspect code in release mode, comment it if you want
__declspec(noinline) void  _SwitchToFiber(Fiber* newFiber)
{
    //You want to switch to another fiber
    //You first have to save your current fiber instance so that you can release it once you are in the new fiber
    {
        ThreadData* threadData{ GetThreadData() };
        assert(threadData->index == localThreadIndex);
        assert(threadData->currentFiber);
        threadData->previousFiber = threadData->currentFiber;
        threadData->currentFiber = newFiber;
        debug_localTheadLastFiber = threadData->previousFiber;
        assert(threadData->previousFiber);
        assert(newFiber);
        assert(newFiber->handle);
    }

    //You switch to the new fiber
    //This call will either make you enter the FiberFunc function if the fiber has never been used,
    //or you will continue executing this function if the new fiber has already been used (note that you will have a different stack, so you can't use the old threadData value)
    ::SwitchToFiber(newFiber->handle);

    {
        //You must get the current ThreadData* again, because you come from another fiber (the previous statement is a switch), this fiber could have been used by any other thread
        ThreadData* threadData{ GetThreadData() };

        //THIS ASSERT FIRES IF YOU USE THE FIRST GetThreadData METHOD, WHICH SHOULD BE IMPOSSIBLE...
        assert(threadData->index == localThreadIndex);

        assert(threadData);
        assert(threadData->previousFiber);

        //We release the previous fiber
        PushFreeFiber(threadData->previousFiber);
        debug_localTheadLastFiber = nullptr;
        threadData->previousFiber = nullptr;
    }

}


void ExecuteThreadBody()
{
    Fiber*  newFiber{};

    if (TryPopFreeFiber(newFiber))
    {
        _SwitchToFiber(newFiber);
    }
}


DWORD __stdcall ThreadFunc(void* data)
{
    int const index{ *static_cast<int*>(data)};

    threadsinfo[index] = std::make_pair(std::this_thread::get_id(), index);

    //setting up the current thread data
    ThreadData* threadData{ &threadsData[index] };
    threadData->index = index;

    void*   threadAsFiber{ ConvertThreadToFiber(nullptr) };
    assert(threadAsFiber);

    threadData->fiber = Fiber{ threadAsFiber };
    threadData->currentFiber = &threadData->fiber;

    localThreadData = threadData;
    localThreadIndex = index;

    while (true)
    {
        ExecuteThreadBody();
    }

    return DWORD{};
}


//The entry point of all fibers
void __stdcall FiberFunc(void* data)
{
    //You enter the fiber for the first time

    ThreadData* threadData{ GetThreadData() };

    //Making sure that the thread data structure is the right one
    assert(threadData->index == localThreadIndex);

    //Here you will assert
    assert(threadData->previousFiber);

    PushFreeFiber(threadData->previousFiber);
    threadData->previousFiber = nullptr;

    while (true)
    {
        ExecuteThreadBody();
    }
}


__declspec(noinline) void main()
{
    constexpr unsigned int threadCount{ 2 };
    constexpr unsigned int fiberCount{ 20 };

    threadsinfo.resize(threadCount);

    fibers.resize(fiberCount);
    for (auto index = 0; index < fiberCount; ++index)
    {
        fibers[index] = { CreateFiber(0, FiberFunc, nullptr) };
    }

    freeFibers.resize(fiberCount);
    for (auto index = 0; index < fiberCount; ++index)
    {
        freeFibers[index] = std::addressof(fibers[index]);
    }

    threads.resize(threadCount);

    std::vector<int>    threadParamss(threadCount);



    for (auto index = 0; index < threadCount; ++index)
    {
        //threads[index] = new std::thread{ ThreadFunc, index };
        threadParamss[index] = index;
        threads[index] = CreateThread(NULL, 0, &ThreadFunc, &threadParamss[index], 0, NULL);
        assert(threads[index]);
    }

    while (true);

    //I know, it is not clean, it will leak
}

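Well, a few months later, I found out that the variables declared as thread_local were the culprit. If you use fibers, forget about thread_local variables and use per-fiber memory that you allocate when you create the fibers. I now store my current thread index in the per-fiber structure instance.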
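To make this more concrete, here is a small portable sketch; it is not the code from my scheduler. It rests on an assumption that the post does not confirm: in optimized builds, the address of a thread_local variable may be computed once and reused after a fiber switch, even though the fiber can resume on a different thread (MSVC has a /GT option for fiber-safe thread-local storage that targets this situation). The first part models the effect with plain std::thread and no fibers: a pointer taken on one thread still refers to that thread's copy when it is used on another thread. The second part models the fix. FiberContext and PrepareSwitch are hypothetical names; on Windows, a pointer to such a record could be passed as the CreateFiber parameter and read back with GetFiberData().

```cpp
#include <cassert>
#include <thread>

// Hypothetical stand-in for the post's thread_local thread index.
thread_local unsigned int tlsThreadIndex = 0;

struct Observation
{
    unsigned int viaCachedAddress; // what code holding a stale TLS address reads
    unsigned int viaFreshLookup;   // what the resuming thread's own copy holds
};

// Models a fiber that took the address of a thread_local on one thread and
// then resumed on another thread while still using that address.
Observation ResumeOnAnotherThread()
{
    tlsThreadIndex = 1;                       // copy owned by "thread A"
    unsigned int* cached = &tlsThreadIndex;   // address taken before the "switch"
    Observation seen{};
    std::thread other([cached, &seen] {
        tlsThreadIndex = 2;                   // copy owned by "thread B"
        seen.viaCachedAddress = *cached;      // still reads thread A's copy
        seen.viaFreshLookup = tlsThreadIndex; // reads thread B's copy
    });
    other.join();
    return seen;
}

// Hypothetical per-fiber record, allocated when the fiber is created.
struct FiberContext
{
    void*         handle{ nullptr };        // fiber handle (unused in this model)
    FiberContext* previousFiber{ nullptr }; // fiber to release after the switch
    unsigned int  threadIndex{ 0 };         // thread currently running this fiber
};

// Bookkeeping done just before switching: the target fiber is told which
// thread is about to run it and which fiber it must release, without any TLS.
void PrepareSwitch(FiberContext& from, FiberContext& to)
{
    to.threadIndex = from.threadIndex;
    to.previousFiber = &from;
}
```

With this layout, the code that runs after the switch reads everything it needs from its own FiberContext, so it does not matter which thread resumes the fiber.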