c++ 具有作业队列的线程池阻塞

mxg2im7a  于 2023-02-01  发布在  其他
关注(0)|答案(2)|浏览(156)

我想在多个std::thread工作线程之间拆分作业,并在它们全部完成后继续。为此,我实现了一个主要基于SO answer的线程池类。然而,我注意到,我的基准测试可能会卡住,永远运行,而不会抛出任何错误。
我写了一个最小的复制代码,附在末尾。根据终端输出,这个问题似乎发生在作业排队的时候。我检查了视频(12),文档(3)和博客文章(4)。我尝试使用原子替换锁的类型。我找不到根本原因。
下面是重现这个问题的代码片段,程序重复计算测试向量中的奇数元素。

#include <atomic>
#include <condition_variable>
#include <functional>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

class Pool {
  public:
    const int worker_count;
    bool to_terminate = false;
    std::atomic<int> unfinished_tasks = 0;
    std::mutex mutex;
    std::condition_variable condition;
    std::vector<std::thread> threads;
    std::queue<std::function<void()>> jobs;

    void thread_loop()
    {
        while (true) {
            std::function<void()> job;
            {
                std::unique_lock<std::mutex> lock(mutex);
                condition.wait(lock, [&] { return (!jobs.empty()) || to_terminate; });

                if (to_terminate)
                    return;

                job = jobs.front();
                jobs.pop();
            }
            job();
            unfinished_tasks -= 1;
        }
    }

  public:
    Pool(int size) : worker_count(size)
    {
        if (size < 0)
            throw std::invalid_argument("Worker count needs to be a positive integer");

        for (int i = 0; i < worker_count; ++i)
            threads.push_back(std::thread(&Pool::thread_loop, this));
    };

    ~Pool()
    {
        {
            std::unique_lock lock(mutex);
            to_terminate = true;
        }
        condition.notify_all();
        for (auto &thread : threads)
            thread.join();
        threads.clear();
    };

    void queue_job(const std::function<void()> &job)
    {
        {
            std::unique_lock<std::mutex> lock(mutex);
            jobs.push(job);
            unfinished_tasks += 1;
            // std::cout << unfinished_tasks;
        }
        condition.notify_one();
    }

    void wait()
    {
        while (unfinished_tasks) {
            ; // spinlock
        };
    }
};

int main()
{
    constexpr int worker_count = 8;
    constexpr int vector_size = 1 << 10;
    Pool pool = Pool(worker_count);

    std::vector<int> test_vector;
    test_vector.reserve(vector_size);
    for (int i = 0; i < vector_size; ++i)
        test_vector.push_back(i);

    std::vector<int> worker_odd_counts(worker_count, 0);

    std::function<void(int)> worker_task = [&](int thread_id) {
        int chunk_size = vector_size / (worker_count) + 1;
        int my_start = thread_id * chunk_size;
        int my_end = std::min(my_start + chunk_size, vector_size);

        int local_odd_count = 0;
        for (int ii = my_start; ii < my_end; ++ii)
            if (test_vector[ii] % 2 != 0)
                ++local_odd_count;

        worker_odd_counts[thread_id] = local_odd_count;
    };

    for (int iteration = 0;; ++iteration) {
        std::cout << "Jobs.." << std::flush;
        for (int i = 0; i < worker_count; ++i)
            pool.queue_job([&worker_task, i] { worker_task(i); });
        std::cout << "..queued. " << std::flush;

        pool.wait();

        int odd_count = 0;
        for (auto elem : worker_odd_counts)
            odd_count += elem;

        std::cout << "Iter:" << iteration << ". Odd:" << odd_count << '\n';
    }
}

以下是一个特定运行的最终输出:

[...]
Jobs....queued. Iter:2994. Odd:512
Jobs....queued. Iter:2995. Odd:512
Jobs..

编辑:错误发生在使用GCC 12.2.0 x86_64-w 64-mingw 32在Windows 10与AMD锐龙4750 U CPU.我没有得到过去15 k迭代.使用Visual Studio社区2022,我得到了过去1.5M迭代(并停止了它自己).感谢@IgorTandetnik指出后者.

