为什么boost::asio::io_context::run()在boost::beast::WebSocket::stream::close()之后仍然阻塞?

ddrv8njm  于 2023-01-21  发布在  其他
关注(0)|答案(1)|浏览(387)

我有一个线程池,同时使用相同的io_context来运行一个WebSocket流,我这样做是因为首先,我实际上有两个websocket流(我抽象了这个,因为通过测试它似乎不是问题),并且因为我想运行除了websocket之外的其他io操作,即async_readasync_write
每个WebSocket流都使用自己的串,并使用额外的锁定来确保async_read(分别为async_write)在另一个到达处理程序之前不会执行。
所以基本上

io_context context;
std::vector<std::thread> pool(std::thread::hardware_concurrency());
...
wss(make_strand(context),ssl);
...
wss.async_read(&loop_read_handler);
...
for(auto& th:pool)
    th=std::thread([&]{
        try{
            start_read_loop();//give work to do to each thread
            context.run();
        }catch(...){}
        wss.close(...);//closing the websocket stream, expected to cancel all threads
        context.stop();//with or without it, no change
    });
for(auto& th:pool)
    th.join();//hangs here since the other threads did not return from run()

当我想让程序停止时,我close(boost::beast::websocket::close_code::normal,ec)流,这有效地取消了当前线程中的io操作(接收到错误代码为boost::beast::websocket::error::closed的空消息),但在其他线程中没有:而不是被取消,他们挂起。
深入到代码中,我排除了自己的死锁假设,发现context.run()没有注意到WebSocket流被关闭,而是继续等待传入消息。
当然,当池被限制为单个线程时,问题就消失了。
从外部或内部调用close(...)一个io操作不改变这问题.调用context.stop()对这问题没有任何影响,无论它是被调用外部或内部.
问题是什么?我应该如何让上下文在一个优雅的WebSocket关闭时停止运行?
==============================使用解决方案编辑
多亏了上面的答案,我成功地修改了代码。我没有在每个线程中启动读循环,而是在池初始化之后做一次,但是添加了auto work=make_work_guard(context);work.reset()

io_context context;
auto work=make_work_guard(context);//<<<<<<<<<<<<<<
std::vector<std::thread> pool(std::thread::hardware_concurrency());
...
wss(make_strand(context),ssl);//I keep it because I will add other streams
...
for(auto& th:pool)
    th=std::thread([&]{
        try{ context.run(); }catch(...){} //<<<<<<<<<<<<<<<<<<<
        close_wss_streams_once_each(...);//cancels all threads
    });
start_async_read_loop();//<<<<<<<<<<<<<<
work.reset();//<<<<<<<<<<<<<<<<<
for(auto& th:pool)
    th.join();

显然,我不应该在每个线程中发布IO操作,我决定这样做是为了给予所有线程都有工作要做。相反,使用work guqrd可以防止线程过早返回。

vd2z7a6w

vd2z7a6w1#

相同io_context来运行网络套接字流
流不是一个进程(甚至不是一个操作)。你不能"运行一个[websocket]流"。你基本上只会运行一个执行排队处理程序的事件循环,除了同步代码。
其他线程:他们没有被取消,而是被挂在
显示的代码回避了相反的问题:为什么所有的线程all不立即返回(因为在启动线程之前不存在任何工作)?很明显,您的实际代码有很大的不同,因此不会发生这种情况。
也许你甚至有一个明确的work_guard。如果是这样,这当然解释了为什么事情没有关闭。
当然,当池被限制为单个线程时,问题就消失了。
我不确定这对我是否有帮助。从逻辑上讲,线程越少,死锁的可能性就越大。不管怎样,这不是你的问题。

想象的问题代码,但有效

下面是我的设想,只是添加了工作保护,使线程在您发布第一个async_read之前不会全部完成:

net::io_context ioc;
std::vector<std::thread> pool(std::thread::hardware_concurrency());

auto work = make_work_guard(ioc);

for (auto& th : pool)
    th = std::thread{[&ioc] { try { ioc.run(); } catch (...) { } }};

现在,让我们构造、连接、ssl握手和ws握手一个websocket客户端(为简单起见,同步进行):

sctx ctx(sctx::tlsv13_client);
Ws wss(make_strand(ioc), ctx);

auto& s = beast::get_lowest_layer(wss);
s.connect({{}, 8989});
wss.next_layer().handshake(Ws::next_layer_type::handshake_type::client);
wss.handshake("localhost", "/");

现在让我们添加loop_read_handler,显然这是某种(成员)函数,但是我们这里没有类,所以让我们添加一个闭包:

