C++:如何将Boost的解析器(resolver)与async_connect配合使用?

sgtfey8w  于 2023-10-21  发布在  其他
关注(0)|答案(1)|浏览(83)

我正在尝试给我的UDP套接字连接类加上解析器(resolver)。这个类提供Connect/Send接口:我想在Connect中完成解析和连接,在Send中发送数据。据我理解,传给resolve函数的回调会收到一个端点范围,我需要遍历它来找到可用的端点。于是我实现了Connect方法来逐个尝试所有端点,但它看起来非常臃肿、复杂。

// Resolves m_to/m_port (IPv4 only) and tries the resulting endpoints one by
// one, blocking the caller on connectFuture until the resolve handler has
// reported a result through the promise.
//
// NOTE(review): this is the version that hangs. The resolve handler itself
// blocks on connectFuture2.get() while the DoConnect task it is waiting for is
// only posted, not run; if the handler occupies the only thread servicing the
// executor, that task can never start.
void UdpConnection::Connect()
{
    std::promise<void> connectPromise;
    auto connectFuture = connectPromise.get_future();

    // The promise is moved into the handler; `self` keeps the object alive
    // until the handler has been invoked or destroyed.
    auto handleResolveCallback = [this, p = std::move(connectPromise), self = shared_from_this()]
        (const boost::system::error_code& ec, boost::asio::ip::udp::resolver::iterator endpoint_iterator) mutable 
            {
                if (ec)
                {
                    // Resolution failed: forward the error to the waiting caller.
                    p.set_exception(std::make_exception_ptr(boost::system::system_error(ec)));
                }
                else
                {
                    // A default-constructed iterator marks the end of the results.
                    while (endpoint_iterator != boost::asio::ip::udp::resolver::iterator())
                    {
                        try {
                            // Second promise/future pair, used only to wait for
                            // this single connect attempt.
                            std::promise<void> connectPromise2;
                            auto connectFuture2 = connectPromise2.get_future();

                            boost::asio::ip::udp::endpoint iter = *endpoint_iterator;
                            boost::asio::post(m_socket.get_executor(),
                                [this, p = std::move(connectPromise2), self = shared_from_this(), iter]() mutable { DoConnect(std::move(p), std::move(iter)); });

                            // Blocks inside a completion handler until the
                            // posted task has finished (see the NOTE above).
                            connectFuture2.get();
                            // NOTE(review): the outer promise `p` is never
                            // fulfilled on this success path.
                            break;
                        }
                        catch (const std::exception& ex)
                        {
                            // This attempt failed: report it and try the next endpoint.
                            std::cout<< "Run-time exception: " << ex.what() << std::endl;
                            ++endpoint_iterator;
                        }
                    };
                }
            };
    
    m_resolver.async_resolve(boost::asio::ip::udp::v4(), m_to.c_str(), m_port.c_str(), std::move(handleResolveCallback));
    // Wait for the handler's verdict; rethrows whatever was stored via set_exception.
    connectFuture.get();
}
// Starts an asynchronous connect of m_socket to `iter` and reports the outcome
// through `p`: set_value() on success, a boost::system::system_error carrying
// the error code on failure.
void UdpConnection::DoConnect(std::promise<void> p, boost::asio::ip::udp::endpoint iter)
{
    // The promise is moved into the handler; `self` keeps the object alive
    // while the connect is pending.
    auto handleConnect = [p = std::move(p), self = shared_from_this()](const boost::system::error_code& ec) mutable
    {
        EKA_TRACE_DEBUG_EX(self->GetTracer()) << TR << "Connect: " << ec.message();

        if (ec)
            p.set_exception(std::make_exception_ptr(boost::system::system_error(ec)));
        else {
            p.set_value();
        }
    };

    m_socket.async_connect(iter, std::move(handleConnect));
}

我不明白为什么DoConnect没有被调用,程序一直卡在connectFuture.get()上。处理程序跟踪(handler tracking)的输出如下:

