Rust: reusing a BytesMut buffer when reading over TCP

Posted by klsxnrf1 12 months ago in Other

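I am reading different messages over TCP with AsyncReadExt, and I would like to avoid copying data in memory and to reuse the buffer I allocate.
The code below is an example of how I read over TCP. First I read the header, which contains the total number of bytes to read. Then I loop, filling a buffer of a fixed size; when that size is reached, I build a message and send it to another thread. For that message I have to call .to_vec(), because I declared the field as a Vec. Once the buffer is full, I reset it and keep reading.
Edit: minimal example.
Client:
I need to send a &[&[u8]] to avoid in-memory copies of certain variables in my real application. That is why I came up with this algorithm to send all of the data.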

use std::error::Error;

use tokio::io::AsyncReadExt;
use tokio::io::AsyncWriteExt;
use tokio::net::TcpStream;

#[tokio::main]
pub async fn main() -> Result<(), Box<dyn Error>> {
    let mut stream = TcpStream::connect("127.0.0.1:8080").await?;
    println!("[client] connected to server: {:?}", stream.peer_addr()?);

    let mut response: u32 = 999999;

    // Example of a message
    // Headers -> &[1, 2, 3, 4]
    // Body -> random_bytes
    let random_bytes = vec![1; 1048576 * 5];
    let data: &[&[u8]] = &[&[1, 2, 3, 4], &random_bytes];

    // Send buffer capacity -> 1MB
    let send_buffer_capacity: usize = 1048576;

    // Header
    let mut len = 0;
    for slice in data {
        len += slice.len() as u32;
    }

    //Send total len
    stream.write_u32(len).await.unwrap();

    //Send all data -> headers + body
    for slice in data {
        let iterations = slice.len() / send_buffer_capacity;

        if iterations > 0 {
            for i in 0..iterations {
                let index = i as usize;

                stream
                    .write_all(
                        &slice[send_buffer_capacity * index..send_buffer_capacity * (index + 1)],
                    )
                    .await
                    .unwrap();
            }

            let iter = iterations as usize;
            stream
                .write_all(&slice[send_buffer_capacity * iter..slice.len()])
                .await
                .unwrap();
        } else {
            stream.write_all(slice).await.unwrap();
        }
    }
    stream.flush().await.unwrap();

    // read response from server
    response = stream.read_u32().await.unwrap();

    println!("Server response: {:?}", response);

    Ok(())
}
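
For comparison, here is a minimal std-only sketch of the same wire format (a 4-byte big-endian length, which is what tokio's write_u32 produces, followed by the parts in pieces of a fixed size). The name write_framed and the use of a Vec<u8> in place of the TCP stream are assumptions made for illustration; slice::chunks takes care of the "full pieces plus remainder" arithmetic that the client does by hand.

```rust
use std::convert::TryFrom;
use std::io::{self, Write};

/// Writes a big-endian u32 length prefix, then every part in pieces of at most `chunk` bytes.
/// (Hypothetical helper, not part of the original client.)
fn write_framed<W: Write>(out: &mut W, parts: &[&[u8]], chunk: usize) -> io::Result<()> {
    assert!(chunk > 0, "chunk size must be non-zero");
    let total: usize = parts.iter().map(|p| p.len()).sum();
    let total = u32::try_from(total)
        .map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "payload exceeds u32::MAX"))?;
    out.write_all(&total.to_be_bytes())?;
    for part in parts {
        // `chunks` yields the full pieces followed by the shorter remainder, if any.
        for piece in part.chunks(chunk) {
            out.write_all(piece)?;
        }
    }
    out.flush()
}

fn main() -> io::Result<()> {
    let header = [1u8, 2, 3, 4];
    let body = vec![1u8; 10];
    let parts: [&[u8]; 2] = [&header[..], body.as_slice()];
    let mut wire: Vec<u8> = Vec::new(); // stands in for the TCP stream
    write_framed(&mut wire, &parts, 4)?;
    println!("{} bytes on the wire", wire.len());
    Ok(())
}
```

The parts are only borrowed, so nothing is copied before the write calls themselves.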

Server:

use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpListener;

use std::error::Error;
use bytes::BytesMut;
use std::fmt::Debug;

pub struct Message {
    pub id: u32,
    pub sender_id: u32,
    pub op_id: u32,
    pub chunk_id: u32,
    pub last_chunk: bool,
    pub all_mess_len: u32,
    pub bytes: Vec<u8>,
}

impl Message {
    pub fn new(
        id: u32,
        sender_id: u32,
        op_id: u32,
        chunk_id: u32,
        last_chunk: bool,
        all_mess_len: u32,
        bytes: Vec<u8>,
    ) -> Message {
        Message {
            id: id,
            sender_id: sender_id,
            op_id: op_id,
            chunk_id: chunk_id,
            last_chunk: last_chunk,
            all_mess_len: all_mess_len,
            bytes: bytes,
        }
    }
}

impl Debug for Message {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("Message")
            .field("id", &self.id)
            .field("sender_id", &self.sender_id)
            .field("op_id", &self.op_id)
            .field("chunk_id", &self.chunk_id)
            .field("last_chunk", &self.last_chunk)
            .field("all_mess_len", &self.all_mess_len)
            .field("bytes", &self.bytes.len())
            .finish()
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let addr = "127.0.0.1:8080".to_string();

    let listener = TcpListener::bind(&addr).await.unwrap();

    loop {
        match listener.accept().await {
            Ok((mut stream, addr)) => {
                println!("accepted a socket: {:?}", addr);

                tokio::spawn(async move {

                    let send_buffer_capacity = 1048576;

                    let mut n_bytes_read = 0;
                    let mut chunk_id = 0;
                    let mut last_chunk = false;
    
                    // Get header
                    let total_bytes = stream.read_u32().await.unwrap();
        
                    let mut buffer = BytesMut::with_capacity(send_buffer_capacity);
    
                    let mut bytes_per_chunk = 0;
    
                    loop {
                        match stream.read_buf(&mut buffer).await {
                            Ok(0) => {
                                // Zero bytes read means the peer closed the connection; stop instead of spinning.
                                return;
                            }
                            Ok(n) => {
                                bytes_per_chunk += n;
                                n_bytes_read += n;
    
                                if n_bytes_read == total_bytes.try_into().unwrap() {
                                    last_chunk = true;
                                }
    
                                if bytes_per_chunk == send_buffer_capacity || last_chunk {
                                    let message = Message::new(
                                        0,
                                        0,
                                        0,
                                        chunk_id,
                                        last_chunk,
                                        total_bytes,
                                        buffer.to_vec(),
                                    );
    
                                    println!("message: {:?}", message);
                                    //Send message to queue
                                    //queue.push(message);
    
                                    chunk_id += 1;
    
                                    bytes_per_chunk = 0;
                                    buffer = BytesMut::with_capacity(send_buffer_capacity);
                                }
    
                                if last_chunk {
                                    break;
                                }
                            }
                            Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
                                //debug!("Err: TCP -> SV (Write))");
                                continue;
                            }
                            Err(e) => {
                                // Any other error is fatal for this connection; stop instead of looping forever.
                                println!("error: {:?}", e);
                                return;
                            }
                        }
                    }

                    stream
                        .write_u32(1)
                        .await
                        .unwrap();
                    stream.flush().await.unwrap();

                });
            },
            Err(err) => {
                println!("failed to accept a connection: {:?}", err);
            }
            
        }
    }

    Ok(())
}


If I change the type in the message to BytesMut, I get an error on this line:

match stream.read_buf(&mut buffer).await {


The error is:

let mut buffer = BytesMut::with_capacity(config.send_buffer_capacity);
    |         ---------- move occurs because `buffer` has type `BytesMut`, which does not implement the `Copy` trait
...
402 |     loop {
    |     ---- inside of this loop
...
405 |             buffer = BytesMut::with_capacity(config.send_buffer_capacity);
    |             ------ this reinitialization might get skipped
...
409 |         match stream.read_buf(&mut buffer).await {
    |                               ^^^^^^^^^^^ value borrowed here after move
...
430 |                         buffer,
    |                         ------ value moved here, in previous iteration of loop


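If I call .clone() on the buffer, it does the same thing as .to_vec(): it makes a copy in memory.
I would then have to reset the buffer outside the condition, and nothing works.
I tried the read_exact() method, but it never writes into the buffer.
Is there a way to keep the buffer in memory, pass a reference to it in the message, and still keep using the buffer to read?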

Answer #1 (w46czmvw)

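By far the easiest fix is to reinitialize buffer right where it is moved, instead of tracking when it needs to be reinitialized (with a reset flag) and only doing so at the start of the next iteration. That is, go from this: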

struct BytesMut;
fn main() {
    let mut reset = false;
    let mut buffer = BytesMut;

    loop {
        if reset {
            buffer = BytesMut;
            reset = false;
        }
        let _message = (buffer,);
        reset = true;
    }
}

to this:

struct BytesMut;
fn main() {
    let mut buffer = BytesMut;

    loop {
        let _message = (buffer,);
        buffer = BytesMut;
    }
}
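
The same "move out and reinitialize in one step" idea can also be written with std::mem::replace. The sketch below is only an illustration under assumptions: a plain Vec<u8> stands in for BytesMut, extend_from_slice stands in for the socket read, and take_chunk is a hypothetical helper name. With the bytes crate, BytesMut::split() appears to serve a similar purpose, but check its documentation before relying on that.

```rust
use std::mem;

/// Moves the filled buffer out and leaves a fresh, empty one of the given capacity behind.
/// (Hypothetical helper; `Vec<u8>` stands in for `BytesMut`.)
fn take_chunk(buffer: &mut Vec<u8>, capacity: usize) -> Vec<u8> {
    mem::replace(buffer, Vec::with_capacity(capacity))
}

fn main() {
    let capacity = 8;
    let mut buffer: Vec<u8> = Vec::with_capacity(capacity);
    let mut messages: Vec<Vec<u8>> = Vec::new();

    for round in 0..3u8 {
        // Stands in for a read that fills the buffer.
        buffer.extend_from_slice(&[round; 8]);
        // Ownership of the filled allocation moves out; no bytes are copied.
        let chunk = take_chunk(&mut buffer, capacity);
        messages.push(chunk);
        // `buffer` is valid again here, so the next iteration can borrow it.
    }
    println!("collected {} chunks", messages.len());
}
```

Because the buffer is never left in a moved-from state, the "value borrowed here after move" error does not come up.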

Answer #2 (tcomlyy6)

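I found that the solution is to use read_exact(). The problem was how the vector was initialized. With Vec::with_capacity(n) it does not work: the vector has capacity n but length 0, and read_exact() fills only the existing length, so there is nothing to write into. If you initialize it like this, it works: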

let mut buffer: Vec<u8> = vec![0; send_buffer_capacity as usize];
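
The length-versus-capacity difference can be checked without a socket. This is a small sketch using std::io::Read on a byte slice in place of the tokio stream (tokio's read_exact also takes a &mut [u8]); consumed_by_read_exact is a hypothetical helper name.

```rust
use std::io::Read;

/// Runs `read_exact` into `buf` and returns how many bytes of `src` were consumed.
/// (Hypothetical helper for illustration.)
fn consumed_by_read_exact(buf: &mut Vec<u8>, src: &[u8]) -> usize {
    let mut reader = src; // `&[u8]` implements `std::io::Read`
    reader
        .read_exact(buf.as_mut_slice())
        .expect("source holds enough bytes");
    src.len() - reader.len()
}

fn main() {
    let src = [1u8, 2, 3, 4];

    let mut reserved: Vec<u8> = Vec::with_capacity(4); // capacity 4, length 0
    let mut zeroed: Vec<u8> = vec![0; 4]; // capacity 4, length 4

    println!(
        "with_capacity consumed {} bytes",
        consumed_by_read_exact(&mut reserved, &src)
    );
    println!(
        "vec![0; n] consumed {} bytes",
        consumed_by_read_exact(&mut zeroed, &src)
    );
}
```

The slice handed to read_exact covers only the vector's length, so the with_capacity version returns successfully without reading anything.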

Here is the whole solution:

tokio::spawn(async move {
                let send_buffer_capacity = 1048576;
                
                let mut chunk_id = 0;
                let mut last_chunk = false;

                // Get header
                let total_bytes = stream.read_u32().await.unwrap();

                let mut iterations = total_bytes / send_buffer_capacity;
                let last_buffer_capacity = total_bytes % send_buffer_capacity;

                if last_buffer_capacity > 0 {
                    iterations += 1;
                }

                for i in 0..iterations {

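                    // Size the buffer for this chunk up front; only the final chunk can be shorter.
                    let is_short_last = last_buffer_capacity > 0 && i == (iterations - 1);
                    let chunk_len = if is_short_last {
                        last_buffer_capacity
                    } else {
                        send_buffer_capacity
                    };
                    let mut buffer: Vec<u8> = vec![0; chunk_len as usize];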

                    match stream.read_exact(&mut buffer).await {
                        Ok(_) => {
                            if i == (iterations - 1) {
                                last_chunk = true;
                            }

                            let message = Message::new(
                                0,
                                0,
                                0,
                                chunk_id,
                                last_chunk,
                                total_bytes,
                                buffer,
                            );

                            println!("message: {:?}", message);
                            //Send message to queue
                            //queue.push(message);

                            chunk_id += 1;

                            if last_chunk {
                                break;
                            }
                        }
                        Err(e) => {
                            // Any read error leaves the message incomplete, so stop instead of moving on to the next chunk.
                            println!("error: {:?}", e);
                            return;
                        }
                    }
                }

                stream.write_u32(1).await.unwrap();
                stream.flush().await.unwrap();
            });
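
The chunk arithmetic above (full chunks plus one shorter final chunk) can be exercised in isolation. The following is a std-only sketch, assuming the same framing as the question (a big-endian u32 length followed by the payload); chunk_sizes and read_chunks are hypothetical names, and a byte slice stands in for the TCP stream.

```rust
use std::io::{self, Read};

/// Sizes of the pieces a `total`-byte payload splits into: full chunks, then the remainder.
fn chunk_sizes(total: usize, capacity: usize) -> Vec<usize> {
    assert!(capacity > 0, "capacity must be non-zero");
    let mut sizes = vec![capacity; total / capacity];
    if total % capacity > 0 {
        sizes.push(total % capacity);
    }
    sizes
}

/// Reads a big-endian u32 length, then the payload as owned chunks of at most `capacity` bytes.
fn read_chunks<R: Read>(input: &mut R, capacity: usize) -> io::Result<Vec<Vec<u8>>> {
    let mut header = [0u8; 4];
    input.read_exact(&mut header)?;
    let total = u32::from_be_bytes(header) as usize;

    let mut chunks = Vec::new();
    for size in chunk_sizes(total, capacity) {
        // Length equals `size`, so read_exact fills the whole buffer.
        let mut buffer = vec![0u8; size];
        // Any error, including an early end of input, ends the read.
        input.read_exact(&mut buffer)?;
        // The buffer is moved into the list, not copied.
        chunks.push(buffer);
    }
    Ok(chunks)
}

fn main() -> io::Result<()> {
    let mut wire = 10u32.to_be_bytes().to_vec();
    wire.extend_from_slice(&[7u8; 10]);
    let chunks = read_chunks(&mut wire.as_slice(), 4)?;
    let lengths: Vec<usize> = chunks.iter().map(|c| c.len()).collect();
    println!("chunk lengths: {:?}", lengths);
    Ok(())
}
```

Each chunk gets its own allocation of exactly the size it needs, which is what lets it be handed off without a .to_vec().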


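Another doubt came up: in this case there is no additional memory copy, right? If the message is sent to a queue/channel owned by another thread, a reference to the message is sent, so we end up with a reference to the buffer where the data was stored?
What is the difference between allocating the memory myself and using a BytesMut? As I understand it, the memory used is the same in the end, because when you call read_exact() you already know the buffer will be filled. Is that true? Is it better to use read_buf() or read_exact()?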
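
On the first doubt, one way to check is to compare the heap address of the payload before sending and after receiving. This is a sketch using std::sync::mpsc and a bare Vec<u8> rather than the Message struct and whatever queue the real application uses (both are assumptions); addresses_across_channel is a hypothetical helper. Strictly speaking the value is moved rather than sent by reference: the small Vec handle (pointer, length, capacity) changes owner while the bytes stay where they are.

```rust
use std::sync::mpsc;
use std::thread;

/// Sends `payload` to another thread and reports the heap address seen on each side.
/// (Hypothetical helper for illustration.)
fn addresses_across_channel(payload: Vec<u8>) -> (usize, usize) {
    let before = payload.as_ptr() as usize;
    let (tx, rx) = mpsc::channel::<Vec<u8>>();

    let worker = thread::spawn(move || {
        let received = rx.recv().expect("sender is alive");
        received.as_ptr() as usize
    });

    // Moves the Vec handle into the channel; the heap bytes are not copied.
    tx.send(payload).expect("receiver is alive");
    let after = worker.join().expect("worker does not panic");
    (before, after)
}

fn main() {
    let (before, after) = addresses_across_channel(vec![0u8; 1024]);
    println!("same allocation on both sides: {}", before == after);
}
```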
