我有一个接收UDP消息的循环,为了打破这个循环,我使用了一个通道,接收关闭事件与select!
宏。我希望打破循环,每当我发送关闭事件(Ctrl+C)。但它没有按预期工作后,收到第一个UDP消息关闭没有被选中,没有打破循环。
下面是代码,我将添加步骤来查看相同的问题。
// [dependencies]
// tokio = { version = "1.23", features = ["full"] }
// tokio-stream = { version = "0.1" , features = ["sync"]}
// socket2 = "0.4"
// stream-cancel = "0.8"
// async-stream = "0.3"
// futures = "0.3"
use std::env::args;
use std::error::Error;
use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
use std::sync::Arc;
use socket2::{Domain, Protocol, SockAddr, Socket, Type};
use tokio::net::UdpSocket;
use tokio::sync::broadcast;
use tokio::sync::broadcast::{Receiver, Sender};
const DEFAULT_PORT: u16 = 10020;
const DEFAULT_MULTICAST: &str = "224.0.1.1";
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
let mode = args().nth(1).unwrap_or_else(|| "server".to_string());
let (shutdown_tx, shutdown_rx) = broadcast::channel(1);
shutdown_watcher(shutdown_tx);
return if mode == "server" {
run_server(shutdown_rx).await
} else {
run_client().await
};
}
async fn run_server(rx: Receiver<()>) -> Result<(), Box<dyn Error>> {
let multicast = create_multicast()?;
let multicast = UdpSocket::from_std(multicast)?;
let multicast = Arc::new(multicast);
println!("Start Discovery Server");
start_discovery_server(multicast, rx).await;
Ok(())
}
async fn run_client() -> Result<(), Box<dyn Error>> {
let multicast = create_multicast()?;
let multicast = UdpSocket::from_std(multicast)?;
let multicast = Arc::new(multicast);
let msg = "My message".to_string();
let addr = SocketAddrV4::new(DEFAULT_MULTICAST.parse::<Ipv4Addr>()?, DEFAULT_PORT);
let len = multicast.send_to(msg.as_bytes(), &addr).await?;
println!("Client Sent {len} bytes.");
Ok(())
}
fn create_multicast() -> Result<std::net::UdpSocket, Box<dyn Error>> {
let socket = Socket::new(Domain::IPV4, Type::DGRAM, Some(Protocol::UDP)).unwrap();
socket.set_reuse_address(true)?;
let multiaddr = Ipv4Addr::new(224, 0, 1, 1);
socket.set_multicast_loop_v4(true)?;
let addr = SocketAddr::new("0.0.0.0".parse()?, DEFAULT_PORT);
socket.bind(&SockAddr::from(addr))?;
let interface = socket2::InterfaceIndexOrAddress::Index(0);
socket.join_multicast_v4_n(&multiaddr, &interface)?;
Ok(socket.into())
}
async fn read_new_message(udp: Arc<UdpSocket>) -> String {
let mut buf = vec![0u8; 1024];
let result = udp.recv_from(&mut buf).await;
match result {
Ok((len, _addr)) => {
let msg = String::from_utf8_lossy(&buf[..len]);
msg.to_string()
}
Err(_) => "Error!".to_string(),
}
}
async fn start_discovery_server(udp: Arc<UdpSocket>, mut shutdown: Receiver<()>) {
loop {
tokio::select! {
msg = read_new_message(udp.clone()) => {
println!("Got: {msg:?}");
}
res = shutdown.recv() => {
println!("Got {res:?} for shutdown");
break
}
else => {
println!("Both channels closed");
break
}
}
println!("loop");
}
}
fn shutdown_watcher(tx: Sender<()>) {
tokio::spawn(async move {
println!("watcher started");
let _ = tokio::signal::ctrl_c().await;
let r = tx.send(());
println!("got ctrl+C {r:?}");
});
}
要看到这个问题,首先运行cargo run
,然后在另一个终端运行cargo run --release -- c
,现在按Ctrl+C在第一个终端的服务器应该停止,但它不工作。再次调用cargo run --release -- c
在第二个终端,现在该程序停止在第一个终端。
你能帮我找到阻止(Ctrl+C)事件立即停止代码的问题吗?
1条答案
按热度按时间vltsax251#
默认情况下,套接字是阻塞的,因此
recv_from()
阻塞,select!
不工作。应该调用
socket.set_nonblocking(true)
将套接字标记为非阻塞,这样I/O将是异步的。