@asio|1696946039.156806|0*1|[email protected](blk=never,rel=fork)
@asio|1696946039.156806|>1|
@asio|1696946039.157804|>0|
@asio|1696946039.874930|0*2|[email protected](blk=never,rel=fork)

正确的解决方法是什么?
完整示例http://coliru.stacked-crooked.com/a/737a0927ecd74458
P.S.我知道UDP不能使用连接,但是TCP和UDP在boost中有类似的接口,所以我认为我的问题不是在协议上。

qnyhuwrf

qnyhuwrf1#

你先post了一个任务,然后立刻阻塞等待它——这从定义上讲就是无用功。你完全可以直接调用被post的那个函数。
这也凸显了问题所在:你是在服务线程上阻塞的,这样就保证了没有任何处理程序能够运行。这意味着DoConnect任务永远不会运行。
正确的做法是把异步调用链组织成(完成)处理程序中没有任何阻塞操作。例如,下面是我之前处理一个相关的TCP问题时的做法:

Coliru

#define BOOST_ASIO_ENABLE_HANDLER_TRACKING 1
#include <boost/asio.hpp>
#include <boost/asio/ssl.hpp>
#include <deque>
#include <iomanip>
#include <iostream>
namespace asio = boost::asio;
namespace ssl  = asio::ssl;
using asio::ip::tcp;
using boost::system::error_code;
using namespace std::chrono_literals;
using namespace std::placeholders;

namespace protocoller::syslog {
    /// Returns the calling thread's trace stream. Each thread lazily creates
    /// its own std::ostream on first use, writing through the stream buffer
    /// that std::cout holds at that moment.
    static inline std::ostream& EKA_TRACE_ERROR() {
        thread_local std::ostream tracer{std::cout.rdbuf()};
        return tracer;
    }

    // Abstract connection interface that main() uses to drive a client
    // through a base-class pointer.
    struct IConnection {
        // Virtual so that implementations are destroyed correctly when owned
        // through a pointer to IConnection.
        virtual ~IConnection()                          = default;
        // Establishes the connection.
        virtual void Connect()                          = 0;
        // Queues one message for sending; the argument is only a view.
        virtual void SendMessageAsync(std::string_view) = 0;
        // Shuts the connection down.
        virtual void Close()                            = 0;
    };

