I am implementing a TCP server to pass messages. With the tokio-yamux
library I can share the TCP connection between different channels. The library only lets me use the AsyncWriteExt
and AsyncReadExt
methods to write and read bytes. That is not a problem, since it fits my needs.
The problem comes when I send a complete message and bytes go missing, which causes a global block in the application. The client sends X bytes and, before sending them over TCP, prepends a header with the length of those bytes. That way, the server knows exactly how many bytes to read.
Because of this, when I write 262144 bytes in this scenario, only 262136 are actually written with the write_all
or write
method. Since the header announces a total of 262144 bytes, the server blocks because there are still bytes left to read.
I don't understand why this happens.
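The length-prefix framing described above can be sketched with plain byte buffers. The helper names below are hypothetical and independent of tokio-yamux; they only illustrate the wire format (4-byte big-endian length, then the payload):

```rust
// encode_frame prepends a 4-byte big-endian length header to the payload.
fn encode_frame(payload: &[u8]) -> Vec<u8> {
    let mut frame = Vec::with_capacity(4 + payload.len());
    frame.extend_from_slice(&(payload.len() as u32).to_be_bytes());
    frame.extend_from_slice(payload);
    frame
}

// decode_frame reads the header and returns exactly that many payload bytes.
fn decode_frame(frame: &[u8]) -> Option<&[u8]> {
    let len_bytes: [u8; 4] = frame.get(..4)?.try_into().ok()?;
    let len = u32::from_be_bytes(len_bytes) as usize;
    frame.get(4..4 + len)
}

fn main() {
    // 262140 payload bytes + 4-byte header = 262144 bytes on the wire,
    // matching the totals in the question.
    let frame = encode_frame(&[1u8; 262140]);
    println!("frame len = {}", frame.len()); // frame len = 262144
    assert_eq!(decode_frame(&frame).unwrap().len(), 262140);
}
```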
Small example. Client:
use futures::prelude::*;
use std::{error::Error, vec};
use tokio::{
    io::{AsyncReadExt, AsyncWriteExt},
    net::TcpStream,
};
use tokio_yamux::{config::Config, session::Session};

#[tokio::main]
pub async fn main() -> Result<(), Box<dyn Error>> {
    let socket = TcpStream::connect("127.0.0.1:8080").await?;
    println!("[client] connected to server: {:?}", socket.peer_addr()?);
    println!("created stream");
    let mut session = Session::new_client(socket, Config::default());
    let ctrl = session.control();
    let mut handles = Vec::new();
    tokio::spawn(async move {
        loop {
            match session.next().await {
                Some(Ok(_)) => (),
                Some(Err(e)) => {
                    println!("{}", e);
                    break;
                }
                None => {
                    println!("closed");
                    break;
                }
            }
        }
    });
    for _i in 0..1 {
        let mut ctrl_clone = ctrl.clone();
        handles.push(tokio::spawn(async move {
            match ctrl_clone.open_stream().await {
                Ok(mut stream) => {
                    // This section represents the Check User operation.
                    // Args -> operation_id = I selected a random i32
                    //      -> queue_id = queue to write
                    // Writes to the server to identify the operation and get the queue.
                    let operation_id = 0;
                    let queue_id = Some(2);
                    let data_to_send;
                    match queue_id {
                        Some(id) => {
                            data_to_send = vec![operation_id, id];
                        }
                        None => {
                            data_to_send = vec![operation_id];
                        }
                    }
                    let data_to_send: Vec<u8> = data_to_send
                        .clone()
                        .into_iter()
                        .flat_map(|x| i32::to_be_bytes(x))
                        .collect();
                    stream.write_all(&data_to_send).await.unwrap();
                    stream.flush().await.unwrap();
                    let sv_code: i32;
                    let mut buf = [1; 4];
                    // Reads from the server to receive an ACCEPTED message.
                    loop {
                        //stream.readable().await.unwrap();
                        match stream.read_exact(&mut buf).await {
                            Ok(0) => {}
                            Ok(n) => {
                                println!("Client: Reading Buffer: n_bytes {:?}", n);
                                sv_code = i32::from_be_bytes(buf);
                                break;
                            }
                            Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
                                //println!("Err: TCP -> SV (Write))");
                                continue;
                            }
                            Err(_e) => {
                                //return Err(e.into());
                            }
                        }
                    }
                    // Here I return a code to the client just to continue with
                    // the execution and start sending messages. Skipped.
                    // This section represents the SEND operation.
                    // Header
                    let data: [u8; 262140] = [1; 262140];
                    let mut vec_data = data.to_vec();
                    let len = data.len() as u32;
                    let len_slices = len.to_be_bytes();
                    for slice in len_slices {
                        vec_data.insert(0, slice);
                    }
                    println!("Total_len: {:?}", vec_data.len());
                    let n = stream.write_all(&vec_data).await.unwrap();
                    println!("N: {:?}", n);
                    stream.flush().await.unwrap();
                    println!("Fin write");
                }
                Err(e) => {
                    println!("{:?}", e);
                }
            }
        }));
    }
    for handle in handles {
        let _ = handle.await;
    }
    Ok(())
}
Server:
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpListener;
use std::error::Error;
use futures::prelude::*;
use tokio_yamux::{config::Config, session::Session};

// https://github.com/nervosnetwork/tentacle
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let addr = "127.0.0.1:8080".to_string();
    let listener = TcpListener::bind(&addr).await.unwrap();
    while let Ok((socket, _)) = listener.accept().await {
        println!("accepted a socket: {:?}", socket.peer_addr());
        let mut session = Session::new_server(socket, Config::default());
        tokio::spawn(async move {
            while let Some(Ok(mut stream)) = session.next().await {
                println!("Server accept a stream from client: id={}", stream.id());
                tokio::spawn(async move {
                    let mut buffer = Vec::<u8>::with_capacity(8);
                    // Identify operation from Client
                    let op_code = stream.read_i32().await.unwrap();
                    println!("op_id: {:?}", op_code);
                    let queue_id = Some(stream.read_i32().await.unwrap());
                    println!("queue_id: {:?}", queue_id);
                    // Here I write to client -> Accepted
                    let mut sv_code: i32 = 0;
                    stream.write_all(&sv_code.to_be_bytes()).await.unwrap();
                    stream.flush().await.unwrap();
                    // Starting receiving messages
                    let mut total_bytes = 0;
                    let mut n_bytes_read = 0;
                    let mut chunk_id = 0;
                    let mut last_chunk = false;
                    let mut len_slices: [u8; 4] = [0; 4];
                    loop {
                        println!("n_bytes read: {:?}", n_bytes_read);
                        let mut capacity = 65535;
                        if n_bytes_read == 0 {
                            capacity = 65539;
                        }
                        let mut buffer = Vec::<u8>::with_capacity(capacity);
                        println!("Blocked?");
                        match stream.read_buf(&mut buffer).await {
                            Ok(0) => continue,
                            Ok(n) => {
                                println!("N: {:?}", n);
                                if n_bytes_read == 0 {
                                    for i in 0..4 {
                                        len_slices[i] = buffer.remove(0);
                                    }
                                    total_bytes = u32::from_le_bytes(len_slices);
                                    total_bytes += 4;
                                }
                                buffer.truncate(n);
                                n_bytes_read += n;
                                if n_bytes_read == total_bytes.try_into().unwrap() {
                                    last_chunk = true;
                                }
                                chunk_id += 1;
                                if last_chunk {
                                    break;
                                }
                            }
                            Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
                                println!("Err: TCP -> SV (Write))");
                                continue;
                            }
                            Err(e) => {
                                break;
                            }
                        }
                    }
                    println!("Finished");
                });
            }
        });
    }
    Ok(())
}
Output:
Client:
[client] connected to server: 127.0.0.1:8080
created stream
Client: Reading Buffer: n_bytes 4
Total_len: 262144
Blocked here
Server:
accepted a socket: Ok(127.0.0.1:52430)
Server accept a stream from client: id=1
n_bytes read: 0
Blocked?
N: 65539
n_bytes read: 65539
Blocked?
N: 65535
n_bytes read: 131074
Blocked?
N: 65535
n_bytes read: 196609
Blocked?
N: 65527
n_bytes read: 262136
Blocked?
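For reference, the chunk sizes in the server log add up to exactly 8 bytes short of the announced total, which is where the read blocks:

```rust
fn main() {
    // Chunk sizes printed by the server above.
    let chunks: [u32; 4] = [65539, 65535, 65535, 65527];
    let received: u32 = chunks.iter().sum();
    let announced: u32 = 262_144; // 4-byte header + 262140 payload bytes
    println!("received = {received}, missing = {}", announced - received);
    // received = 262136, missing = 8
}
```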
1 Answer
The error is in the multiplexer configuration. As you can see in tokio_yamux::config::Config,
Config::default()
allows a maximum window of 262144 bytes. In this example that limit is exceeded, so the stream created on top of the TcpStream hits the configured limit. If you create a new configuration with a larger max_stream_window_size
value, this error no longer occurs. Solution:
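A minimal sketch of the fix the answer describes, assuming tokio-yamux's Config exposes the max_stream_window_size field the answer names; the 1 MiB value is an arbitrary choice, pick anything larger than your biggest framed message:

```rust
use tokio_yamux::config::Config;

fn make_config() -> Config {
    // Start from the defaults and raise the per-stream receive window above
    // the default 262144 bytes that the 262144-byte message was hitting.
    let mut config = Config::default();
    config.max_stream_window_size = 1024 * 1024; // 1 MiB
    config
}

// Then build the sessions with it on both ends:
// let session = Session::new_client(socket, make_config());
// let session = Session::new_server(socket, make_config());
```

Note that both the client and the server must use the enlarged window, since the receive window is what stops the sender once it has 262144 unacknowledged bytes in flight.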