websocket 握手时Boost Beast“版本错误”,写入时“操作已取消”

disbfnqx  于 2023-02-23  发布在  其他
关注(0)|答案(1)|浏览(127)

我构造了一个套接字客户端和服务器,并使用它进行本地通信
构建没有问题,但以 root 权限运行时(bind() 需要 root),套接字在握手阶段报出了“版本错误”(bad version)。
简而言之,错误信息如下:
握手(handshake)时:版本错误(bad version)
写入(write)时:操作已取消(operation canceled)
Beast版本:1.70.0
代码:
ClientService.cc

// Connects this client to the WebSocket server at `host`:`port`.
//
// A worker thread (stored in `mThread`) creates its own io_context and a
// WS::Client::session bound to the local endpoint 0.0.0.0:20000, publishes the
// session through `service`, starts the connect and then blocks in
// io_context::run().
//
// host: server address, forwarded to the session as a C string.
// port: server TCP port.
void ClientService::Connect(const std::string &host, unsigned int port) {
    // Named cast instead of a C-style cast, so the conversion is explicit and greppable.
    const auto id = static_cast<unsigned int>(mpSLAM->GetMap()->mnId);

    // NOTE(review): the closure captures `this`, and nothing visible here joins
    // or detaches the thread — confirm the object outlives it and who joins it.
    mThread = new std::thread([this, host, port, id] {
        auto const text = "Hello, world!";
        info("client {} connect to host {} port {}", id, host, port);

        // The io_context is required for all I/O
        boost::asio::io_context ioc;

        // NOTE(review): this guard is never reset, so ioc.run() below cannot
        // return even after the socket is closed — confirm that is intended.
        work_guard_type workGuard(ioc.get_executor());

        const std::string specify_local_address = "0.0.0.0";
        const unsigned int specify_local_portnumber = 20000;

        info("In connection: client bind to local endpoint host {} port {}", specify_local_address, specify_local_portnumber);

        // The session interface takes port numbers as `unsigned int *`: the
        // number travels in the pointer value itself, not in pointed-to memory.
        auto session = std::make_shared<WS::Client::session>(
                ioc, specify_local_address.c_str(),
                reinterpret_cast<unsigned int *>(specify_local_portnumber),
                std::bind(&ClientService::OnRequest, this, std::placeholders::_1));

        // NOTE(review): written from the worker thread without visible
        // synchronization while SendRequest() reads it — verify against callers.
        this->service = session;
        session->run(host.c_str(), reinterpret_cast<unsigned int *>(port), text);

        // Run the I/O service on this thread.
        ioc.run();

    });
}

void ClientService::SendRequest(const Request &req) {
    // serialize and send request
    std::string msg = ORB_SLAM2::toString(req);

    this->service->send(make_shared<std::string>(msg));
}

客户端的WebSocket. h

namespace Client {
// Sends a WebSocket message and prints the response
class session : public std::enable_shared_from_this<session> {
//    we do not need a resolver since itself initialize a connection
//    tcp::resolver resolver_;
//    websocket::stream <tcp::socket> ws_;
    websocket::stream <beast::tcp_stream> ws_;
//    beast::tcp_stream ws_;
    beast::flat_buffer buffer_;
    std::vector<std::shared_ptr<const std::string>> queue;
    std::string host_;
    std::uint8_t port_;
    std::function<void(const std::string&)> on_message;
    std::string localhost_;
    std::uint8_t localport_;

    //the constructor
public:
    // Creates the client session and binds its socket to a local endpoint.
    //
    // ioc:        io_context the stream's strand is created from.
    // localhost:  local IPv4 address in dotted text form.
    // localport:  local port number carried in the pointer VALUE (the visible
    //             caller passes reinterpret_cast<unsigned int *>(number)); it is
    //             not dereferenced here.
    // on_message: callback stored in the session.
    //
    // Throws boost::system::system_error if the address is invalid or the
    // socket cannot be opened or bound.
    explicit
        session(net::io_context &ioc, char const *localhost, unsigned int *localport, std::function<void(const std::string&)> on_message)
    : ws_(net::make_strand(ioc)), on_message(std::move(on_message))
    {
        localhost_ = localhost;

        // Recover the number from the pointer value. Streaming the pointer
        // into a stringstream writes its address text (typically "0x..."), and
        // extracting that into a std::uint8_t reads a single character, so the
        // socket was bound to the wrong (usually privileged) port.
        const auto local_port =
                static_cast<unsigned short>(reinterpret_cast<std::uintptr_t>(localport));

        // NOTE(review): localport_ is declared std::uint8_t and cannot hold a
        // full port number; only the low byte is kept. Widen it to std::uint16_t.
        localport_ = static_cast<std::uint8_t>(local_port);

        // Use the throwing overloads for both steps, so a failed open() is
        // reported as such instead of surfacing later as a bind error.
        auto &sock = beast::get_lowest_layer(ws_).socket();
        sock.open(boost::asio::ip::tcp::v4());
        sock.bind(tcp::endpoint(boost::asio::ip::make_address_v4(localhost_), local_port));
    }

    // Starts the asynchronous TCP connect to the server; on_connect() receives
    // the result.
    //
    // host: server IP address literal (no name resolution is performed).
    // port: server port carried in the pointer VALUE (see the caller's
    //       reinterpret_cast); it is not dereferenced here.
    // text: unused.
    //
    // Throws boost::system::system_error if `host` is not a valid IP address.
    void
    run(
            char const *host,
            unsigned int *port,
            __attribute__((unused)) char const *text) {
        // Save these for later
        host_ = host;

        // Recover the number from the pointer value. Streaming the pointer
        // through a stringstream into a std::uint8_t extracted one character
        // of the address text, so the connect went to the wrong port.
        const auto remote_port =
                static_cast<unsigned short>(reinterpret_cast<std::uintptr_t>(port));

        // NOTE(review): port_ is declared std::uint8_t, so only the low byte is
        // kept; on_connect() builds the Host header from port_. Widen the
        // member to std::uint16_t.
        port_ = static_cast<std::uint8_t>(remote_port);

        // The resolver step was dropped: the endpoint is built directly from
        // the address literal and the decoded port.
        tcp::endpoint ep(boost::asio::ip::address::from_string(host_.c_str()), remote_port);

        beast::get_lowest_layer(ws_).expires_after(std::chrono::seconds(30));

        // NOTE(review): the connect is issued on the raw socket, not on the
        // tcp_stream — confirm whether the 30 s expiry above applies to it.
        beast::get_lowest_layer(ws_).socket().async_connect(
                ep,
                beast::bind_front_handler(
                        &session::on_connect,
                        shared_from_this()));
    }

    // Completion handler for the TCP connect started in run(). On success it
    // configures the websocket stream and launches the client handshake.
    void
    on_connect(beast::error_code ec)
    {
        // Rebuild the peer endpoint from the values saved by run(); only its
        // port is used below, for the Host header.
        const tcp::endpoint peer(boost::asio::ip::address::from_string(host_.c_str()), port_);

        if (ec) {
            fail(ec, "connect");
            return;
        }

        // The websocket layer has its own timeout handling, so the
        // tcp_stream timer is switched off first.
        beast::get_lowest_layer(ws_).expires_never();
        ws_.set_option(websocket::stream_base::timeout::suggested(beast::role_type::client));

        std::cout << "Making a handshake with server" << std::endl;

        // Decorate the upgrade request with a User-Agent field.
        ws_.set_option(websocket::stream_base::decorator(
                [](websocket::request_type &upgrade) {
                    upgrade.set(http::field::user_agent,
                                std::string(BOOST_BEAST_VERSION_STRING) + " websocket-client-async");
                }));

        // Host header value is "<host>:<port>", see
        // https://tools.ietf.org/html/rfc7230#section-5.4
        host_.push_back(':');
        host_.append(std::to_string(peer.port()));

        auto on_done = beast::bind_front_handler(&session::on_handshake, shared_from_this());
        ws_.async_handshake(host_, "/", std::move(on_done));
    }

    // Completion handler for the websocket client handshake. On success it
    // starts the first read; on failure it reports the error and stops.
    void
    on_handshake(beast::error_code ec) {
        if (ec)
            return fail(ec, "handshake");

        std::cout << "Handshake successful." << std::endl;

        // Start from an empty buffer and issue a genuine read. Posting on_read
        // by hand with a success code and a made-up size of 5 ran the read
        // handler without any message having been received.
        // NOTE(review): on_read is defined outside this excerpt; it was bound
        // with (error_code, size), so it is presumed to have the usual
        // read-handler signature — confirm.
        buffer_.consume(buffer_.size());
        ws_.async_read(buffer_,
                       beast::bind_front_handler(&session::on_read, shared_from_this()));
    }

    // Completion handler for async_write: drops the message at the front of
    // the queue and, if more are waiting, starts writing the next one.
    void
    on_write(
            beast::error_code ec,
            std::size_t bytes_transferred) {
        static_cast<void>(bytes_transferred); // the byte count is not needed

        if (ec) {
            fail(ec, "write");
            return;
        }

        // Remove the front entry, then continue with whatever is left.
        queue.erase(queue.begin());
        if (queue.empty())
            return;

        auto next = beast::bind_front_handler(&session::on_write, shared_from_this());
        ws_.async_write(net::buffer(*queue.front()), std::move(next));
    }

服务器的WebSocket. h

public:
    // Builds a server session around an already-accepted socket. The socket,
    // the shared state handle and the message callback are all moved into
    // the session.
    explicit
    session(tcp::socket &&socket, std::shared_ptr<shared_state> state, std::function<void(std::string)> on_message)
            : ws_(std::move(socket))
            , state(std::move(state))
            , on_message(std::move(on_message))
    {}

    // Traces the destruction on stdout, then removes this session from the
    // shared state.
    ~session() {
        std::cout << "~session()";
        std::cout << std::endl;
        state->leave(this);
    }

    // Start the asynchronous operation
    void
    run() {
        // Set suggested timeout settings for the websocket
        ws_.set_option(
                websocket::stream_base::timeout::suggested(
                        beast::role_type::server));

        // Set a decorator to change the Server of the handshake
        ws_.set_option(websocket::stream_base::decorator(
                [](websocket::response_type &res) {
                    res.set(http::field::server,
                            std::string(BOOST_BEAST_VERSION_STRING) +
                            " websocket-server-async");
                }));

        // Accept the websocket handshake
        ws_.async_accept(
                beast::bind_front_handler(
                        &session::on_accept,
                        shared_from_this()));
    }

    // Result of the websocket accept: on success the session joins the shared
    // state and starts reading; otherwise the error is reported.
    void
    on_accept(beast::error_code ec) {
        if (!ec) {
            state->join(this);
            do_read(); // read a message
            return;
        }

        fail(ec, "accept");
    }

运行日志:
[11:07:16][3518][I][注册远程:70]正在注册远程客户端
[11:07:16][3518][I][注册号:172]客户端绑定到本地端点主机0.0.0.0端口20001
[11:07:16][3518][I][注册号:173]客户端注册到主机0.0.0.0端口10088
小行星12330
[11:07:16][3518][I][注册远程:79]已注册客户端,其ID为:1和端口:2330
[11:07:16][3518][I][远程寄存器:85]正在连接到数据通道
[11:07:16][3518][I][远程寄存器:89]已连接到数据通道
[11:07:16][3533][I][operator():39]客户端1连接到主机0.0.0.0端口2330
[11:07:16][3533][I][运算符():54]在连接中:客户端绑定到本地端点主机0.0.0.0端口20000
正在与服务器握手
握手:错误版本
Gtk-Message: 11:07:16.297: 加载模块“canberra-gtk-module”失败
write:操作已取消
在我删除解析器并修改绑定本地端点的代码之前,它工作得很好。
但我没有修改握手的部分。
而且在调试模式下也看不出发生了什么,因为它直接跳到了写操作。
代码有错吗?
还是因为我使用了
websocket::stream <beast::tcp_stream> ws_;
(尽管原始代码中用的也是它)?
如有任何帮助和指导,我将不胜感激,谢谢!

5kgi1eie

5kgi1eie1#

至少端口号的处理上就有很多 bug:你把无符号整数 reinterpret_cast 成了……指针,然后通过写入 stringstream 再读回来做转换,而且读回的目标类型还是 uint8_t。难怪你“需要 root”才能执行 bind:前 1024 个端口是特权端口。
除此之外,你还在做一些奇怪的事情

  • 创建了完全多余的 work_guard,因为随后你会用阻塞的 io_context::run() 来完成所有工作
  • 忽略/吞掉了 error_code
  • 在客户端套接字上执行 bind;这非常少见——也许这正是你想要的,不过我不太清楚原因
  • 对一个从未使用过的缓冲区调用 consume()
  • 用单个 session 实例设置 this->service;如果线程所做的只是运行 io_context,那一开始为什么还需要线程?
  • 在客户端创建了 strand,而按定义你已经知道这个 io_context 只属于单个线程
  • 线程被泄漏了,而且没有 detach
  • 你有一个出站消息队列,但没有任何地方调用过 on_write,因此不会有任何写操作开始(queue 在 on_write 之外没有被使用)
  • 你……手动 post 了 on_read,还带了一个伪造的大小 5

我在同一个 Beast 示例的基础上添加了一个 listener,使程序自包含,以便重现你遇到的错误信息:

    • 完整代码如下:
#include <boost/asio.hpp>
#include <boost/beast.hpp>
#include <boost/lexical_cast.hpp>
#include <functional>

#include <fmt/ranges.h>
// Minimal logging shim: forwards its arguments unchanged to fmt::print.
#define info(...) fmt::print(__VA_ARGS__)

namespace WS {
    namespace net       = boost::asio;
    namespace beast     = boost::beast;
    namespace websocket = beast::websocket;
    namespace http      = beast::http;
    using boost::system::error_code;
    using net::ip::tcp;

    // Reports a failed operation as "<where>: <error message>" through info().
    static inline void fail(error_code ec, std::string_view where) {
        auto const reason = ec.message();
        info("{}: {}\n", where, reason);
    }

    using Message        = std::string;
    using MessageHandler = std::function<void(Message const&)>;

    namespace Client {
        // Asynchronous WebSocket client session.
        //
        // The constructor binds the socket to a fixed local endpoint; run()
        // connects to a literal IP address (no name resolution) and on_connect()
        // performs the WebSocket handshake. Instances must be owned by a
        // std::shared_ptr, because the completion handlers extend the session's
        // lifetime through shared_from_this().
        class session : public std::enable_shared_from_this<session> {
            // Outbound messages; on_write() drops the front entry after each
            // completed write. Nothing in this class enqueues messages yet.
            std::vector<std::shared_ptr<Message const>> queue;
            websocket::stream<beast::tcp_stream>        ws_;

            beast::flat_buffer buffer_;
            std::string        host_, localhost_;
            std::uint16_t      port_ = 0; // remote port, set by run()
            std::uint16_t      localport_;
            MessageHandler     on_message;

          public:
            // Opens an IPv4 socket on a strand of `ioc`, enables address reuse
            // and binds it to `localhost`:`localport`.
            // Throws boost::system::system_error if the address text is invalid
            // or setting the option / binding fails.
            explicit session(net::io_context& ioc, std::string localhost, uint16_t localport,
                             MessageHandler on_message)
                : ws_(net::make_strand(ioc), tcp::v4())
                , localhost_(std::move(localhost))
                , localport_(localport)
                , on_message(std::move(on_message)) //
            {
                auto& s = get_lowest_layer(ws_).socket();
                s.set_option(net::socket_base::reuse_address(true));
                s.bind(tcp::endpoint(net::ip::make_address_v4(localhost_), localport_));
            }

            // Starts the asynchronous connect to `host`:`port`. `host` must be an
            // IP address literal; `text` is unused. The result is delivered to
            // on_connect().
            void run(std::string host, uint16_t port, std::string text) {
                boost::ignore_unused(text);
                // Save these for later
                host_ = host;
                port_ = port;

                beast::get_lowest_layer(ws_).expires_after(std::chrono::seconds(30));

                // here just connect to ep without resolver
                tcp::endpoint ep(net::ip::address::from_string(host_), port_);
                beast::get_lowest_layer(ws_).socket().async_connect(
                    ep, beast::bind_front_handler(&session::on_connect, shared_from_this()));
            }

            // Completion handler for the TCP connect: configures the websocket
            // stream and starts the client handshake.
            void on_connect(beast::error_code ec) {
                if (ec)
                    return fail(ec, "connect");

                auto& ll = beast::get_lowest_layer(ws_);

                // Turn off the timeout on the tcp_stream, because
                // the websocket stream has its own timeout system.
                ll.expires_never();

                // Set suggested timeout settings for the websocket
                ws_.set_option(websocket::stream_base::timeout::suggested(beast::role_type::client));

                // output on screen said making a handshake with server
                info("Making a handshake with server\n");

                // Set a decorator to change the User-Agent of the handshake
                ws_.set_option(websocket::stream_base::decorator([](websocket::request_type& req) {
                    req.set(http::field::user_agent,
                            std::string(BOOST_BEAST_VERSION_STRING) + " websocket-client-async");
                }));

                // The Host header names the server being contacted
                // (https://tools.ietf.org/html/rfc7230#section-5.4), so it is
                // built from the remote port saved by run(), not from the port
                // this socket happens to be bound to locally.
                host_ += ':' + std::to_string(port_);

                // Perform the websocket handshake
                ws_.async_handshake(host_, "/",
                                    beast::bind_front_handler(&session::on_handshake, shared_from_this()));
            }

            // Completion handler for the handshake; this version only reports
            // the outcome and does not start reading.
            void on_handshake(beast::error_code ec) {
                if (ec)
                    return fail(ec, "handshake");

                info("Handshake successful.\n");
            }