    // TLS-over-TCP implementation of IConnection.
    //
    // The public methods only post work to the socket's executor (a strand
    // created in the constructor); the private do_* methods do the actual I/O.
    // Every posted task and completion handler captures shared_from_this(), so
    // instances must be owned by a std::shared_ptr.
    struct TcpConnection
        : IConnection
        , std::enable_shared_from_this<TcpConnection> //
    {
        // Builds the TLS stream on a strand made from `ex` and remembers the
        // endpoint that Connect() will dial.
        TcpConnection(asio::any_io_executor ex, tcp::endpoint ep, ssl::context& ctx)
            : m_socket(make_strand(ex), ctx)
            , m_ep(ep) {}

        // Posts do_connect() to the strand and blocks the caller until the
        // promise is fulfilled. f.get() rethrows the system_error stored on
        // failure.
        // NOTE(review): because this blocks, it presumably must not be called
        // from a thread that is servicing the executor.
        void Connect() override {
            std::promise<void> p;
            auto f = p.get_future();
            asio::post(m_socket.get_executor(),
                       [this, p = std::move(p), self = shared_from_this()]() mutable { //
                           do_connect(std::move(p));
                       });
            f.get();
        }

        // Posts do_close() to the strand; returns without waiting for it.
        void Close() override {
            asio::post(m_socket.get_executor(), [this, self = shared_from_this()] { do_close(); });
        }

        // Copies `message` into an owned std::string (the view may not outlive
        // this call) and queues it from a task posted to the strand.
        void SendMessageAsync(std::string_view message) override {
            asio::post(m_socket.get_executor(),
                       [this, self = shared_from_this(), msg = std::string(message)]() mutable {
                           outbox_.push_back(std::move(msg));
                           // Exactly one queued message means no write is in
                           // flight, so the send loop has to be started.
                           if (outbox_.size() == 1)
                               do_send_loop();
                       });
        }

      private:
        // Connects the underlying TCP socket to m_ep, then performs the TLS
        // client handshake. `p` is handed from handler to handler and is
        // fulfilled only after the handshake succeeds; the first error is
        // stored in it as a boost::system::system_error.
        void do_connect(std::promise<void> p) {
            EKA_TRACE_ERROR() << "Connecting to: " << m_ep << std::endl;
            m_socket.next_layer().async_connect(
                m_ep, [this, p = std::move(p), self = shared_from_this()](error_code ec) mutable {
                    EKA_TRACE_ERROR() << "Connect: " << ec.message() << std::endl;
                    if (ec)
                        p.set_exception(std::make_exception_ptr(boost::system::system_error(ec)));
                    else {
                        m_socket.async_handshake( //
                            Stream::client, [p = std::move(p), self](error_code ec) mutable {
                                EKA_TRACE_ERROR() << "Handshake: " << ec.message() << std::endl;
                                if (ec)
                                    p.set_exception(std::make_exception_ptr(boost::system::system_error(ec)));
                                else
                                    p.set_value();
                            });
                    }
                });
        }

        // Writes the front message of the outbox (with a trailing newline
        // appended) and, on success, pops it and continues with the next one.
        // NOTE(review): on a write error nothing is popped and the loop is not
        // restarted; since SendMessageAsync only starts the loop when the
        // outbox holds exactly one message, later messages appear to stay
        // queued forever - confirm this is intended.
        void do_send_loop() {
            if (outbox_.empty())
                return;
            EKA_TRACE_ERROR() << "SendLoop: " << quoted(outbox_.front()) << std::endl;
            outbox_.front() += "\n";
            asio::async_write( //
                m_socket, asio::buffer(outbox_.front()),
                [this, self = shared_from_this()](error_code ec, size_t /*bytes_transferred*/) {
                    EKA_TRACE_ERROR() << "HandleWritten: " << ec.message() << std::endl;
                    if (!ec) {
                        outbox_.pop_front();
                        do_send_loop();
                    }
                });
        }

        // Starts an asynchronous TLS shutdown and logs its result.
        void do_close() {
            EKA_TRACE_ERROR() << "Closing connection" << std::endl;
            m_socket.async_shutdown([self = shared_from_this()](error_code ec) {
                EKA_TRACE_ERROR() << "Shutdown: " << ec.message() << std::endl;
            });
        }

      private:
        using Stream = ssl::stream<tcp::socket>;
        // Pending messages. The front element is the buffer of the write in
        // flight, so it must not be removed before that write completes.
        std::deque<std::string> outbox_;
        Stream                  m_socket;
        // Remote endpoint used by do_connect().
        tcp::endpoint           m_ep;
    };
} // namespace protocoller::syslog

int main(int argc, char** argv) {
    asio::thread_pool ioc{1};
    ssl::context ctx{ssl::context::tls_client};
    ctx.set_default_verify_paths();

    using namespace protocoller::syslog;
    std::shared_ptr<IConnection> conn =
        std::make_shared<TcpConnection>(ioc.get_executor(), tcp::endpoint{{}, 12514}, ctx);

    conn->Connect();

    std::vector<std::string> words(argv+1, argv+argc);
    words.push_back("bye world");
    for (auto w : words)
        conn->SendMessageAsync("syslog tls client: " + w);

    std::this_thread::sleep_for(5000ms);
    conn->Close();

    ioc.join();
}

更新:回顾

通读你的完整示例后,我发现其中有……很多问题。

  • 抽象基类缺少虚析构函数
  • 你用了不同的strand来处理*本应*在同一个strand上的事情,修复:
