多线程C++程序的意外输出

0vvn1miw  于 2023-02-14  发布在  其他
关注(0)|答案(1)|浏览(145)

我正在研究C++中的并发性,并且我正在尝试实现一个多线程回调注册系统。我产生了下面的代码,它应该接受注册请求,直到一个事件发生。在那之后,它应该按照它们被注册的顺序执行所有注册的回调。注册顺序不必是确定的。代码没有按照预期工作。首先,它很少输出“Pushing callback with id”消息。其次,它有时候会挂起(我认为是由争用条件引起的死锁)。我希望能得到帮助,以弄清楚这里发生了什么。如果您发现我使代码的某些部分过于复杂或误用了某些部分,也请指出这一点。

#include <condition_variable>
#include <functional>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>

class CallbackRegistrar{
public:
    void registerCallbackAndExecute(std::function<void()> callback) {
        if (!eventTriggered) {
            std::unique_lock<std::mutex> lock(callbackMutex);
            auto saved_id = callback_id;
            std::cout << "Pushing callback with id " << saved_id << std::endl;
            registeredCallbacks.push(std::make_pair(callback_id, callback));
            ++callback_id;
            callbackCond.wait(lock, [this, saved_id]{return releasedCallback.first == saved_id;});
            releasedCallback.second();
            callbackExecuted = true;
            eventCond.notify_one();
        }
        else {
            callback();
        }
    }
    void registerEvent() {
        eventTriggered = true;
        while (!registeredCallbacks.empty()) {
            releasedCallback = registeredCallbacks.front();
            callbackCond.notify_all();
            std::unique_lock<std::mutex> lock(eventMutex);
            eventCond.wait(lock, [this]{return callbackExecuted;});
            callbackExecuted = false;
            registeredCallbacks.pop();
        }
    }
private:
    std::queue<std::pair<unsigned, std::function<void()>>> registeredCallbacks;
    bool eventTriggered{false};
    bool callbackExecuted{false};
    std::mutex callbackMutex;
    std::mutex eventMutex;
    std::condition_variable callbackCond;
    std::condition_variable eventCond;
    unsigned callback_id{1};
    std::pair<unsigned, std::function<void()>> releasedCallback;
};

int main()
{
    CallbackRegistrar registrar;
    std::thread t1(&CallbackRegistrar::registerCallbackAndExecute, std::ref(registrar), []{std::cout << "First!\n";});
    std::thread t2(&CallbackRegistrar::registerCallbackAndExecute, std::ref(registrar), []{std::cout << "Second!\n";});
    
    registrar.registerEvent();
    
    t1.join();
    t2.join();

    return 0;
}
hwamh0ep

hwamh0ep1#

除了评论中的优秀建议之外,我在您的代码中发现的主要问题是您设置的callbackCond条件变量wait condition。如果releasedCallback.first不等于savedId,会发生什么情况?
当我运行你的代码时(使用线程安全队列和eventTriggered作为原子),我发现问题出在这个等待函数中,如果你在那个函数中放入一个print语句,你会发现你得到了这样的结果:

releasedCallback.first: 0, savedId: 1

然后永远等待。
事实上,我发现你的代码中使用的条件变量实际上并不需要,你只需要一个,并且它可以存在于线程安全队列中,你在搜索之后将构建这个队列;)
拥有线程安全队列之后,上面的代码可以简化为:

class CallbackRegistrar{
public:
  using NumberedCallback = std::pair<unsigned int, std::function<void()>>;

  void postCallback(std::function<void()> callback) {

    if (!eventTriggered)
    {
      std::unique_lock<std::mutex> lock(mutex);
      auto saved_id = callback_id;
      std::cout << "Pushing callback with id " << saved_id << std::endl;
      registeredCallbacks.push(std::make_pair(callback_id, callback));
      ++callback_id;
    }
    else
    {
      while (!registeredCallbacks.empty())
      {
        NumberedCallback releasedCallback;
        registeredCallbacks.waitAndPop(releasedCallback);
        releasedCallback.second();
      }
      callback();
    }
  }
  void registerEvent() {
    eventTriggered = true;
  }
private:
  ThreadSafeQueue<NumberedCallback> registeredCallbacks;
  std::atomic<bool> eventTriggered{false};
  std::mutex mutex;
  unsigned int callback_id{1};
};

int main()
{
  CallbackRegistrar registrar;
  std::vector<std::thread> threads;

  for (int i = 0; i < 10; i++)
  {
    threads.push_back(std::thread(&CallbackRegistrar::postCallback, 
                                  std::ref(registrar), 
                                  [i]{std::cout << std::to_string(i) <<"\n";}
                                  ));
  }

  registrar.registerEvent();

  for (auto& thread : threads)
  {
    thread.join();
  }

  return 0;
}

我不确定这是否完全符合您的要求,但它不会死锁。无论如何,这是一个很好的起点,但您需要自带ThreadSafeQueue的实现。

相关问题