我在windows fibers上实现了我自己的任务调度器,但出现了一些奇怪的崩溃和未定义的行为。为了简单起见,我启动了一个新项目,并编写了一个简单的程序,执行以下操作:
1.主线程创建一束纤维,然后启动两个线程
1.主线程会一直等待,直到您终止程序
1.每个工作线程将自己转换为一个纤程
1.每个工作线程尝试找到一个空闲纤程,然后切换到这个新的空闲纤程
1.一旦线程切换到新纤程,它就会将以前的纤程推入空闲纤程容器
1.每个工作线程都会转到步骤4
如果您不熟悉光纤概念this talk is a good start。
"数据“
每个线程都有自己的 ThreadData 数据结构来存储它以前的、当前的纤程示例,以及它的线程索引。我尝试了几种方法来在执行期间检索 ThreadData 数据结构:
- 我使用线程本地存储来存储ThreadData指针
- 我使用了一个将thread_id与ThreadData结构相关联的容器
"问题"
首次进入光纤时(请看 FiberFunc 函数),使用此纤程的线程必须将其 previous fiber 推入空闲纤程容器。但有时 previous fiber 为空,这是不可能的。这是不可能的,因为在切换到新纤程之前,线程将其 previous fiber 值设置为 current fiber 值(并且它将其 * 当前光纤 * 值设置为新的光纤值)。
因此,如果一个线程进入一个全新的纤程,而它的 previous fiber 设置为null,这意味着它来自任何地方(这没有任何意义)。
- ThreadData* 在进入全新纤程时将其 previous fiber 值设置为null的唯一原因是另一个线程将其设置为null,或者编译器在幕后重新排序了指令。
我检查了程序集,似乎编译器不负责。
有几个bug我无法解释:
1.如果我使用第一个 GetThreadData() 函数来检索ThreadData结构,我可以检索到一个示例,它的索引与线程本地索引不同(这些索引在线程启动时已经设置),这将使程序Assert(assert(threadData-〉index == localThreadIndex))。
1.如果我使用任何其他函数来检索ThreadData结构,我将在 FiberFunc 函数中Assert,因为 previous fiber 值为空(assert(threadData-〉previousFiber))。
你知道为什么这个代码不工作吗?我花了无数个小时试图找出错误,但我没有看到我的错误。
规格
操作系统:Windows 10
集成开发环境:Visual Studio 2015和Visual Studio 2017
编译器:VC++
配置方式:放行
请注意,调试配置中没有错误。
"法典"
在引发Assert之前,您可以尝试运行它几次。
#include "Windows.h"
#include <vector>
#include <thread>
#include <mutex>
#include <cassert>
#include <iostream>
#include <atomic>
struct Fiber
{
void* handle;
};
struct ThreadData
{
Fiber* previousFiber{ nullptr };
Fiber* currentFiber{ nullptr };
Fiber fiber{ };
unsigned int index{};
};
//Threads
std::vector<std::pair<std::thread::id, unsigned int>> threadsinfo{};
//threads data container
ThreadData threadsData[8];
//Fibers
std::mutex fibersLock{};
std::vector<Fiber> fibers{};
std::vector<Fiber*> freeFibers{};
thread_local unsigned int localThreadIndex{};
thread_local Fiber* debug_localTheadLastFiber{};
thread_local ThreadData* localThreadData{};
using WindowsThread = HANDLE;
std::vector<WindowsThread> threads{};
//This is the first way to retrieve the current thread's ThreadData structure using thread_id
//ThreadData* GetThreadData()
//{
// std::thread::id threadId( std::this_thread::get_id());
// for (auto const& pair : threadsinfo)
// {
// if (pair.first == threadId)
// {
// return &threadsData[pair.second];
// }
// }
//
// //It is not possible to assert
// assert(false);
// return nullptr;
//}
//This is the second way to retrieve the current thread's ThreadData structure using thread local storage
//ThreadData* GetThreadData()
//{
// return &threadsData[localThreadIndex];
//}
//This is the third way to retrieve the current thread's ThreadData structure using thread local storage
ThreadData* GetThreadData()
{
return localThreadData;
}
//Try to pop a free fiber from the container, thread safe due to mutex usage
bool TryPopFreeFiber(Fiber*& fiber)
{
std::lock_guard<std::mutex> guard(fibersLock);
if (freeFibers.empty()) { return false; }
fiber = freeFibers.back();
assert(fiber);
assert(fiber->handle);
freeFibers.pop_back();
return true;
}
//Try to push a free fiber to the container, thread safe due to mutex usage
bool PushFreeFiber(Fiber* fiber)
{
std::lock_guard<std::mutex> guard(fibersLock);
freeFibers.push_back(fiber);
return true;
}
//the __declspec(noinline) is used to inspect code in release mode, comment it if you want
__declspec(noinline) void _SwitchToFiber(Fiber* newFiber)
{
//You want to switch to another fiber
//You first have to save your current fiber instance to release it once you will be in the new fiber
{
ThreadData* threadData{ GetThreadData() };
assert(threadData->index == localThreadIndex);
assert(threadData->currentFiber);
threadData->previousFiber = threadData->currentFiber;
threadData->currentFiber = newFiber;
debug_localTheadLastFiber = threadData->previousFiber;
assert(threadData->previousFiber);
assert(newFiber);
assert(newFiber->handle);
}
//You switch to the new fiber
//this call will either make you enter in the FiberFunc function if the fiber has never been used
//Or you will continue to execute this function if the new fiber has been already used (not that you will have a different stack so you can't use the old threadData value)
::SwitchToFiber(newFiber->handle);
{
//You must get the current ThreadData* again, because you come from another fiber (the previous statement is a switch), this fiber could have been used by any other thread
ThreadData* threadData{ GetThreadData() };
//THIS ASSERT WILL FIRES IF YOU USE THE FIRST GetThreadData METHOD, WHICH IS IMPOSSIBLE....
assert(threadData->index == localThreadIndex);
assert(threadData);
assert(threadData->previousFiber);
//We release the previous fiber
PushFreeFiber(threadData->previousFiber);
debug_localTheadLastFiber = nullptr;
threadData->previousFiber = nullptr;
}
}
void ExecuteThreadBody()
{
Fiber* newFiber{};
if (TryPopFreeFiber(newFiber))
{
_SwitchToFiber(newFiber);
}
}
DWORD __stdcall ThreadFunc(void* data)
{
int const index{ *static_cast<int*>(data)};
threadsinfo[index] = std::make_pair(std::this_thread::get_id(), index);
//setting up the current thread data
ThreadData* threadData{ &threadsData[index] };
threadData->index = index;
void* threadAsFiber{ ConvertThreadToFiber(nullptr) };
assert(threadAsFiber);
threadData->fiber = Fiber{ threadAsFiber };
threadData->currentFiber = &threadData->fiber;
localThreadData = threadData;
localThreadIndex = index;
while (true)
{
ExecuteThreadBody();
}
return DWORD{};
}
//The entry point of all fibers
void __stdcall FiberFunc(void* data)
{
//You enter to the fiber for the first time
ThreadData* threadData{ GetThreadData() };
//Making sure that the thread data structure is the good one
assert(threadData->index == localThreadIndex);
//Here you will assert
assert(threadData->previousFiber);
PushFreeFiber(threadData->previousFiber);
threadData->previousFiber = nullptr;
while (true)
{
ExecuteThreadBody();
}
}
__declspec(noinline) void main()
{
constexpr unsigned int threadCount{ 2 };
constexpr unsigned int fiberCount{ 20 };
threadsinfo.resize(threadCount);
fibers.resize(fiberCount);
for (auto index = 0; index < fiberCount; ++index)
{
fibers[index] = { CreateFiber(0, FiberFunc, nullptr) };
}
freeFibers.resize(fiberCount);
for (auto index = 0; index < fiberCount; ++index)
{
freeFibers[index] = std::addressof(fibers[index]);
}
threads.resize(threadCount);
std::vector<int> threadParamss(threadCount);
for (auto index = 0; index < threadCount; ++index)
{
//threads[index] = new std::thread{ ThreadFunc, index };
threadParamss[index] = index;
threads[index] = CreateThread(NULL, 0, &ThreadFunc, &threadParamss[index], 0, NULL);
assert(threads[index]);
}
while (true);
//I know, it is not clean, it will leak
}
2条答案
按热度按时间wfsdck301#
几个月后,我发现声明为 thread_local 的变量是罪魁祸首。如果使用纤程,请忽略 thread_local 变量,并使用创建它们时分配的每纤程内存。现在,我将当前线程索引存储在每纤程结构示例中。
wko9yo5t2#
如果您想要执行绪本机储存区,就必须使用/GT选项。
https://learn.microsoft.com/en-us/cpp/build/reference/gt-support-fiber-safe-thread-local-storage?view=msvc-170