m1m5dgzv

m1m5dgzv1#

Mingw本身并不支持Windows上的多线程,他们支持POSIX API上的C标准库中的线程,以及在Windows操作系统线程之上实现该API的winpthreads兼容层。
我认为你的错误不在C
代码中,而是在计算机设置中。请执行以下操作。
1.使用x86_64-12.2.0-release-posix-seh-ucrt-rt_v10-rev2.7z归档文件there中的编译器。
1.不要忘记,以这种方式构建的二进制文件依赖于编译器提供的一系列DLL文件:libgcc_s_seh-1.dlllibwinpthread-1.dlllibstdc++-6.dll。您必须使用与mingw附带的这些DLL完全相同的版本。如果您在%PATH%中的任何地方有这些DLL的其他版本,则会出现各种失败。
几个要点。
Linux优先的C编译器,如gcc,在Windows上有问题。阻力最小的方法是使用Visual C。如果你想让你的软件也在其他平台上构建,考虑cmake来抽象编译器。
Windows已经包含了线程池的实现,因为Vista. API很容易使用,你只需要4个函数:创建线程池工作、提交线程池工作、等待线程池工作回调和关闭线程池工作。Example

qojgxg4l

qojgxg4l2#

你应该做的第一件事是从线程池中分离队列。它们都很棘手,把它们混合在一个类中编写是在自找麻烦。
这还允许您在没有池的情况下对队列进行单元测试。

template<class Payload>
class MutexQueue {
public:
  std::optional<Payload> wait_and_pop();
  void push(Payload);
  void terminate_queue();
  bool queue_is_terminated() const;
private:
  mutable std::mutex m;
  std::condition_variable cv;
  std::deque<Payload> q;
  bool terminated = false;
  std::unique_lock<std::mutex> lock() const {
    return std::unique_lock<std::mutex>(m);
  }
};

这比线程池更容易写。

void push(Payload p) {
     {
       auto l = lock();
       if (terminate) return;
       q.push_back(std::move(p));
     }
     cv.notify_one();
   }
  void terminate_queue() {
     {
       auto l = lock(); // YOU CANNOT SKIP THIS LOCK, even if terminate is atomic
       terminate = true;
       q.clear();
     }
     cv.notify_all();
  }
  bool queue_is_terminated() const {
    auto l = lock(); // if you make terminate atomic, you CAN skip this lock
    return terminate;
  }
  std::optional<Payload> wait_and_pop() {
    auto l = lock();
    cv.wait(l, [&]{ return terminate || !q.empty(); }
    if (terminate) return std::nullopt;
    auto r = std::move(q.front());
    q.pop_front();
    return std::move(r);
  }

好了。
现在我们的线程池更简单了。

struct ThreadPool {
  explicit ThreadPool(std::size_t n) {
    create_threads(n);
  }
  std::future<void> push_task(std::function<void()> f) {
    std::packaged_task<void()> p = std::move(f);
    auto r = p.get_future();
    q.push( std::move(p) );
    return r;
  }
  void terminate_pool() {
    q.terminate_queue();
    terminate_threads();
  }
  ~ThreadPool() {
    terminate_pool();
  }
private:
  MutexQueue<std::packaged_task<void()>> q;
  std::vector<std::thread> threads;
  void terminate_threads() {
    for(auto& thread:threads)
      thread.join();
    threads.clear();
  }
  static void thread_task( MutexQueue<std::packaged_task<void()>>* pq ) {
    if (!pq) return;
    while (auto task = pq->wait_and_pop()) {
      (*task)();
    }
  }
  void create_threads(std::size_t n) {
    for (std::size_t i = 0; i < n; ++i) {
      threads.push_back( std::thread( thread_task, &q ) );
    }
  }

我不能在你的代码中发现错误,但是有了上面的代码,你可以测试队列从池中的分离。
队列将使用pthreads或其他原语。

相关问题