c++ 如何避免并发回调到用户定义的例程?

tv6aics1  于 2023-04-13  发布在  其他
关注(0)|答案(1)|浏览(74)

我试图修改一些Boost代码,使其与Autoit兼容。原始项目可以找到here。我的版本可以找到here。我可以使用一些帮助来确定如何防止多个并发回调到用户提供的Autoit例程中。
下面是现有的on_read回调--

/// Callback registered by async_read. It calls user registered callback to actually process the data. And then issue another async_read to wait for data from server again.
    /// \param ec instance of error code
    /// \param bytes_transferred
    void
    on_read(
            beast::error_code ec,
            std::size_t bytes_transferred) {
        if(EnableVerbose)
        {
            boost::lock_guard<boost::mutex> guard(mtx_);
            std::wcout << L"<WsDll-" ARCH_LABEL "> in on read" << std::endl;
        }       
        boost::ignore_unused(bytes_transferred);

        {
            boost::lock_guard<boost::mutex> guard(mtx_);
            if(!Is_Connected) {
                return;
            }

        }

        // error occurs
        if (ec) {
            if(on_fail_cb)
                on_fail_cb(L"read");
            return fail(ec, L"read");
        }

        const std::string data = beast::buffers_to_string(buffer_.data());
        const std::wstring wdata(data.begin(), data.end());
        if(EnableVerbose)
        {
            boost::lock_guard<boost::mutex> guard(mtx_);
            std::wcout << L"<WsDll-" ARCH_LABEL "> received[" << bytes_transferred << L"] " << wdata << std::endl;
        }       

//  The next section is where my issue resides

        if (on_data_cb)
            on_data_cb(wdata.c_str(), wdata.length());

        buffer_.consume(buffer_.size());

        if(EnableVerbose)
        {
            boost::lock_guard<boost::mutex> guard(mtx_);
            std::wcout << L"<WsDll-" ARCH_LABEL "> issue new async_read in on_read" << std::endl;
        }       
        ws_.async_read(
                buffer_,
                beast::bind_front_handler(
                        &session::on_read,
                        shared_from_this()));

        // Close the WebSocket connection
        // ws_.async_close(websocket::close_code::normal,
        //     beast::bind_front_handler(
        //         &session::on_close,
        //         shared_from_this()));
    }

代码if (on_data_cb) on_data_cb(wdata.c_str(), wdata.length());将回调函数执行到Autoit中,我需要知道如何防止它一次执行多次。我不太精通C++ / Boost,所以请小心。- )

bybem2ql

bybem2ql1#

温和的回答是指向文档:Strands: Use Threads Without Explicit Locking
在现实中,你没有展示足够的代码。例如,我们无法知道

  • 如果您使用的是一个io_context,其中有一个服务线程run()-ing它,那么您已经有了隐式串,并且保证没有处理程序同时运行
  • IO对象绑定到哪个执行器。在代码中,唯一可见的对象是ws_,我们假设它类似于
net::io_context                ctx_;
 websocket::stream<tcp::socket> ws_{ctx_};

现在,如果你想让多个线程为ctx_服务,你可以把ws_绑定到一个strand executor:

websocket::stream<tcp::socket> ws_{make_strand(ctx_)};

现在,只要你确保你自己的访问(例如async_ initiations)在正确的链上,你的代码就已经是安全的了。如果你想-你不介意硬编码executor类型,你可以Assert:
当前net::io_context::executor_type位置:首页〉〉技术支持〉〉技术支持〉〉技术支持〉〉技术支持

专业提示:

如果你真的提交了一个特定的executor类型,考虑静态绑定该类型:

using Context  = net::io_context::executor_type;
using Executor = net::io_context::executor_type;
using Strand   = net::strand<net::io_context::executor_type>;
using Socket   = net::basic_stream_socket<tcp, Strand>;

Context                   ctx_;
websocket::stream<Socket> ws_{make_strand(ctx_)};

这避免了类型擦除执行器的开销,并且可以简化Assert:

assert(ws_.get_executor().running_in_this_thread());

旁注

  • 使用atomic_bool而不是锁定一个完整的互斥体来检查一个布尔值。
  • 使用标准库功能(std::mutexstd::lock_guard),使之更轻松、更出色
  • 考虑为ANSI构建进行构建,或者检查您的加宽方法:
const std::wstring wdata(data.begin(), data.end());

是一个反模式。查看此处https://en.cppreference.com/w/cpp/string/multibyte或此处以获取更多信息https://en.cppreference.com/w/cpp/locale/ctype/widen

Demo

强制性“实时”代码:
Live On Coliru

#include <boost/asio.hpp>
#include <boost/beast.hpp>
#include <iostream>
namespace net       = boost::asio;
namespace beast     = boost::beast;
namespace websocket = beast::websocket;
using net::ip::tcp;

static std::mutex s_consoleMtx;

static void fail(beast::error_code ec, std::string txt) {
    std::cerr << txt << ": " << ec.message() << " at " << ec.location() << std::endl;
}

#define ARCH_LABEL "STACKO"
struct session : std::enable_shared_from_this<session> {
    using Context  = net::io_context::executor_type;
    using Executor = net::io_context::executor_type;
    using Strand   = net::strand<net::io_context::executor_type>;
    using Socket   = net::basic_stream_socket<tcp, Strand>;

    Context                   ctx_;
    websocket::stream<Socket> ws_{make_strand(ctx_)};

    static bool const  EnableVerbose = true;
    std::atomic_bool   Is_Connected  = false;
    beast::flat_buffer buffer_;

    std::function<void(std::string)>         on_fail_cb;
    std::function<void(char const*, size_t)> on_data_cb;

    /// Callback registered by async_read. It calls user registered
    /// callback to actually process the data. And then issue another
    /// async_read to wait for data from server again. 
    /// \param ec instance of error code 
    /// \param bytes_transferred
    void on_read(beast::error_code ec, [[maybe_unused]] size_t bytes_transferred) {
        if (EnableVerbose) {
            std::lock_guard<std::mutex> guard(s_consoleMtx);
            std::cout << "<WsDll-" ARCH_LABEL "> in on read" << std::endl;
        }

        if (!Is_Connected)
            return;

        // error occurs
        if (ec) {
            if (on_fail_cb)
                on_fail_cb("read");
            return fail(ec, "read");
        }

        std::string const data = beast::buffers_to_string(buffer_.data());
        if (EnableVerbose) {
            std::lock_guard<std::mutex> guard(s_consoleMtx);
            std::cout << "<WsDll-" ARCH_LABEL "> received[" << bytes_transferred << "] " << data << std::endl;
        }

        if (on_data_cb)
            on_data_cb(data.c_str(), data.length());

        buffer_.consume(buffer_.size());

        if (EnableVerbose) {
            std::lock_guard<std::mutex> guard(s_consoleMtx);
            std::cout << "<WsDll-" ARCH_LABEL "> issue new async_read in on_read" << std::endl;
        }

        assert(ws_.get_executor().running_in_this_thread());
        ws_.async_read(buffer_, beast::bind_front_handler(&session::on_read, shared_from_this()));
    }
};

相关问题