是否可以执行异步等待(读取:non-blocking),如果不直接支持,请提供有关实现的任何提示。我可以实现一个计时器,甚至每隔几毫秒就启动一次唤醒,但这种方法是非常低劣的,我发现很难相信条件变量同步没有实现/记录。
wqsoz72f1#
如果我理解正确的话,你想在asio线程池的上下文中,当某个条件变量被发信号时,启动一个事件处理程序吗?我认为在处理程序的开始等待条件变量就足够了,并且io_service::post()本身在最后回到池中,类似这样的事情:
#include <iostream> #include <boost/asio.hpp> #include <boost/thread.hpp> boost::asio::io_service io; boost::mutex mx; boost::condition_variable cv; void handler() { boost::unique_lock<boost::mutex> lk(mx); cv.wait(lk); std::cout << "handler awakened\n"; io.post(handler); } void buzzer() { for(;;) { boost::this_thread::sleep(boost::posix_time::seconds(1)); boost::lock_guard<boost::mutex> lk(mx); cv.notify_all(); } } int main() { io.post(handler); boost::thread bt(buzzer); io.run(); }
oipij1gg2#
我可以建议基于boost::asio::deadline_timer的解决方案,对我来说效果很好。这是boost::asio环境中的一种异步事件。一个非常重要的事情是,“handler”必须通过与“cancel”相同的“strand_”序列化,因为从多个线程使用“boost::asio::deadline_timer”不是线程安全的。
class async_event { public: async_event( boost::asio::io_service& io_service, boost::asio::strand<boost::asio::io_context::executor_type>& strand) : strand_(strand) , deadline_timer_(io_service, boost::posix_time::ptime(boost::posix_time::pos_infin)) {} // 'handler' must be serialised through the same 'strand_' as 'cancel' or 'cancel_one' // because using 'boost::asio::deadline_timer' from multiple threads is not thread safe template<class WaitHandler> void async_wait(WaitHandler&& handler) { deadline_timer_.async_wait(handler); } void async_notify_one() { boost::asio::post(strand_, boost::bind(&async_event::async_notify_one_serialized, this)); } void async_notify_all() { boost::asio::post(strand_, boost::bind(&async_event::async_notify_all_serialized, this)); } private: void async_notify_one_serialized() { deadline_timer_.cancel_one(); } void async_notify_all_serialized() { deadline_timer_.cancel(); } boost::asio::strand<boost::asio::io_context::executor_type>& strand_; boost::asio::deadline_timer deadline_timer_; };
syqv5f0l3#
不幸的是,Boost ASIO没有async_wait_for_condvar()方法。在大多数情况下,你也不需要它。用ASIO的方式编程通常意味着,你使用strand,而不是互斥锁或条件变量来保护共享资源。除了极少数情况,通常集中在启动和退出时正确的构造或破坏顺序,你根本不需要互斥锁或条件变量。修改共享资源时,经典的部分同步线程化方式如下所示:
async_wait_for_condvar()
完全异步ASIO方式如下:
下面是类some_shared_resource的一个例子,它接收字符串state,并根据接收到的状态触发一些进一步的处理。请注意,私有方法some_shared_resource::receive_state()中的所有处理都是完全线程安全的,因为strand序列化了所有调用。当然,这个例子并不完整;some_other_resource需要与some_shared_ressource::send_state()类似的send_code_red()方法。
some_shared_resource
state
some_shared_resource::receive_state()
some_other_resource
some_shared_ressource::send_state()
send_code_red()
#include <boost/asio> #include <memory> using asio_context = boost::asio::io_context; using asio_executor_type = asio_context::executor_type; using asio_strand = boost::asio::strand<asio_executor_type>; class some_other_resource; class some_shared_resource : public std::enable_shared_from_this<some_shared_resource> { asio_strand strand; std::shared_ptr<some_other_resource> other; std::string state; void receive_state(std::string&& new_state) { std::string oldstate = std::exchange(state, new_state); if(state == "red" && oldstate != "red") { // state transition to "red": other.send_code_red(true); } else if(state != "red" && oldstate == "red") { // state transition from "red": other.send_code_red(false); } } public: some_shared_resource(asio_context& ctx, const std::shared_ptr<some_other_resource>& other) : strand(ctx.get_executor()), other(other) {} void send_state(std::string&& new_state) { boost::asio::post(strand, [me = weak_from_this(), new_state = std::move(new_state)]() mutable { if(auto self = me.lock(); self) { self->receive_state(std::move(new_state)); } }); } };
正如您所看到的,总是将代码发布到ASIO的strand中一开始可能有点乏味,但是您可以将大部分“用strand来装备类”的代码移到模板中。消息传递的好处是:因为你没有使用互斥锁,所以即使在极端的情况下,你也不能再死锁自己了。而且,使用消息传递,通常比传统的多线程更容易创建高级别的并行性。缺点是,移动和复制所有这些消息对象是很耗时的,这会降低你的应用程序。最后一点:在由send_state()形成的消息中使用弱指针便于可靠地销毁some_shared_resource对象:否则,如果A调用B,B调用C,C调用A(可能只有在超时或类似的情况下),在消息中使用共享指针而不是弱指针将创建循环引用,这将防止对象销毁。如果您确信永远不会有循环,并且处理来自要删除对象的消息不会造成问题,当然,您可以使用shared_from_this()来代替weak_from_this(),如果您确信在ASIO停止(并且所有工作线程都联接回主线程)之前对象不会被删除,那么您也可以直接捕获this指针。
send_state()
shared_from_this()
weak_from_this()
this
y4ekin9u4#
首先,我使用相当好的continuable库实现了一个异步互斥锁:
class async_mutex { cti::continuable<> tail_{cti::make_ready_continuable()}; std::mutex mutex_; public: async_mutex() = default; async_mutex(const async_mutex&) = delete; const async_mutex& operator=(const async_mutex&) = delete; [[nodiscard]] cti::continuable<std::shared_ptr<int>> lock() { std::shared_ptr<int> result; cti::continuable<> tail = cti::make_continuable<void>( [&result](auto&& promise) { result = std::shared_ptr<int>((int*)1, [promise = std::move(promise)](auto) mutable { promise.set_value(); } ); } ); { std::lock_guard _{mutex_}; std::swap(tail, tail_); } co_await std::move(tail); co_return result; } };
用法如:
async_mutex mutex; ... { const auto _ = co_await mutex.lock(); // only one lock per mutex-instance }
4条答案
按热度按时间wqsoz72f1#
如果我理解正确的话,你想在asio线程池的上下文中,当某个条件变量被发信号时,启动一个事件处理程序吗?我认为在处理程序的开始等待条件变量就足够了,并且io_service::post()本身在最后回到池中,类似这样的事情:
oipij1gg2#
我可以建议基于boost::asio::deadline_timer的解决方案,对我来说效果很好。这是boost::asio环境中的一种异步事件。一个非常重要的事情是,“handler”必须通过与“cancel”相同的“strand_”序列化,因为从多个线程使用“boost::asio::deadline_timer”不是线程安全的。
syqv5f0l3#
不幸的是,Boost ASIO没有
async_wait_for_condvar()
方法。在大多数情况下,你也不需要它。用ASIO的方式编程通常意味着,你使用strand,而不是互斥锁或条件变量来保护共享资源。除了极少数情况,通常集中在启动和退出时正确的构造或破坏顺序,你根本不需要互斥锁或条件变量。
修改共享资源时,经典的部分同步线程化方式如下所示:
完全异步ASIO方式如下:
下面是类
some_shared_resource
的一个例子,它接收字符串state
,并根据接收到的状态触发一些进一步的处理。请注意,私有方法some_shared_resource::receive_state()
中的所有处理都是完全线程安全的,因为strand序列化了所有调用。当然,这个例子并不完整;
some_other_resource
需要与some_shared_ressource::send_state()
类似的send_code_red()
方法。正如您所看到的,总是将代码发布到ASIO的strand中一开始可能有点乏味,但是您可以将大部分“用strand来装备类”的代码移到模板中。
消息传递的好处是:因为你没有使用互斥锁,所以即使在极端的情况下,你也不能再死锁自己了。而且,使用消息传递,通常比传统的多线程更容易创建高级别的并行性。缺点是,移动和复制所有这些消息对象是很耗时的,这会降低你的应用程序。
最后一点:在由
send_state()
形成的消息中使用弱指针便于可靠地销毁some_shared_resource
对象:否则,如果A调用B,B调用C,C调用A(可能只有在超时或类似的情况下),在消息中使用共享指针而不是弱指针将创建循环引用,这将防止对象销毁。如果您确信永远不会有循环,并且处理来自要删除对象的消息不会造成问题,当然,您可以使用shared_from_this()
来代替weak_from_this()
,如果您确信在ASIO停止(并且所有工作线程都联接回主线程)之前对象不会被删除,那么您也可以直接捕获this
指针。y4ekin9u4#
首先,我使用相当好的continuable库实现了一个异步互斥锁:
用法如: