我阅读通过tcp与AsyncReadExt
不同的消息,我的想法将是避免复制的东西到内存和重用的缓冲区产生。
我下面展示的代码是我如何通过tcp读取的一个例子。首先,我读取包含要读取的字节总数的头部。然后,我做一个循环,从一个设置的大小填充缓冲区,当达到这个大小时,我创建一个消息并将其发送到另一个线程。在这个消息中,我必须做一个.to_vec()因为我已经确定它是Vec类型的。一旦它满了,我就重置这个缓冲区继续阅读。
编辑:小例子。
委托方:
我需要发送&[&[u8]]来避免在我的真实的应用程序中的某些变量的内存副本。出于这个原因,我提出了这个算法,以便能够发送所有的数据。
use std::error::Error;
use tokio::io::AsyncReadExt;
use tokio::io::AsyncWriteExt;
use tokio::net::TcpStream;
#[tokio::main]
pub async fn main() -> Result<(), Box<dyn Error>> {
let mut stream = TcpStream::connect("127.0.0.1:8080").await?;
println!("[client] connected to server: {:?}", stream.peer_addr()?);
let mut response: u32 = 999999;
// Example of a message
// Headers -> &[1, 2, 3, 4]
// Body -> random_bytes
let random_bytes = vec![1; 1048576 * 5];
let data: &[&[u8]] = &[&[1, 2, 3, 4], &random_bytes];
// Send buffer capacity -> 1MB
let send_buffer_capacity: usize = 1048576;
// Header
let mut len = 0;
for slice in data {
len += slice.len() as u32;
}
//Send total len
stream.write_u32(len).await.unwrap();
//Send all data -> headers + body
for slice in data {
let iterations = slice.len() / send_buffer_capacity;
if iterations > 0 {
for i in 0..iterations {
let index = i as usize;
stream
.write_all(
&slice[send_buffer_capacity * index..send_buffer_capacity * (index + 1)],
)
.await
.unwrap();
}
let iter = iterations as usize;
stream
.write_all(&slice[send_buffer_capacity * iter..slice.len()])
.await
.unwrap();
} else {
stream.write_all(slice).await.unwrap();
}
}
stream.flush().await.unwrap();
// read respone from server
response = stream.read_u32().await.unwrap();
println!("Server response: {:?}", response);
Ok(())
}
字符串
伺服器:
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpListener;
use std::error::Error;
use bytes::BytesMut;
use std::fmt::Debug;
pub struct Message {
pub id: u32,
pub sender_id: u32,
pub op_id: u32,
pub chunk_id: u32,
pub last_chunk: bool,
pub all_mess_len: u32,
pub bytes: Vec<u8>,
}
impl Message {
pub fn new(
id: u32,
sender_id: u32,
op_id: u32,
chunk_id: u32,
last_chunk: bool,
all_mess_len: u32,
bytes: Vec<u8>,
) -> Message {
Message {
id: id,
sender_id: sender_id,
op_id: op_id,
chunk_id: chunk_id,
last_chunk: last_chunk,
all_mess_len: all_mess_len,
bytes: bytes,
}
}
}
impl Debug for Message {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Message")
.field("id", &self.id)
.field("sender_id", &self.sender_id)
.field("op_id", &self.op_id)
.field("chunk_id", &self.chunk_id)
.field("last_chunk", &self.last_chunk)
.field("all_mess_len", &self.all_mess_len)
.field("bytes", &self.bytes.len())
.finish()
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
let addr = "127.0.0.1:8080".to_string();
let listener = TcpListener::bind(&addr).await.unwrap();
loop {
match listener.accept().await {
Ok((mut stream, addr)) => {
println!("accepted a socket: {:?}", addr);
tokio::spawn(async move {
let send_buffer_capacity = 1048576;
let mut n_bytes_read = 0;
let mut chunk_id = 0;
let mut last_chunk = false;
// Get header
let total_bytes = stream.read_u32().await.unwrap();
let mut buffer = BytesMut::with_capacity(send_buffer_capacity);
let mut bytes_per_chunk = 0;
loop {
match stream.read_buf(&mut buffer).await {
Ok(0) => {
continue;
}
Ok(n) => {
bytes_per_chunk += n;
n_bytes_read += n;
if n_bytes_read == total_bytes.try_into().unwrap() {
last_chunk = true;
}
if bytes_per_chunk == send_buffer_capacity || last_chunk {
let message = Message::new(
0,
0,
0,
chunk_id,
last_chunk,
total_bytes,
buffer.to_vec(),
);
println!("message: {:?}", message);
//Send message to queue
//queue.push(message);
chunk_id += 1;
bytes_per_chunk = 0;
buffer = BytesMut::with_capacity(send_buffer_capacity);
}
if last_chunk {
break;
}
}
Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
//debug!("Err: TCP -> SV (Write))");
continue;
}
Err(e) => {
println!("error: {:?}", e);
}
}
}
stream
.write_u32(1)
.await
.unwrap();
stream.flush().await.unwrap();
});
},
Err(err) => {
println!("No stream");
}
}
}
Ok(())
}
型
如果我将消息中的类型更改为BytesMut,我会在行中得到一个错误:
match stream.read_buf(&mut buffer).await {
型
它是:
let mut buffer = BytesMut::with_capacity(config.send_buffer_capacity);
| ---------- move occurs because `buffer` has type `BytesMut`, which does not implement the `Copy` trait
...
402 | loop {
| ---- inside of this loop
...
405 | buffer = BytesMut::with_capacity(config.send_buffer_capacity);
| ------ this reinitialization might get skipped
...
409 | match stream.read_buf(&mut buffer).await {
| ^^^^^^^^^^^ value borrowed here after move
...
430 | buffer,
| ------ value moved here, in previous iteration of loop
型
如果我做一个.clone()
的缓冲区,它会做同样的做一个.to_vec()
,它会使内存的副本。
然后,我必须重置出条件的缓冲区,什么都不会工作。
我尝试使用read_exact()
方法,但它从来没有写入缓冲区。
有没有什么方法可以让缓冲区保留在内存中,我可以在消息中传递引用,并且仍然使用缓冲区继续阅读?
2条答案
按热度按时间w46czmvw1#
到目前为止,解决这个问题最简单的方法是直接重新初始化
buffer
,而不是跟踪什么时候重新初始化(使用reset
)并只在下一次迭代中重新初始化。字符串
对此:
型
tcomlyy62#
我发现这个解决方案是使用
read_exact()
。问题在于向量的初始化。当使用Vec::with_capacity(n)
完成时,它不起作用,因为这个向量的大小是0,所以你不能写入它。如果你这样初始化它,它就起作用了:字符串
我留下整个解决方案:
型
另一个疑问出现在我身上,在这种情况下没有额外的内存复制,对吗?如果该消息被发送到另一个线程的队列/通道,该消息的引用将被发送,因此,我们将找到我们存储的缓冲区的引用?
分配内存和使用
BytesMut
向量之间有什么区别?在内存方面,最后你使用的是相同的,我理解,因为当你执行read_exact()
时,你已经知道缓冲区将被填充。这是真的吗?使用read_buf()
或read_exact()
更好吗?