, m_socket(make_strand(ex), udp::v4())
     , m_resolver(m_socket.get_executor()) {}
  • 你混用了boost::enable_shared_from_this和std::shared_ptr
  • 在Connect中,你直接调用了async_resolve,而本意应该是把这个调用post到strand上
  
  • 完成处理程序实际上*本来就会*在strand上执行,所以你原先放在那里的asio::post是多余的。
  • asio::post是一个异步发起函数(async initiation),这意味着它总是立即返回(并且不会抛出异常),所以那个循环也毫无用处。
  • 使用asio::async_connect代替手动枚举端点
  • 你还引入了第二个promise。可你要承诺的只有一件事。第二个promise只会让你想在第一个promise里阻塞等待,而这正是你的问题所在,不要这样做。只需把最初的那个promise一路传递下去,直到连接真正完成。

P.S.讽刺的是,我后来才注意到,你的例子显然是基于上面那个TCP TLS客户端改的(那个“用错了的”"syslog tls client: "字符串就是证据)。正如你将在下面的摘要中看到的,在你把相关代码删掉之前,它里面*恰好*就有你所需要的解决方案 :)

  • 你改动了SendMessageAsync的lambda中的msg = std::string(msg)捕获(本文转载时改动前后的代码被写成了完全相同的内容,改动后的写法已无法从文中辨认)。这一改动会导致UB,因为字符串视图可能引用在异步操作运行时已经不再有效的内存。这也是为什么m_outbox保存std::string而不是std::string_view非常重要。
  • 你用async_send_to向m_ep发送数据,但m_ep从未被赋值。因此什么都不会被发送出去(更不用说送达了)。
  • 既然你是想测试解析器,你大概应该把构造函数参数从:
std::shared_ptr<IConnection> conn =
     std::make_shared<UdpConnection>(ioc.get_executor(), "127.0.0.1", "514");

改成类似这样:

std::shared_ptr<IConnection> conn =
     std::make_shared<UdpConnection>(ioc.get_executor(), "localhost", "syslog");

把以上所有问题都修复之后,你会得到:

Live On Coliru

// #define BOOST_ASIO_ENABLE_HANDLER_TRACKING
#include <boost/asio.hpp>
#include <deque>
#include <iomanip>
#include <iostream>

using namespace std::chrono_literals;
namespace asio = boost::asio;

// Abstract connection interface that main() uses to drive a client through a
// base-class pointer.
struct IConnection {
    // Shorthand aliases; being members, they are also usable unqualified
    // inside derived classes such as UdpConnection.
    using error_code = boost::system::error_code;
    using udp        = asio::ip::udp;

    // Virtual so that implementations are destroyed correctly when owned
    // through a pointer to IConnection.
    virtual ~IConnection()                          = default;
    // Establishes the connection.
    virtual void Connect()                          = 0;
    // Queues one message for sending; the argument is only a view.
    virtual void SendMessageAsync(std::string_view) = 0;
    // Shuts the connection down.
    virtual void Close()                            = 0;
};

// UDP implementation of IConnection that resolves a host/service pair before
// connecting. The member functions call shared_from_this(), so instances must
// be owned by a std::shared_ptr.
class UdpConnection
    : public IConnection
    , public std::enable_shared_from_this<UdpConnection> {
  public:
    // `to` is the host name or address and `port` the service name or port
    // number handed to the resolver.
    UdpConnection(asio::any_io_executor ex, std::string to, std::string port);

    // Resolves and connects; blocks the caller until that has finished.
    void Connect()                                  override; 
    // Copies `message` and queues it for sending.
    void SendMessageAsync(std::string_view message) override; 
    // Shuts down and closes the socket.
    void Close()                                    override; 

  private:
    // Sends the front message of m_outbox, if there is one.
    void DoSendLoop();
    // Completion handler of the send started by DoSendLoop().
    void HandleWritten(error_code ec, size_t bytes_transferred);
    // Runs resolve + connect and reports the outcome through `p`.
    void DoConnect(std::promise<void> p);

  private:
    const std::string m_to;
    const std::string m_port;

    // m_resolver is constructed from m_socket's executor, so it has to be
    // declared after m_socket.
    udp::socket   m_socket;
    udp::resolver m_resolver;
    // Endpoint selected by DoConnect(); destination of every send.
    udp::endpoint m_ep;

    // Pending messages; the front element is the one currently being sent.
    std::deque<std::string> m_outbox;
};

UdpConnection::UdpConnection(asio::any_io_executor ex, std::string to, std::string port)
    : m_to(std::move(to))
    , m_port(std::move(port))
    , m_socket(make_strand(ex), udp::v4())
    , m_resolver(m_socket.get_executor()) {}

// Runs DoConnect() on the socket's executor and blocks the calling thread
// until it reports back. A failure surfaces here as the exception stored in
// the promise.
void UdpConnection::Connect() {
    std::promise<void> done;
    std::future<void>  outcome = done.get_future();

    // The promise travels into the task; `self` keeps the object alive until
    // the task has run.
    auto startConnect = [this, done = std::move(done), self = shared_from_this()]() mutable {
        DoConnect(std::move(done));
    };
    asio::post(m_socket.get_executor(), std::move(startConnect));

    outcome.get();
}

// Queues `message` for sending. The view is copied into an owned std::string
// right away because the queued task runs after this call has returned.
void UdpConnection::SendMessageAsync(std::string_view message) {
    auto enqueue = [this, self = shared_from_this(), msg = std::string(message)]() mutable {
        // An empty outbox means no send is in flight, so this message has to
        // start the loop itself.
        const bool wasIdle = m_outbox.empty();
        m_outbox.emplace_back(std::move(msg));
        if (wasIdle)
            DoSendLoop();
    };
    asio::post(m_socket.get_executor(), std::move(enqueue));
}

void UdpConnection::DoSendLoop()
{
    if (m_outbox.empty())
        return;

    m_socket.async_send_to( //
        asio::buffer(m_outbox.front()), m_ep,
        bind(&UdpConnection::HandleWritten, shared_from_this(), std::placeholders::_1,
             std::placeholders::_2));
}

void UdpConnection::Close()
{
    try {
        if (m_socket.is_open()) {
            error_code ignoredEc;
            m_socket.shutdown(udp::socket::shutdown_both, ignoredEc);
            m_socket.close(ignoredEc);
        }
    } catch (std::exception const& ex) {
        std::cout<< "Run-time exception: " << ex.what() << std::endl;
    }
}

// Completion handler for the send started by DoSendLoop().
//
// Removes the message that was just sent (or failed to send) from the outbox,
// logs the result, and continues with the next queued message. The result is
// logged for failures as well as successes, so send errors are no longer
// dropped silently.
void UdpConnection::HandleWritten(error_code  ec, size_t /*bytes_transferred*/) {
    m_outbox.pop_front();

    std::cout << "HandleWritten: " << ec.message() << std::endl;

    DoSendLoop();
}

// Resolves m_to/m_port (IPv4 only) and then connects the socket to the
// resolved endpoints via asio::async_connect. Nothing here blocks: `p` is
// moved from handler to handler and is fulfilled only by the last step, or is
// given a boost::system::system_error by whichever step fails first.
void UdpConnection::DoConnect(std::promise<void> p) {
    m_resolver.async_resolve(
        udp::v4(), m_to, m_port,
        [this, p = std::move(p), self = shared_from_this()] //
        (error_code ec, udp::resolver::iterator eps) mutable {
            if (ec) {
                // Resolution failed: report it to the waiting caller.
                p.set_exception(std::make_exception_ptr(boost::system::system_error(ec)));
            } else {
                asio::async_connect( //
                    m_socket, eps,
                    [this, p = std::move(p), self = shared_from_this()] //
                    (error_code ec, udp::resolver::iterator it) mutable {
                        if (ec) {
                            std::cout << "Connect: " << ec.message() << std::endl;
                            p.set_exception(std::make_exception_ptr(boost::system::system_error(ec)));
                        } else {
                            // Remember the endpoint that was connected; it is
                            // the destination used when sending.
                            m_ep = it->endpoint();
                            std::cout << "Connect: " << m_ep << std::endl;
                            p.set_value();
                        }
                    });
            }
        });
}

int main(int argc, char** argv) {
    asio::thread_pool ioc{1};

    std::shared_ptr<IConnection> conn =
        std::make_shared<UdpConnection>(ioc.get_executor(), "localhost", "syslog");

    conn->Connect();

    std::vector<std::string> words(argv + 1, argv + argc);
    words.push_back("bye world");
    for (auto w : words)
        conn->SendMessageAsync("syslog udp client: " + w);

    std::this_thread::sleep_for(5000ms);
    conn->Close();

    ioc.join();
}

本地运行演示:

摘要

简而言之,promise只用于让最终用户在连接完成时进行同步。你不能在内部的(服务)线程上阻塞。
此外,有趣的是,所有这些都已经在启用TLS的示例中演示过了,您显然将其作为起点:

// Connects the underlying TCP socket to m_ep, then performs the TLS client
// handshake. Nothing here blocks: `p` is moved from handler to handler and is
// fulfilled only after the handshake succeeds, or is given a
// boost::system::system_error by whichever step fails first.
void do_connect(std::promise<void> p) {
    EKA_TRACE_ERROR() << "Connecting to: " << m_ep << std::endl;
    m_socket.next_layer().async_connect(
        m_ep, [this, p = std::move(p), self = shared_from_this()](error_code ec) mutable {
            EKA_TRACE_ERROR() << "Connect: " << ec.message() << std::endl;
            if (ec)
                p.set_exception(std::make_exception_ptr(boost::system::system_error(ec)));
            else {
                // First step succeeded: pass the promise on to the next step.
                m_socket.async_handshake( //
                    Stream::client, [p = std::move(p), self](error_code ec) mutable {
                        EKA_TRACE_ERROR() << "Handshake: " << ec.message() << std::endl;
                        if (ec)
                            p.set_exception(std::make_exception_ptr(boost::system::system_error(ec)));
                        else
                            p.set_value();
                    });
            }
        });
}

这在结构上与我们在所有修复之后最终得到的DoConnect相同:

// Resolves m_to/m_port (IPv4 only) and then connects the socket to the
// resolved endpoints via asio::async_connect. Nothing here blocks: `p` is
// moved from handler to handler and is fulfilled only by the last step, or is
// given a boost::system::system_error by whichever step fails first.
void UdpConnection::DoConnect(std::promise<void> p) {
    m_resolver.async_resolve(
        udp::v4(), m_to, m_port,
        [this, p = std::move(p), self = shared_from_this()] //
        (error_code ec, udp::resolver::iterator eps) mutable {
            if (ec) {
                // Resolution failed: report it to the waiting caller.
                p.set_exception(std::make_exception_ptr(boost::system::system_error(ec)));
            } else {
                asio::async_connect( //
                    m_socket, eps,
                    [this, p = std::move(p), self = shared_from_this()] //
                    (error_code ec, udp::resolver::iterator it) mutable {
                        if (ec) {
                            std::cout << "Connect: " << ec.message() << std::endl;
                            p.set_exception(std::make_exception_ptr(boost::system::system_error(ec)));
                        } else {
                            // Remember the endpoint that was connected; it is
                            // the destination used when sending.
                            m_ep = it->endpoint();
                            std::cout << "Connect: " << m_ep << std::endl;
                            p.set_value();
                        }
                    });
            }
        });
}

相关问题