Boost::Asio C++为什么我在阅读缓冲区时会得到垃圾?

5jvtdoz2  于 2023-02-14  发布在  其他
关注(0)|答案(1)|浏览(213)

我有一个客户端,它读取文件并将数据逐行发送到服务器。服务器必须计算发送的行数。我使用boost::asio::async_read_until来达到这个结果。但是我得到了垃圾(如下所示:行:�68�)。客户端仅以ASCII编码发送数据。
客户端代码片段:

std::ifstream infile(argv[1]);
    std::string line;

    while (std::getline(infile, line)) {
      boost::asio::write(s, boost::asio::buffer(line + '\n'));
      std::cout << line << std::endl;

服务器读取功能:

void handle_read(const boost::system::error_code& error, size_t bytes_trasferred, boost::asio::streambuf& buf)
    {
      if (!error)
    {
        if (!bytes_trasferred)
        {
            std::cout << "0 bytes trasferred\n";
            return;
        }

        std::string data = boost::asio::buffer_cast<const char*>(buf.data());
        std::istringstream is(data);
        std::string line;
        std::getline(is, line);
        std::cout << "Line: " << line << std::endl;    
    }
      else
        std::cerr << "Error: " << error.message() << std::endl;
    }

    void do_read()
    {
        std::cout << "do_read\n";
        auto self(shared_from_this());
        boost::asio::streambuf buf;
        buf.prepare(1048576);
        std::cout << "ASYNC\n";
        boost::asio::async_read_until(socket_, buf, "\n",
            boost::bind(&TcpConnection::handle_read, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred, boost::ref(buf)));
    }

怎么解决?帮帮我,拜托!
我试着用self this来代替,这会导致内存泄漏。
我试过在句柄函数中获取数据后添加空终止符。ChatGPT告诉我这样做,但行为仍然相同。

ru9i0ody

ru9i0ody1#

1.此处:

std::string data = boost::asio::buffer_cast<const char*>(buf.data());

这是Undefined Behaviour,因为您假设buf.data()指向一个以NUL字符正确终止的字符序列,而您绝对没有理由这样假设。
1.此外,你还有UB,因为你传递了一个对buf的引用,一个局部变量,根据定义,在do_read返回后,它将不再有效。
1.第三,正如Alan指出的,您未能将共享指针(self)复制到绑定处理程序中。
您混合了许多不同的方法来解决处理逐行输入的问题。

  • 使用一个动态缓冲区(例如streambuf
  • 我把它设为成员,因为这就是为什么我们首先使用shared_from_this
  • 不要使用prepare(),因为async_read_until知道如何做到这一点(就像它为您做commit()一样)
      • 不要**使用consume(),而不要使用getline/again/,即使async_read_until已经告诉你换行符在哪里。

结合以下内容:

struct TcpConnection : std::enable_shared_from_this<TcpConnection> {
    TcpConnection(tcp::socket s) : socket_(std::move(s)) {}

    void run() {
        do_read();
    }

  private:
    tcp::socket     socket_;
    asio::streambuf buf;

    void handle_read(error_code ec, size_t n) {
        std::cerr << "handle_read: " << ec.message() << std::endl;
        if (!ec) {
            std::string const line(boost::asio::buffer_cast<char const*>(buf.data()), n);
            buf.consume(n);

            std::cout << "Line: " << quoted(line) << std::endl;
            do_read();
        }
    }

    void do_read() {
        async_read_until( //
            socket_, buf, "\n",
            boost::bind(&TcpConnection::handle_read, shared_from_this(), asio::placeholders::error,
                        asio::placeholders::bytes_transferred));
    }
};

我可能会简化,避免复制和潜在的分配:

struct TcpConnection : std::enable_shared_from_this<TcpConnection> {
    TcpConnection(tcp::socket s) : socket_(std::move(s)) {}

    void run() {
        buf_.reserve(8192); // optional, tune to taste
        do_read();
    }

  private:
    tcp::socket socket_;
    std::string buf_;

    void handle_read(error_code ec, size_t n) {
        std::cerr << "handle_read: " << ec.message() << std::endl;
        if (!ec) {
            auto line = std::string_view(buf_).substr(0, n);
            std::cout << "Line: " << quoted(line) << std::endl;
            buf_.erase(0, n);

            do_read();
        }
    }

    void do_read() {
        async_read_until( //
            socket_, asio::dynamic_buffer(buf_), "\n",
            boost::bind(&TcpConnection::handle_read, shared_from_this(), ph::error,
                        ph::bytes_transferred));
    }
};

这里仍然使用动态缓冲区,但是你不需要对istream做额外的处理,因为你显然不需要这样做。仅仅为了说明的目的,你可以把dynamic_string_buffer显式化:

void run() {
        backing_.reserve(8192); // optional, tune to taste
        do_read();
    }

  private:
    tcp::socket socket_;
    std::string backing_;
    asio::dynamic_string_buffer<char, std::char_traits<char>, std::allocator<char>> buf_{backing_};

    void handle_read(error_code ec, size_t n) {
        std::cerr << "handle_read: " << ec.message() << std::endl;
        if (!ec) {
            auto line = std::string_view(backing_).substr(0, n);
            std::cout << "Line: " << quoted(line) << std::endl;
            buf_.consume(n); // look mom, it's a DynamiceBuffer all the same!

            do_read();
        }
    }

    void do_read() {
        async_read_until( //
            socket_, buf_, "\n",
            boost::bind(&TcpConnection::handle_read, shared_from_this(), ph::error,
                        ph::bytes_transferred));
    }

注意,我不会这样做,但它向您展示了这两种方法如何使用与streambuf相同的DynamicBuffer概念。

现场演示

    • 第一个e第一个f第一个x
#include <boost/asio.hpp>
#include <boost/bind/bind.hpp>
#include <iomanip>
#include <iostream>
namespace asio = boost::asio;
namespace ph = asio::placeholders;
using asio::ip::tcp;
using boost::system::error_code;

struct TcpConnection : std::enable_shared_from_this<TcpConnection> {
    TcpConnection(tcp::socket s) : socket_(std::move(s)) {}

    void run() {
        buf_.reserve(8192); // optional, tune to taste
        do_read();
    }

  private:
    tcp::socket socket_;
    std::string buf_;

    void handle_read(error_code ec, size_t n) {
        if (n) {
            auto line = std::string_view(buf_).substr(0, n - 1); // exclude '\n'
            std::cout << socket_.remote_endpoint() << " Line: " << quoted(line) << std::endl;
            buf_.erase(0, n);
        }
        if (!ec)
            do_read();
        else
            std::cerr << "handle_read: n:" << n << " " << ec.message() << std::endl;
    }

    void do_read() {
        async_read_until( //
            socket_, asio::dynamic_buffer(buf_), "\n",
            boost::bind(&TcpConnection::handle_read, shared_from_this(), ph::error,
                        ph::bytes_transferred));
    }
};

struct Server {
    Server(asio::any_io_executor ex) : acc(ex, {{}, 7878}) {}

    void start() {
        do_accept();
    }

  private:
    tcp::acceptor acc;

    void do_accept() {
        acc.async_accept(make_strand(acc.get_executor()), [this](error_code ec, tcp::socket s) {
            if (!ec) {
                std::make_shared<TcpConnection>(std::move(s))->run();
                do_accept();
            }
        });
    }
};

int main() {
    asio::io_context ioc;

    Server srv(ioc.get_executor());
    srv.start();

    ioc.run();
}

客户端由一个非常类似的oneliner模拟:

netcat 127.0.0.1 7878 -w0 < main.cpp

图纸:

127.0.0.1:54860 Line: "#include <boost/asio.hpp>"
127.0.0.1:54860 Line: "#include <boost/bind/bind.hpp>"
127.0.0.1:54860 Line: "#include <iomanip>"
127.0.0.1:54860 Line: "#include <iostream>"
127.0.0.1:54860 Line: "namespace asio = boost::asio;"
...
...
127.0.0.1:54860 Line: "int main() {"
127.0.0.1:54860 Line: "    asio::io_context ioc;"
127.0.0.1:54860 Line: ""
127.0.0.1:54860 Line: "    Server srv(ioc.get_executor());"
127.0.0.1:54860 Line: "    srv.start();"
127.0.0.1:54860 Line: ""
127.0.0.1:54860 Line: "    ioc.run();"
127.0.0.1:54860 Line: "}"
handle_read: n:0 End of file

显示多个并发客户端的本地演示:

相关问题