尝试广播时boost::beast异步WebSocket服务器的互斥锁问题

yzckvree  于 2023-03-18  发布在  其他
关注(0)|答案(1)|浏览(239)

我尝试使用Websockets的C++ Boost实现构建一个WebSocket服务器,并使用以下代码作为我项目的模板:https://www.boost.org/doc/libs/1_80_0/libs/beast/example/websocket/server/async/websocket_server_async.cpp
模板将服务器接收到的任何消息回显给客户端。我修改了代码,使其向连接到服务器的每个客户端发送测试消息。为了实现这一点,我实现了一个会话列表,并向侦听器和会话本身添加了指向该列表的指针。您将在下面找到修改。
对于广播本身,我基本上重用了示例的echo代码和会话列表,执行的代码如下:

void send_message(std::string message) {
    buffer_.consume(buffer_.size());
    auto buffer_data = buffer_.prepare(message.size());
    std::copy(message.begin(), message.end(), boost::asio::buffer_cast<char*>(buffer_data));
    buffer_.commit(message.size());

    ws_.async_write(
        buffer_.data(),
        beast::bind_front_handler(
            &session::on_write,
            shared_from_this()));
}

void broadcast_message(std::string message) {
    std::cout << "sessions_ size: " << sessions_->size() << std::endl;

    for (const auto& session_ptr : *sessions_) {
        session_ptr->send_message(message);
    }
}

void on_read(beast::error_code ec, std::size_t bytes_transferred) {
    boost::ignore_unused(bytes_transferred);

    if(ec == websocket::error::closed) return;

    if(ec) return fail(ec, "read");

    for (const auto& session_ptr : *sessions_) {
        session_ptr->send_message("test");
    }
}

这在一个客户端连接到服务器的情况下都能正常工作,而在多个客户端执行广播的情况下,所有消息都会如期到达,但只要on_read()代码块在soft_mutex.hpp中完成此Assert,消息就会立即关闭:

template<class T>
bool
try_lock(T const*)
{
    // If this assert goes off it means you are attempting to
    // simultaneously initiate more than one of same asynchronous
    // operation, which is not allowed. For example, you must wait
    // for an async_read to complete before performing another
    // async_read.
    //
    BOOST_ASSERT(id_ != T::id);
    if(id_ != 0)
        return false;
    id_ = T::id;
    return true;
}

我不明白为什么会这样。据我所知,我所有的async_write操作都成功完成了,而且我用一个线程启动了服务器。任何帮助都将不胜感激!
这是对main方法的修改,我对main方法进行了修改,以传递会话列表:

std::list<std::shared_ptr<session>> sessions;
std::make_shared<listener>(ioc, tcp::endpoint{address, port}, &sessions)->run();

会话将按如下方式放入列表中:

auto new_session = std::make_shared<session>(std::move(socket), sessions_);
sessions_->push_back(new_session);
new_session->run();

这是错误消息:

/usr/include/boost/beast/websocket/detail/soft_mutex.hpp:89: bool boost::beast::websocket::detail::soft_mutex::try_lock(const T*) [with T = boost::beast::websocket::stream<boost::beast::basic_stream<boost::asio::ip::tcp, boost::asio::execution::any_executor<boost::asio::execution::context_as_t<boost::asio::execution_context&>, boost::asio::execution::detail::blocking::never_t<0>, boost::asio::execution::prefer_only<boost::asio::execution::detail::blocking::possibly_t<0> >, boost::asio::execution::prefer_only<boost::asio::execution::detail::outstanding_work::tracked_t<0> >, boost::asio::execution::prefer_only<boost::asio::execution::detail::outstanding_work::untracked_t<0> >, boost::asio::execution::prefer_only<boost::asio::execution::detail::relationship::fork_t<0> >, boost::asio::execution::prefer_only<boost::asio::execution::detail::relationship::continuation_t<0> > >, boost::beast::unlimited_rate_policy> >::read_some_op<boost::beast::websocket::stream<boost::beast::basic_stream<boost::asio::ip::tcp, boost::asio::execution::any_executor<boost::asio::execution::context_as_t<boost::asio::execution_context&>, boost::asio::execution::detail::blocking::never_t<0>, boost::asio::execution::prefer_only<boost::asio::execution::detail::blocking::possibly_t<0> >, boost::asio::execution::prefer_only<boost::asio::execution::detail::outstanding_work::tracked_t<0> >, boost::asio::execution::prefer_only<boost::asio::execution::detail::outstanding_work::untracked_t<0> >, boost::asio::execution::prefer_only<boost::asio::execution::detail::relationship::fork_t<0> >, boost::asio::execution::prefer_only<boost::asio::execution::detail::relationship::continuation_t<0> > >, boost::beast::unlimited_rate_policy> >::read_op<boost::beast::detail::bind_front_wrapper<void (session::*)(boost::system::error_code, long unsigned int), std::shared_ptr<session> >, boost::beast::basic_flat_buffer<std::allocator<char> > >, boost::asio::mutable_buffer>]: Assertion `id_ != T::id' failed.
jm81lzqq

jm81lzqq1#

on_accept启动读取链:

当您添加send_message时,它还会为任何其他会话启动一个read_chain,这些会话都已经运行了自己的会话:

修理?
请注意,您也没有注意线股。send_message应该将写操作 * 发布 * 到会话线股。
保持一个读写链的方法是有一个出站队列,我在这个网站上有很多例子,我通常把这个队列称为发件箱(_outboxm_outboxoutbox_),我最近也申请了一个WebSocket应用程序:Boost Beast异步WebSocket服务器如何与会话连接?
请注意,在该示例中,我还展示了一个封装更好的会话列表:它属于侦听器,并且应该包含弱指针以避免生存期/关闭死锁问题。
当然,您的应用程序看起来更像是一个聊天服务器--具有全方位的消息广播功能--所以不妨看看https://www.boost.org/doc/libs/1_81_0/libs/beast/example/websocket/server/chat-multi/是如何做到这一点的。

  • 它们将出站队列称为queue_
  • 他们将会话列表放入shared_state

相关问题