std::function<void(error_code, size_t)> loop_read_handler;

beast::flat_buffer buf;
loop_read_handler = [&](error_code ec, size_t n) {
    std::cout << "loop_read_handler " << ec.message() << ", " << n << std::endl;
    if (n)
        std::cout << "Received " << quoted(beast::buffers_to_string(buf.cdata())) << std::endl;

    if (!ec) {
        buf.consume(n);
        wss.async_read(buf, loop_read_handler);
    }
};

当然,我们必须开始第一次阅读:

wss.async_read(buf, loop_read_handler); // not on strand, because nothing is yet on the pool

现在,我可以设置一个计时器,但实际上您希望在应用程序接收到终止信号时正常关闭,因此让我们在演示中这样做:

net::signal_set ss(ioc, SIGINT, SIGTERM); // SIGINT e.g. from Ctrl-C in a terminal
ss.async_wait([&](error_code ec, int sig) {
    std::cout << "signal " << ::strsignal(sig) << " (" << ec.message() << ")" << std::endl;
    if (!ec) {
        // on strand:
        post(wss.get_executor(), [&wss] { wss.close(websocket::normal); });
    }
});

就这样!现在,我们要做的就是等待。这样,我们就可以把脚手架拆了:

// from this point we're okay returning, as soon as the read loop stops
work.reset();
std::cout << "waiting for graceful shutdown" << std::endl;

for (auto& th : pool)
    th.join();

std::cout << "graceful shutdown complete" << std::endl;

完整列表

    • 生活在科里鲁**
#include <boost/asio.hpp>
#include <boost/asio/ssl.hpp>
#include <boost/beast.hpp>
#include <boost/beast/websocket/ssl.hpp>
#include <iomanip>
#include <iostream>
namespace net       = boost::asio;
namespace ssl       = net::ssl;
namespace beast     = boost::beast;
namespace websocket = beast::websocket;

using boost::system::error_code;
using net::ip::tcp;
using sctx = ssl::context;
using Ws   = websocket::stream<ssl::stream<tcp::socket>>;

int main() {
    net::io_context ioc;
    std::vector<std::thread> pool(std::thread::hardware_concurrency());

    auto work = make_work_guard(ioc);

    for (auto& th : pool)
        th = std::thread{[&ioc] { try { ioc.run(); } catch (...) { } }};

    sctx ctx(sctx::tlsv13_client);
    Ws wss(make_strand(ioc), ctx);

    auto& s = beast::get_lowest_layer(wss);
    s.connect({{}, 8989});
    wss.next_layer().handshake(Ws::next_layer_type::handshake_type::client);
    wss.handshake("localhost", "/");

    std::function<void(error_code, size_t)> loop_read_handler;

    beast::flat_buffer buf;
    loop_read_handler = [&](error_code ec, size_t n) {
        std::cout << "loop_read_handler " << ec.message() << ", " << n << std::endl;
        if (n)
            std::cout << "Received " << quoted(beast::buffers_to_string(buf.cdata())) << std::endl;

        if (!ec) {
            buf.consume(n);
            wss.async_read(buf, loop_read_handler);
        }
    };
    wss.async_read(buf, loop_read_handler); // not on strand, because nothing is yet on the pool

    net::signal_set ss(ioc, SIGINT, SIGTERM); // SIGINT e.g. from Ctrl-C in a terminal
    ss.async_wait([&](error_code ec, int sig) {
        std::cout << "signal " << ::strsignal(sig) << " (" << ec.message() << ")" << std::endl;
        if (!ec) {
            // on strand:
            post(wss.get_executor(), [&wss] { wss.close(websocket::normal); });
        }
    });

    // from this point we're okay returning, as soon as the read loop stops
    work.reset();
    std::cout << "waiting for graceful shutdown" << std::endl;

    for (auto& th : pool)
        th.join();

    std::cout << "graceful shutdown complete" << std::endl;
}

在一个简单的演示WSS服务器上运行它:

websocketd -port 8989 -ssl --sslcert server.pem --sslkey server.pem ping www.google.com

并且在终端中用Ctrl-C终止,或者向其发送SIGTERM信号:

奖金

整个线程池可以用asio::thread_pool替换(更正确!):

int main() {
    net::thread_pool ioc;

    // ...
    Ws wss(make_strand(ioc), ctx);

    // ...

    // from this point we're okay returning, as soon as the read loop stops
    std::cout << "waiting for graceful shutdown" << std::endl;
    ioc.join();
    std::cout << "graceful shutdown complete" << std::endl;
}

这样,您就根本不必干预工作守卫(或担心异常的正确处理)。

相关问题