以简单的方式使用Windows Fiber,但会出现无法解释的错误

7uzetpgm  于 2022-11-18  发布在  Windows
关注(0)|答案(2)|浏览(142)

我在windows fibers上实现了我自己的任务调度器,但出现了一些奇怪的崩溃和未定义的行为。为了简单起见,我启动了一个新项目,并编写了一个简单的程序,执行以下操作:
1.主线程创建一束纤维,然后启动两个线程
1.主线程会一直等待,直到您终止程序
1.每个工作线程将自己转换为一个纤程
1.每个工作线程尝试找到一个空闲纤程,然后切换到这个新的空闲纤程
1.一旦线程切换到新纤程,它就会将以前的纤程推入空闲纤程容器
1.每个工作线程都会转到步骤4
如果您不熟悉光纤概念this talk is a good start
"数据“
每个线程都有自己的 ThreadData 数据结构来存储它以前的、当前的纤程示例,以及它的线程索引。我尝试了几种方法来在执行期间检索 ThreadData 数据结构:

  • 我使用线程本地存储来存储ThreadData指针
  • 我使用了一个将thread_id与ThreadData结构相关联的容器

"问题"
首次进入光纤时(请看 FiberFunc 函数),使用此纤程的线程必须将其 previous fiber 推入空闲纤程容器。但有时 previous fiber 为空,这是不可能的。这是不可能的,因为在切换到新纤程之前,线程将其 previous fiber 值设置为 current fiber 值(并且它将其 * 当前光纤 * 值设置为新的光纤值)。
因此,如果一个线程进入一个全新的纤程,而它的 previous fiber 设置为null,这意味着它来自任何地方(这没有任何意义)。

  • ThreadData* 在进入全新纤程时将其 previous fiber 值设置为null的唯一原因是另一个线程将其设置为null,或者编译器在幕后重新排序了指令。

我检查了程序集,似乎编译器不负责。
有几个bug我无法解释:
1.如果我使用第一个 GetThreadData() 函数来检索ThreadData结构,我可以检索到一个示例,它的索引与线程本地索引不同(这些索引在线程启动时已经设置),这将使程序Assert(assert(threadData-〉index == localThreadIndex))。
1.如果我使用任何其他函数来检索ThreadData结构,我将在 FiberFunc 函数中Assert,因为 previous fiber 值为空(assert(threadData-〉previousFiber))。
你知道为什么这个代码不工作吗?我花了无数个小时试图找出错误,但我没有看到我的错误。

规格

操作系统:Windows 10
集成开发环境:Visual Studio 2015和Visual Studio 2017
编译器:VC++
配置方式:放行

请注意,调试配置中没有错误。

"法典"

在引发Assert之前,您可以尝试运行它几次。

#include "Windows.h"
#include <vector>
#include <thread>
#include <mutex>
#include <cassert>
#include <iostream>
#include <atomic>

struct Fiber
{
    void* handle;
};

struct ThreadData
{
    Fiber*  previousFiber{ nullptr };
    Fiber*  currentFiber{ nullptr };
    Fiber   fiber{ };
    unsigned int index{};
};

//Threads
std::vector<std::pair<std::thread::id, unsigned int>> threadsinfo{};

//threads data container
ThreadData  threadsData[8];

//Fibers
std::mutex  fibersLock{};
std::vector<Fiber> fibers{};
std::vector<Fiber*> freeFibers{};

thread_local unsigned int localThreadIndex{};
thread_local Fiber* debug_localTheadLastFiber{};
thread_local ThreadData* localThreadData{};

using WindowsThread = HANDLE;
std::vector<WindowsThread> threads{};

//This is the first way to retrieve the current thread's ThreadData structure using thread_id
//ThreadData* GetThreadData()
//{
//  std::thread::id threadId( std::this_thread::get_id());
//  for (auto const& pair : threadsinfo)
//  {
//      if (pair.first == threadId)
//      {
//          return &threadsData[pair.second];
//      }
//  }
//
//  //It is not possible to assert
//  assert(false);
//  return nullptr;
//}

//This is the second way to retrieve the current thread's ThreadData structure using thread local storage
//ThreadData* GetThreadData()
//{
//  return &threadsData[localThreadIndex];
//}

//This is the third way to retrieve the current thread's ThreadData structure using thread local storage
ThreadData* GetThreadData()
{
    return localThreadData;
}

//Try to pop a free fiber from the container, thread safe due to mutex usage
bool  TryPopFreeFiber(Fiber*& fiber)
{
    std::lock_guard<std::mutex> guard(fibersLock);
    if (freeFibers.empty()) { return false; }
    fiber = freeFibers.back();
    assert(fiber);
    assert(fiber->handle);
    freeFibers.pop_back();
    return true;
}