            // Completion handler for async_write: drops the front queue entry
            // and, if more messages are waiting, writes the next one.
            void on_write(beast::error_code ec, std::size_t bytes_transferred) {
                boost::ignore_unused(bytes_transferred);

                if (ec)
                    return fail(ec, "write");

                queue.erase(queue.begin());

                // send the message if any
                if (!queue.empty()) {
                    ws_.async_write(net::buffer(*queue.front()),
                                    beast::bind_front_handler(&session::on_write, shared_from_this()));
                }
            }
        };

        // Owner of the client connection used by the demo. Connect() is defined
        // out of line below.
        struct ClientService {
            // Stand-in for the external SLAM object: exposes GetMap()->mnId, the
            // id printed when connecting.
            struct SLAM {
                auto GetMap() { return &_map; }
                struct {
                    unsigned mnId = 42;
                } _map;
            };
            std::unique_ptr<SLAM> mpSLAM{new SLAM};
            // Raw pointer to the worker thread created by Connect().
            // NOTE(review): nothing in this struct joins or deletes it.
            std::thread*          mThread = nullptr;

            void Connect(std::string const& host, uint16_t port);

          private:
            // NOTE(review): Connect() declares its own local `ioc`, which hides
            // this member inside the worker lambda — this one looks unused.
            net::io_context ioc;
            using work_guard_type = net::executor_work_guard<net::io_context::executor_type>;
            // Session published by the worker thread in Connect().
            std::shared_ptr<Client::session> service;

            // Message callback handed to the session; just logs the message.
            void OnRequest(Message const& msg) { info("OnRequest('{}')\n", msg); }
        };

        // Client Connect to Server
        void ClientService::Connect(std::string const& host, uint16_t port) {
            auto const id = static_cast<unsigned>( // friends don't let friends use C-style casts
                mpSLAM->GetMap()->mnId);

            mThread =
                new std::thread([this, host, port, id] {
                    auto const text = "Hello, world!";
                    info("client {} connect to host {} port {}\n", id, host, port);

                    // The io_context is required for all I/O
                    net::io_context ioc;

                    work_guard_type workGuard(ioc.get_executor());

                    std::string specify_local_address    = "0.0.0.0";
                    uint16_t    specify_local_portnumber = 20'000;

                    info("In connection: client bind to local endpoint host {} port {}\n",
                         specify_local_address, specify_local_portnumber);

                    // Launch the asynchronous operation, which would call WebSocket.h
                    auto session = std::make_shared<WS::Client::session>(
                        ioc, specify_local_address, specify_local_portnumber,
                        std::bind(&ClientService::OnRequest, this, std::placeholders::_1));

                    this->service = session;
                    session->run(host, port, text);

                    // Run the I/O service
                    ioc.run();
                });
        }
    } // namespace Client

    namespace Server {
        class session;
        struct shared_state {
            void join(session const*) {}
            void leave(session const*) {}
        };

        class session : public std::enable_shared_from_this<session> {
            websocket::stream<tcp::socket> ws_;
            std::shared_ptr<shared_state>  state;
            MessageHandler                 on_message;

          public:
            session(tcp::socket&& socket, std::shared_ptr<shared_state> state,
                    MessageHandler on_message)
                : ws_(std::move(socket))
                , state(std::move(state))
                , on_message(std::move(on_message)) {}

            ~session() {
                info("!session()\n");
                state->leave(this);
            }

            // Start the asynchronous operation
            void run() {
                ws_.set_option(
                    websocket::stream_base::timeout::suggested(beast::role_type::server));

                ws_.set_option(websocket::stream_base::decorator([](websocket::response_type& res) {
                    res.set(http::field::server,
                            std::string(BOOST_BEAST_VERSION_STRING) + " websocket-server-async");
                }));

                // Accept the websocket handshake
                ws_.async_accept(
                    beast::bind_front_handler(&session::on_accept, shared_from_this()));
            }

            void on_accept(beast::error_code ec) {
                if (ec)
                    return fail(ec, "accept");

                state->join(this);
                // Read a message
                // do_read();
            }
        };

        // after the example from
        // https://www.boost.org/doc/libs/master/libs/beast/example/websocket/server/async/websocket_server_async.cpp
        class listener : public std::enable_shared_from_this<listener> {
            net::io_context& ioc_;
            tcp::acceptor    acceptor_;

          public:
            listener(net::io_context& ioc, tcp::endpoint endpoint) try : ioc_(ioc), acceptor_(ioc) {
                acceptor_.open(endpoint.protocol());
                acceptor_.set_option(net::socket_base::reuse_address(true));
                acceptor_.bind(endpoint);
                acceptor_.listen(net::socket_base::max_listen_connections);
            } catch (boost::system::system_error const& se) {
                fail(se.code(), boost::lexical_cast<std::string>(se.code().location()));
            }

            // Start accepting incoming connections
            void run() { do_accept(); }

