c++ boost asio异步等待条件变量

whlutmcx  于 2023-01-22  发布在  其他
关注(0)|答案(4)|浏览(148)

是否可以执行异步等待(读取:non-blocking),如果不直接支持,请提供有关实现的任何提示。
我可以实现一个计时器,甚至每隔几毫秒就启动一次唤醒,但这种方法是非常低劣的,我发现很难相信条件变量同步没有实现/记录。

wqsoz72f

wqsoz72f1#

如果我理解正确的话,你想在asio线程池的上下文中,当某个条件变量被发信号时,启动一个事件处理程序吗?我认为在处理程序的开始等待条件变量就足够了,并且io_service::post()本身在最后回到池中,类似这样的事情:

#include <iostream>
#include <boost/asio.hpp>
#include <boost/thread.hpp>
boost::asio::io_service io;
boost::mutex mx;
boost::condition_variable cv;
void handler()
{
    boost::unique_lock<boost::mutex> lk(mx);
         cv.wait(lk);
    std::cout << "handler awakened\n";
    io.post(handler);
}
void buzzer()
{
    for(;;)
    {
        boost::this_thread::sleep(boost::posix_time::seconds(1));
        boost::lock_guard<boost::mutex> lk(mx);
            cv.notify_all();
    }
}
int main()
{
    io.post(handler);
    boost::thread bt(buzzer);
    io.run();
}
oipij1gg

oipij1gg2#

我可以建议基于boost::asio::deadline_timer的解决方案,对我来说效果很好。这是boost::asio环境中的一种异步事件。一个非常重要的事情是,“handler”必须通过与“cancel”相同的“strand_”序列化,因为从多个线程使用“boost::asio::deadline_timer”不是线程安全的。

class async_event
{
public:
    async_event(
        boost::asio::io_service& io_service,
        boost::asio::strand<boost::asio::io_context::executor_type>& strand)
            : strand_(strand)
            , deadline_timer_(io_service, boost::posix_time::ptime(boost::posix_time::pos_infin))
    {}

    // 'handler' must be serialised through the same 'strand_' as 'cancel' or 'cancel_one'
    //  because using 'boost::asio::deadline_timer' from multiple threads is not thread safe
    template<class WaitHandler>
    void async_wait(WaitHandler&& handler) {
        deadline_timer_.async_wait(handler);
    }
    void async_notify_one() {
        boost::asio::post(strand_, boost::bind(&async_event::async_notify_one_serialized, this));
    }
    void async_notify_all() {
        boost::asio::post(strand_, boost::bind(&async_event::async_notify_all_serialized, this));
    }
private:
    void async_notify_one_serialized() {
        deadline_timer_.cancel_one();
    }
    void async_notify_all_serialized() {
        deadline_timer_.cancel();
    }
    boost::asio::strand<boost::asio::io_context::executor_type>& strand_;
    boost::asio::deadline_timer deadline_timer_;
};
syqv5f0l

syqv5f0l3#

不幸的是,Boost ASIO没有async_wait_for_condvar()方法。
在大多数情况下,你也不需要它。用ASIO的方式编程通常意味着,你使用strand,而不是互斥锁或条件变量来保护共享资源。除了极少数情况,通常集中在启动和退出时正确的构造或破坏顺序,你根本不需要互斥锁或条件变量。
修改共享资源时,经典的部分同步线程化方式如下所示:

  • 锁定保护资源的互斥锁
  • 更新任何需要更新的内容
  • 如果等待线程需要进一步处理,则向条件变量发送信号
  • 解锁互斥锁

完全异步ASIO方式如下:

  • 生成包含更新资源所需的所有内容的消息
  • 将对更新处理程序的调用与该消息一起发布到资源的链
  • 如果需要进一步的处理,让更新处理程序创建更多的消息,并将它们发布到适当的资源链。
  • 如果作业可以在完全私有的数据上执行,则直接将它们发布到io-context。

下面是类some_shared_resource的一个例子,它接收字符串state,并根据接收到的状态触发一些进一步的处理。请注意,私有方法some_shared_resource::receive_state()中的所有处理都是完全线程安全的,因为strand序列化了所有调用。
当然,这个例子并不完整;some_other_resource需要与some_shared_ressource::send_state()类似的send_code_red()方法。

#include <boost/asio>
#include <memory>

using asio_context = boost::asio::io_context;
using asio_executor_type = asio_context::executor_type;
using asio_strand = boost::asio::strand<asio_executor_type>;

class some_other_resource;
class some_shared_resource : public std::enable_shared_from_this<some_shared_resource> {
    asio_strand strand;
    std::shared_ptr<some_other_resource> other;
    std::string state;

    void receive_state(std::string&& new_state) {
        std::string oldstate = std::exchange(state, new_state);
        if(state == "red" && oldstate != "red") {
            // state transition to "red":
            other.send_code_red(true);
        } else if(state != "red" && oldstate == "red") {
            // state transition from "red":
            other.send_code_red(false);
        }
    }

public:
    some_shared_resource(asio_context& ctx, const std::shared_ptr<some_other_resource>& other)
      : strand(ctx.get_executor()), other(other) {}

    void send_state(std::string&& new_state) {
        boost::asio::post(strand, [me = weak_from_this(), new_state = std::move(new_state)]() mutable {
            if(auto self = me.lock(); self) {
                self->receive_state(std::move(new_state));
            }
        });
    }
};

正如您所看到的,总是将代码发布到ASIO的strand中一开始可能有点乏味,但是您可以将大部分“用strand来装备类”的代码移到模板中。
消息传递的好处是:因为你没有使用互斥锁,所以即使在极端的情况下,你也不能再死锁自己了。而且,使用消息传递,通常比传统的多线程更容易创建高级别的并行性。缺点是,移动和复制所有这些消息对象是很耗时的,这会降低你的应用程序。
最后一点:在由send_state()形成的消息中使用弱指针便于可靠地销毁some_shared_resource对象:否则,如果A调用B,B调用C,C调用A(可能只有在超时或类似的情况下),在消息中使用共享指针而不是弱指针将创建循环引用,这将防止对象销毁。如果您确信永远不会有循环,并且处理来自要删除对象的消息不会造成问题,当然,您可以使用shared_from_this()来代替weak_from_this(),如果您确信在ASIO停止(并且所有工作线程都联接回主线程)之前对象不会被删除,那么您也可以直接捕获this指针。

y4ekin9u

y4ekin9u4#

首先,我使用相当好的continuable库实现了一个异步互斥锁:

class async_mutex
{
    cti::continuable<> tail_{cti::make_ready_continuable()};
    std::mutex mutex_;

public:
    async_mutex() = default;
    async_mutex(const async_mutex&) = delete;
    const async_mutex& operator=(const async_mutex&) = delete;

    [[nodiscard]] cti::continuable<std::shared_ptr<int>> lock()
    {
        std::shared_ptr<int> result;
        cti::continuable<> tail = cti::make_continuable<void>(
            [&result](auto&& promise) {
                result = std::shared_ptr<int>((int*)1,
                    [promise = std::move(promise)](auto) mutable {
                        promise.set_value();
                    }
                );
            }
        );

        {
            std::lock_guard _{mutex_};
            std::swap(tail, tail_);
        }
        co_await std::move(tail);
        co_return result;
    }
};

用法如:

async_mutex mutex;

...

{
    const auto _ = co_await mutex.lock();
    // only one lock per mutex-instance
}

相关问题