//Try to push a free fiber to the container, thread safe due to mutex usage
bool PushFreeFiber(Fiber* fiber)
{
    std::lock_guard<std::mutex> guard(fibersLock);
    freeFibers.push_back(fiber);
    return true;
}

//the __declspec(noinline) is used to inspect code in release mode, comment it if you want
__declspec(noinline) void  _SwitchToFiber(Fiber* newFiber)
{
    //You want to switch to another fiber
    //You first have to save your current fiber instance to release it once you will be in the new fiber
    {
        ThreadData* threadData{ GetThreadData() };
        assert(threadData->index == localThreadIndex);
        assert(threadData->currentFiber);
        threadData->previousFiber = threadData->currentFiber;
        threadData->currentFiber = newFiber;
        debug_localTheadLastFiber = threadData->previousFiber;
        assert(threadData->previousFiber);
        assert(newFiber);
        assert(newFiber->handle);
    }

    //You switch to the new fiber
    //this call will either make you enter in the FiberFunc function if the fiber has never been used
    //Or you will continue to execute this function if the new fiber has been already used (not that you will have a different stack so you can't use the old threadData value)
    ::SwitchToFiber(newFiber->handle);

    {
        //You must get the current ThreadData* again, because you come from another fiber (the previous statement is a switch), this fiber could have been used by any other thread
        ThreadData* threadData{ GetThreadData() };

        //THIS ASSERT WILL FIRES IF YOU USE THE FIRST GetThreadData METHOD, WHICH IS IMPOSSIBLE....
        assert(threadData->index == localThreadIndex);

        assert(threadData);
        assert(threadData->previousFiber);

        //We release the previous fiber
        PushFreeFiber(threadData->previousFiber);
        debug_localTheadLastFiber = nullptr;
        threadData->previousFiber = nullptr;
    }

}

void ExecuteThreadBody()
{
    Fiber*  newFiber{};

    if (TryPopFreeFiber(newFiber))
    {
        _SwitchToFiber(newFiber);
    }
}

DWORD __stdcall ThreadFunc(void* data)
{
    int const index{ *static_cast<int*>(data)};

    threadsinfo[index] = std::make_pair(std::this_thread::get_id(), index);

    //setting up the current thread data
    ThreadData* threadData{ &threadsData[index] };
    threadData->index = index;

    void*   threadAsFiber{ ConvertThreadToFiber(nullptr) };
    assert(threadAsFiber);

    threadData->fiber = Fiber{ threadAsFiber };
    threadData->currentFiber = &threadData->fiber;

    localThreadData = threadData;
    localThreadIndex = index;

    while (true)
    {
        ExecuteThreadBody();
    }

    return DWORD{};
}

//The entry point of all fibers
void __stdcall FiberFunc(void* data)
{
    //You enter to the fiber for the first time

    ThreadData* threadData{ GetThreadData() };

    //Making sure that the thread data structure is the good one
    assert(threadData->index == localThreadIndex);

    //Here you will assert
    assert(threadData->previousFiber);

    PushFreeFiber(threadData->previousFiber);
    threadData->previousFiber = nullptr;

    while (true)
    {
        ExecuteThreadBody();
    }
}

__declspec(noinline) void main()
{
    constexpr unsigned int threadCount{ 2 };
    constexpr unsigned int fiberCount{ 20 };

    threadsinfo.resize(threadCount);

    fibers.resize(fiberCount);
    for (auto index = 0; index < fiberCount; ++index)
    {
        fibers[index] = { CreateFiber(0, FiberFunc, nullptr) };
    }

    freeFibers.resize(fiberCount);
    for (auto index = 0; index < fiberCount; ++index)
    {
        freeFibers[index] = std::addressof(fibers[index]);
    }

    threads.resize(threadCount);

    std::vector<int>    threadParamss(threadCount);


    for (auto index = 0; index < threadCount; ++index)
    {
        //threads[index] = new std::thread{ ThreadFunc, index };
        threadParamss[index] = index;
        threads[index] = CreateThread(NULL, 0, &ThreadFunc, &threadParamss[index], 0, NULL);
        assert(threads[index]);
    }

    while (true);

    //I know, it is not clean, it will leak
}
wfsdck30

wfsdck301#

几个月后,我发现声明为 thread_local 的变量是罪魁祸首。如果使用纤程,请忽略 thread_local 变量,并使用创建它们时分配的每纤程内存。现在,我将当前线程索引存储在每纤程结构示例中。

相关问题