          private:
            std::shared_ptr<shared_state> state_ = std::make_shared<shared_state>();

            void do_accept() {
                acceptor_.async_accept(
                    make_strand(ioc_),
                    beast::bind_front_handler(&listener::on_accept, shared_from_this()));
            }

            void on_accept(beast::error_code ec, tcp::socket socket) {
                if (ec) {
                    fail(ec, "accept");
                } else {
                    auto handler = [ep = socket.remote_endpoint()](Message const& msg) {
                        info("From {}: {}\n", boost::lexical_cast<std::string>(ep), msg);
                    };

                    // Create the session and run it
                    auto conn = std::make_shared<session>(std::move(socket), state_, handler);
                    conn->run();
                }

                // Accept another connection
                do_accept();
            }
        };
    } // namespace Server
} // namespace WS

int main() {
    using namespace WS;
    net::io_context ioc;

    auto ls = std::make_shared<Server::listener>(ioc, tcp::endpoint{{}, 20'001});
    ls->run();

    Client::ClientService cs;
    cs.Connect("127.0.0.1", 20'000u);

    ioc.run();
}

本地输出:

这里明显的错误是:我连接到了客户端自己在本地绑定的端口 20'000!改为 20'001(或服务器实际运行的任何端口)即可修复:

附加内容(Bonus)

解决更多上述问题,并添加实际功能:

    • 改进后的完整代码如下:
#include <boost/asio.hpp>
#include <boost/beast.hpp>
#include <boost/lexical_cast.hpp>
#include <deque>

#include <fmt/ranges.h>
// Logging shim: prints through fmt::print and flushes stdout after every call.
// The do { } while (0) wrapper makes the expansion a single statement.
#define info(...)                    \
    do {                             \
        fmt::print(__VA_ARGS__);     \
        std::fflush(stdout);         \
    } while (0)

using namespace std::chrono_literals;

namespace WS {
    namespace net       = boost::asio;
    namespace beast     = boost::beast;
    namespace websocket = beast::websocket;
    namespace http      = beast::http;
    using boost::system::error_code;
    using beast::bind_front_handler;
    using net::ip::tcp;

    // Reports a failed operation as "<where>: <error message>" through info().
    static inline void fail(error_code ec, std::string_view where) {
        auto const reason = ec.message();
        info("{}: {}\n", where, reason);
    }

    using Message        = std::string;
    using MessageHandler = std::function<void(Message const&)>;

    namespace Client {
        // Sends a WebSocket message and prints the response
        class session : public std::enable_shared_from_this<session> {
            websocket::stream<beast::tcp_stream> ws_;

            bool                handshake_completed_ = false;
            std::deque<Message> queue_;
            beast::flat_buffer  buffer_;
            std::string         ip_address_, local_addr_;
            std::uint16_t       port_, local_port_;
            MessageHandler      on_message;

          public:
            // Resolver and socket require an io_context
            explicit session(net::any_io_executor ex, std::string localhost, uint16_t localport,
                             MessageHandler on_message)
                : ws_{ex, tcp::v4()} // assumed single-threaded execution context
                , local_addr_{std::move(localhost)}
                , local_port_{localport}
                , on_message{std::move(on_message)} //
            {
                auto& s = get_lowest_layer(ws_).socket();
                s.set_option(net::socket_base::reuse_address(true));
                s.bind({net::ip::make_address_v4(local_addr_), local_port_});
            }

            // Start the asynchronous operation
            void run(std::string ip_address, uint16_t port) {
                // Save these for later
                ip_address_ = ip_address;
                port_       = port;

                beast::get_lowest_layer(ws_).expires_after(30s);

                // host assumed to be resolved address
                get_lowest_layer(ws_).socket().async_connect(
                    {net::ip::address::from_string(ip_address_), port_},
                    bind_front_handler(&session::on_connect, shared_from_this()));
            }

            void stop() {
                post(ws_.get_executor(), [self = shared_from_this()] {
                    info("Closing down websocket\n");
                    get_lowest_layer(self->ws_).cancel();
                    // self->ws_.close("stop");
                });
            }

            void enqueue(std::string msg) {
                post(ws_.get_executor(), [m = std::move(msg), self = shared_from_this()] {
                    self->do_enqueue(std::move(m));
                });
            }

          private:
            void on_connect(beast::error_code ec) {
                if (ec)
                    return fail(ec, __PRETTY_FUNCTION__);

                auto& ll   = get_lowest_layer(ws_);
                // auto local_port = ll.socket().local_endpoint().port();

                //  Turn off the timeout on the tcp_stream, because the
                //  websocket stream has its own timeout system.
                ll.expires_never();

                // Set suggested timeout settings for the websocket
                ws_.set_option(websocket::stream_base::timeout::suggested(beast::role_type::client));

                // Set a decorator to change the User-Agent of the handshake
                ws_.set_option(websocket::stream_base::decorator([](websocket::request_type& req) {
                    req.set(http::field::user_agent,
                            std::string(BOOST_BEAST_VERSION_STRING) + " websocket-client-async");
                }));

                // update the host string. This will provide the value of the
                // host HTTP header during the websocket handshake the guide
                // references: https://tools.ietf.org/html/rfc7230#section-5.4
                ip_address_ += ':' + std::to_string(port_); // TODO REVIEW why mutate?

                // output on screen said making a handshake with server
                info("Making a handshake with {}\n", ip_address_);

                // Perform the websocket handshake
                ws_.async_handshake(ip_address_, "/",
                                    bind_front_handler(&session::on_handshake, shared_from_this()));
            }

            void on_handshake(beast::error_code ec) {
                if (ec)
                    return fail(ec, __PRETTY_FUNCTION__);

                info("Handshake successful.\n");
                ws_.async_read(buffer_, bind_front_handler(&session::on_read, shared_from_this()));

                handshake_completed_ = true;
                do_write(); // if already queued
            }

            void on_read(error_code ec, size_t n) {
                if (ec)
                    return fail(ec, __PRETTY_FUNCTION__);

                if (on_message)
                    on_message(beast::buffers_to_string(buffer_.cdata()).substr(0, n));

                buffer_.consume(n);
                ws_.async_read(buffer_, bind_front_handler(&session::on_read, shared_from_this()));
            }

            void do_enqueue(Message msg) { // assumed on strand!
                queue_.push_back(std::move(msg));
                if (queue_.size() == 1)
                    do_write();
            }

            void do_write() {
                if (!handshake_completed_ || queue_.empty())
                    return;

                info("{}: Initiating write ({} pending) '{}'\n", __PRETTY_FUNCTION__, queue_.size(),
                     queue_.front());
                ws_.async_write(net::buffer(queue_.front()), // FIFO
                                bind_front_handler(&session::on_write, shared_from_this()));
            }

            void on_write(beast::error_code ec, size_t) {
                if (ec)
                    return fail(ec, __PRETTY_FUNCTION__);

                queue_.pop_front();
                do_write(); // continue until queue empty
            }
        };

        // Owns a one-thread pool and the client session running on it.
        struct ClientService {
            // Asks the session (if any) to stop, then waits for the pool.
            ~ClientService() {
                if (connection_) {
                    connection_->stop();
                }

                // ioc.stop(); // optionally?
                ioc.join();
            }

            // Creates a session bound to 0.0.0.0:20000 on the pool's executor
            // and starts connecting it to `host`:`port`.
            void Connect(std::string const& host, uint16_t port) {
                info("client {} connect to host {} port {}\n", mpSLAM->GetMap()->mnId, host, port);

                std::string const local_address = "0.0.0.0";
                uint16_t const    local_port    = 20'000;

                // Incoming messages are forwarded to OnRequest().
                connection_ = std::make_shared<WS::Client::session>(
                    ioc.get_executor(), local_address, local_port,
                    [this](Message const& incoming) { OnRequest(incoming); });

                connection_->run(host, port);
            }

            // Queues `msg` on the current session; Connect() must have been called.
            void Send(Message msg) {
                assert(connection_);
                connection_->enqueue(std::move(msg));
            }

          private:
            net::thread_pool ioc{1};
            std::shared_ptr<Client::session> connection_;

            // Message callback handed to the session; logs the message.
            void OnRequest(Message const& msg) {
                info("OnRequest('{}')\n", msg);
                assert(connection_);
            }

            // Stand-in for the external SLAM object: exposes GetMap()->mnId.
            struct SLAM {
                struct Map { unsigned mnId = 42; } _map;
                Map const* GetMap() const { return &_map; }
            };
            std::unique_ptr<SLAM> mpSLAM{new SLAM};
        };
    } // namespace Client

    namespace Server {
        class session;
        struct shared_state {
            void join(session const*) {}
            void leave(session const*) {}
        };

        class session : public std::enable_shared_from_this<session> {
            websocket::stream<tcp::socket> ws_;
            std::shared_ptr<shared_state>  state;
            MessageHandler                 on_message;

          public:
            session(tcp::socket&& socket, std::shared_ptr<shared_state> state,
                    MessageHandler on_message)
                : ws_(std::move(socket))
                , state(std::move(state))
                , on_message(std::move(on_message)) {}

            ~session() {
                info("~session()\n");
                state->leave(this);
            }

            // Start the asynchronous operation
            void run() {
                ws_.set_option(
                    websocket::stream_base::timeout::suggested(beast::role_type::server));

                ws_.set_option(websocket::stream_base::decorator([](websocket::response_type& res) {
                    res.set(http::field::server,
                            std::string(BOOST_BEAST_VERSION_STRING) + " websocket-server-async");
                }));

                // Accept the websocket handshake
                ws_.async_accept(bind_front_handler(&session::on_accept, shared_from_this()));
            }

          private:
            void on_accept(beast::error_code ec) {
                info("{}: {}\n", __PRETTY_FUNCTION__, ec.message());
                if (ec)
                    return fail(ec, __PRETTY_FUNCTION__);

                state->join(this);

                do_read();
            }

            beast::flat_buffer buffer_;
            void do_read() {
                ws_.async_read(buffer_, bind_front_handler(&session::on_read, shared_from_this()));
            }

            void on_read(error_code ec,  size_t n) {
                info("{}: {}, {}\n", __PRETTY_FUNCTION__, ec.message(), n);
                if (ec)
                    return fail(ec, __PRETTY_FUNCTION__);

                if (on_message)
                    on_message(beast::buffers_to_string(buffer_.cdata()).substr(0, n));

                buffer_.consume(n);
                ws_.async_read(buffer_, bind_front_handler(&session::on_read, shared_from_this()));
            }
        };

        // after the example from
        // https://www.boost.org/doc/libs/master/libs/beast/example/websocket/server/async/websocket_server_async.cpp
        class listener : public std::enable_shared_from_this<listener> {
            net::io_context& ioc_;
            tcp::acceptor    acceptor_;

          public:
            listener(net::io_context& ioc, tcp::endpoint endpoint) try : ioc_(ioc), acceptor_(ioc) {
                acceptor_.open(endpoint.protocol());
                acceptor_.set_option(net::socket_base::reuse_address(true));
                acceptor_.bind(endpoint);
                acceptor_.listen(net::socket_base::max_listen_connections);
            } catch (boost::system::system_error const& se) {
                fail(se.code(), boost::lexical_cast<std::string>(se.code().location()));
            }

            // Start accepting incoming connections
            void run() { do_accept(); }

          private:
            std::shared_ptr<shared_state> state_ = std::make_shared<shared_state>();

            void do_accept() {
                acceptor_.async_accept(
                    make_strand(ioc_),
                    bind_front_handler(&listener::on_accept, shared_from_this()));
            }

            void on_accept(beast::error_code ec, tcp::socket socket) {
                info("{}: {} from {}\n", __PRETTY_FUNCTION__, ec.message(),
                     boost::lexical_cast<std::string>(socket.remote_endpoint()));

                if (ec) {
                    fail(ec, __PRETTY_FUNCTION__);
                } else {
                    auto handler = [ep = socket.remote_endpoint()](Message const& msg) {
                        info("From {}: '{}'\n", boost::lexical_cast<std::string>(ep), msg);
                    };

                    // Create the session and run it
                    auto conn = std::make_shared<session>(std::move(socket), state_, handler);
                    conn->run();
                }

                // Accept another connection
                do_accept();
            }
        };
    } // namespace Server
} // namespace WS

int main() {
    using namespace WS;
    net::io_context ioc;

    auto ls = std::make_shared<Server::listener>(ioc, tcp::endpoint{{}, 20'001});
    ls->run();

    Client::ClientService cs;
    cs.Connect("127.0.0.1", 20'001u);
    cs.Send("Hello world");
    cs.Send("Bye world");

    ioc.run_for(5s);
}

本地测试输出:

client 42 connect to host 127.0.0.1 port 20001
Making a handshake with 127.0.0.1:20001
void WS::Server::listener::on_accept(boost::beast::error_code, boost::asio::ip::tcp::socket): Success from 127.0.0.1:20000
Handshake successful.
void WS::Server::session::on_accept(boost::beast::error_code): Success
void WS::Client::session::do_write(): Initiating write (2 pending) 'Hello world'
void WS::Server::session::on_read(boost::system::error_code, size_t): Success, 11
From 127.0.0.1:20000: 'Hello world'
void WS::Client::session::do_write(): Initiating write (1 pending) 'Bye world'
void WS::Server::session::on_read(boost::system::error_code, size_t): Success, 9
From 127.0.0.1:20000: 'Bye world'
Closing down websocket
void WS::Client::session::on_read(boost::system::error_code, size_t): Operation canceled
~session()

相关问题