C++ Boost Asio: implementing an "event"?

sgtfey8w  posted on 2023-05-20  in  Other

Using Boost Asio, how can I implement an "event" class that resumes a C++20 coroutine?

Something like this:

// Oversimplified, but hopefully good enough as an example
struct oneshot_event {
    void raise();                  // wakes up whoever is waiting
    asio::awaitable<void> wait();  // can be co_awaited
};

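Here wait() can be co_awaited, and the coroutine is resumed as soon as something calls raise().
I know that Boost Asio has (or had? I found it in outdated documentation) a coro class that can yield, but that does not help me because there are multiple events that can be awaited. I am also not sure how Boost.Asio and Boost.Coroutine work together.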

6qftjkof1#

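The traditional answer would use an awaitable timer (gera posted an answer that does this).
However, that can be error-prone and clumsy, especially with multiple threads: there are race conditions (e.g. https://stackoverflow.com/a/22204127/85371, and more specifically https://stackoverflow.com/a/43169596/85371) as well as data races (e.g. gera's code is not thread-safe).
More flexible is the experimental channel support: https://www.boost.org/doc/libs/master/doc/html/boost_asio/overview/channels.html
This lets you use multiple channels for multiple events, or deliver many events through a single channel. Channels also come in a thread-safe variant out of the box (asio::experimental::concurrent_channel).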

Channel demo

As requested in the comments, here is a channel-based example:

#include <boost/asio.hpp>
#include <boost/asio/co_spawn.hpp>
#include <boost/asio/detached.hpp>
#include <boost/asio/experimental/concurrent_channel.hpp>
#include <chrono>
#include <iostream>
#include <thread>

namespace asio = boost::asio;
using boost::system::error_code;
using Channel = asio::experimental::concurrent_channel<void(error_code, std::size_t)>;
using CoChannel = asio::use_awaitable_t<>::as_default_on_t<Channel>;

asio::awaitable<void> myCoroutine(CoChannel& event) {
    try {
        std::cout << "Coroutine: waiting..." << std::endl;
        while (true) {
            auto evt = co_await event.async_receive();
            std::cout << "Coroutine: resumed! (" << evt << ")" << std::endl;
        }
    } catch (boost::system::system_error const& e) {
        std::cout << "Coroutine: " << e.code().message() << std::endl;
    }
    std::cout << "Coroutine: done" << std::endl;
}

int main() {
    asio::thread_pool io;

    CoChannel event(io);
    co_spawn(io, myCoroutine(event), asio::detached);

    for (auto i : {1, 2, 3}) {
        std::cout << "Main: Sleeping for " << i << " seconds..." << std::endl;
        std::this_thread::sleep_for(std::chrono::seconds(i));

        event.try_send(error_code{}, i); // Resume the coroutine
    }

    event.close();
    io.join();
    std::cout << "Main: done" << std::endl;
}

Tested locally (to make the timing visible):

z9ju0rcb2#

Hopefully this is what you are looking for:

#include <boost/asio.hpp>
#include <boost/asio/awaitable.hpp>
#include <boost/asio/co_spawn.hpp>
#include <boost/asio/detached.hpp>
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <iostream>
#include <memory>
#include <mutex>
#include <thread>
#include <vector>

namespace asio = boost::asio;
using asio::awaitable;
using asio::co_spawn;
using asio::detached;
using asio::use_awaitable;

// Struct representing an event that can be raised and waited upon
struct oneshot_event {
    asio::io_context& io_; // Reference to the IO context for async operations
    std::atomic_bool raised_ = false; // Flag indicating whether the event has been raised
    std::vector<std::shared_ptr<asio::steady_timer>> timers_; // Timers associated with the event
    std::mutex mutex_; // Mutex for thread safety
    std::condition_variable cv_; // Condition variable for synchronization
    
    // Constructor that takes an IO context reference
    oneshot_event(asio::io_context& io) : io_(io) {}

    // Raise the event, allowing waiting coroutines to resume
    void raise() {
        std::lock_guard<std::mutex> lock(mutex_);
        if (!raised_) {
            raised_ = true;
            for (auto& timer : timers_)
                timer->cancel();
            cv_.notify_all();
        }
    }
    
    // Wait for the event to be raised
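    awaitable<void> wait() {
        std::shared_ptr<asio::steady_timer> timer;
        {
            // Hold the lock only while touching shared state, never across co_await
            std::lock_guard<std::mutex> lock(mutex_);
            if (raised_)
                co_return; // Already raised: resume immediately
            // A timer that never expires on its own; raise() cancels it
            timer = std::make_shared<asio::steady_timer>(io_, asio::steady_timer::time_point::max());
            timers_.push_back(timer);
        }
        // Suspend until raise() cancels the timer. Cancellation completes the wait
        // with operation_aborted, which is the expected wake-up, so capture it in ec.
        // Caveat: a raise() from another thread that lands between the unlock above
        // and this async_wait is missed, so this is still not fully thread-safe.
        boost::system::error_code ec;
        co_await timer->async_wait(asio::redirect_error(use_awaitable, ec));
    }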
};

// Coroutine that waits for an event and resumes when the event is raised
awaitable<void> myCoroutine(oneshot_event& event) {
    std::cout << "Coroutine waiting...\n";
    co_await event.wait();
    std::cout << "Coroutine resumed!\n";
}

int main() {
    asio::io_context io;

    oneshot_event event(io);
    auto coroutine = myCoroutine(event);

    co_spawn(io, std::move(coroutine), detached);

    std::cout << "Sleeping for 3 seconds...\n";
    std::this_thread::sleep_for(std::chrono::seconds(3));

    event.raise();  // Resume the coroutine

    io.run();
    return 0;
}

Output:

Sleeping for 3 seconds...
Coroutine waiting...
Coroutine resumed!
