我想在多个std::thread
工作线程之间拆分作业,并在它们全部完成后继续。为此,我实现了一个主要基于SO answer的线程池类。然而,我注意到,我的基准测试可能会卡住,永远运行,而不会抛出任何错误。
我写了一个最小的复制代码,附在末尾。根据终端输出,这个问题似乎发生在作业排队的时候。我检查了视频(1,2),文档(3)和博客文章(4)。我尝试使用原子替换锁的类型。我找不到根本原因。
下面是重现这个问题的代码片段,程序重复计算测试向量中的奇数元素。
#include <atomic>
#include <condition_variable>
#include <functional>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>
class Pool {
public:
const int worker_count;
bool to_terminate = false;
std::atomic<int> unfinished_tasks = 0;
std::mutex mutex;
std::condition_variable condition;
std::vector<std::thread> threads;
std::queue<std::function<void()>> jobs;
void thread_loop()
{
while (true) {
std::function<void()> job;
{
std::unique_lock<std::mutex> lock(mutex);
condition.wait(lock, [&] { return (!jobs.empty()) || to_terminate; });
if (to_terminate)
return;
job = jobs.front();
jobs.pop();
}
job();
unfinished_tasks -= 1;
}
}
public:
Pool(int size) : worker_count(size)
{
if (size < 0)
throw std::invalid_argument("Worker count needs to be a positive integer");
for (int i = 0; i < worker_count; ++i)
threads.push_back(std::thread(&Pool::thread_loop, this));
};
~Pool()
{
{
std::unique_lock lock(mutex);
to_terminate = true;
}
condition.notify_all();
for (auto &thread : threads)
thread.join();
threads.clear();
};
void queue_job(const std::function<void()> &job)
{
{
std::unique_lock<std::mutex> lock(mutex);
jobs.push(job);
unfinished_tasks += 1;
// std::cout << unfinished_tasks;
}
condition.notify_one();
}
void wait()
{
while (unfinished_tasks) {
; // spinlock
};
}
};
int main()
{
constexpr int worker_count = 8;
constexpr int vector_size = 1 << 10;
Pool pool = Pool(worker_count);
std::vector<int> test_vector;
test_vector.reserve(vector_size);
for (int i = 0; i < vector_size; ++i)
test_vector.push_back(i);
std::vector<int> worker_odd_counts(worker_count, 0);
std::function<void(int)> worker_task = [&](int thread_id) {
int chunk_size = vector_size / (worker_count) + 1;
int my_start = thread_id * chunk_size;
int my_end = std::min(my_start + chunk_size, vector_size);
int local_odd_count = 0;
for (int ii = my_start; ii < my_end; ++ii)
if (test_vector[ii] % 2 != 0)
++local_odd_count;
worker_odd_counts[thread_id] = local_odd_count;
};
for (int iteration = 0;; ++iteration) {
std::cout << "Jobs.." << std::flush;
for (int i = 0; i < worker_count; ++i)
pool.queue_job([&worker_task, i] { worker_task(i); });
std::cout << "..queued. " << std::flush;
pool.wait();
int odd_count = 0;
for (auto elem : worker_odd_counts)
odd_count += elem;
std::cout << "Iter:" << iteration << ". Odd:" << odd_count << '\n';
}
}
以下是一个特定运行的最终输出:
[...]
Jobs....queued. Iter:2994. Odd:512
Jobs....queued. Iter:2995. Odd:512
Jobs..
编辑:错误发生在使用GCC 12.2.0 x86_64-w 64-mingw 32在Windows 10与AMD锐龙4750 U CPU.我没有得到过去15 k迭代.使用Visual Studio社区2022,我得到了过去1.5M迭代(并停止了它自己).感谢@IgorTandetnik指出后者.
2条答案
按热度按时间m1m5dgzv1#
Mingw本身并不支持Windows上的多线程,他们支持POSIX API上的C标准库中的线程,以及在Windows操作系统线程之上实现该API的
winpthreads
兼容层。我认为你的错误不在C代码中,而是在计算机设置中。请执行以下操作。
1.使用
x86_64-12.2.0-release-posix-seh-ucrt-rt_v10-rev2.7z
归档文件there中的编译器。1.不要忘记,以这种方式构建的二进制文件依赖于编译器提供的一系列DLL文件:
libgcc_s_seh-1.dll
、libwinpthread-1.dll
和libstdc++-6.dll
。您必须使用与mingw附带的这些DLL完全相同的版本。如果您在%PATH%
中的任何地方有这些DLL的其他版本,则会出现各种失败。几个要点。
Linux优先的C编译器,如gcc,在Windows上有问题。阻力最小的方法是使用Visual C。如果你想让你的软件也在其他平台上构建,考虑cmake来抽象编译器。
Windows已经包含了线程池的实现,因为Vista. API很容易使用,你只需要4个函数:创建线程池工作、提交线程池工作、等待线程池工作回调和关闭线程池工作。Example。
qojgxg4l2#
你应该做的第一件事是从线程池中分离队列。它们都很棘手,把它们混合在一个类中编写是在自找麻烦。
这还允许您在没有池的情况下对队列进行单元测试。
这比线程池更容易写。
好了。
现在我们的线程池更简单了。
我不能在你的代码中发现错误,但是有了上面的代码,你可以测试队列从池中的分离。
队列将使用pthreads或其他原语。