循环中的铁 rust 期货选择未按预期工作

vcudknz3  于 2023-01-26  发布在  其他
关注(0)|答案(1)|浏览(109)

我有一个接收UDP消息的循环,为了打破这个循环,我使用了一个通道,接收关闭事件与select!宏。我希望打破循环,每当我发送关闭事件(Ctrl+C)。但它没有按预期工作后,收到第一个UDP消息关闭没有被选中,没有打破循环。
下面是代码,我将添加步骤来查看相同的问题。

// [dependencies]
// tokio = { version = "1.23", features = ["full"] }
// tokio-stream = { version = "0.1" , features = ["sync"]}
// socket2 = "0.4"
// stream-cancel = "0.8"
// async-stream = "0.3"
// futures = "0.3"

use std::env::args;
use std::error::Error;
use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
use std::sync::Arc;

use socket2::{Domain, Protocol, SockAddr, Socket, Type};
use tokio::net::UdpSocket;
use tokio::sync::broadcast;
use tokio::sync::broadcast::{Receiver, Sender};

const DEFAULT_PORT: u16 = 10020;
const DEFAULT_MULTICAST: &str = "224.0.1.1";

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let mode = args().nth(1).unwrap_or_else(|| "server".to_string());

    let (shutdown_tx, shutdown_rx) = broadcast::channel(1);
    shutdown_watcher(shutdown_tx);

    return if mode == "server" {
        run_server(shutdown_rx).await
    } else {
        run_client().await
    };
}

async fn run_server(rx: Receiver<()>) -> Result<(), Box<dyn Error>> {
    let multicast = create_multicast()?;
    let multicast = UdpSocket::from_std(multicast)?;
    let multicast = Arc::new(multicast);
    println!("Start Discovery Server");
    start_discovery_server(multicast, rx).await;

    Ok(())
}

async fn run_client() -> Result<(), Box<dyn Error>> {
    let multicast = create_multicast()?;
    let multicast = UdpSocket::from_std(multicast)?;
    let multicast = Arc::new(multicast);
    let msg = "My message".to_string();
    let addr = SocketAddrV4::new(DEFAULT_MULTICAST.parse::<Ipv4Addr>()?, DEFAULT_PORT);
    let len = multicast.send_to(msg.as_bytes(), &addr).await?;
    println!("Client Sent {len} bytes.");
    Ok(())
}

fn create_multicast() -> Result<std::net::UdpSocket, Box<dyn Error>> {
    let socket = Socket::new(Domain::IPV4, Type::DGRAM, Some(Protocol::UDP)).unwrap();
    socket.set_reuse_address(true)?;
    let multiaddr = Ipv4Addr::new(224, 0, 1, 1);
    socket.set_multicast_loop_v4(true)?;
    let addr = SocketAddr::new("0.0.0.0".parse()?, DEFAULT_PORT);
    socket.bind(&SockAddr::from(addr))?;
    let interface = socket2::InterfaceIndexOrAddress::Index(0);
    socket.join_multicast_v4_n(&multiaddr, &interface)?;
    Ok(socket.into())
}

async fn read_new_message(udp: Arc<UdpSocket>) -> String {
    let mut buf = vec![0u8; 1024];
    let result = udp.recv_from(&mut buf).await;
    match result {
        Ok((len, _addr)) => {
            let msg = String::from_utf8_lossy(&buf[..len]);
            msg.to_string()
        }
        Err(_) => "Error!".to_string(),
    }
}

async fn start_discovery_server(udp: Arc<UdpSocket>, mut shutdown: Receiver<()>) {
    loop {
        tokio::select! {
            msg = read_new_message(udp.clone()) => {
                println!("Got: {msg:?}");
            }
            res = shutdown.recv() => {
                println!("Got {res:?} for shutdown");
                break
            }
            else => {
                println!("Both channels closed");
                break
            }
        }
        println!("loop");
    }
}

fn shutdown_watcher(tx: Sender<()>) {
    tokio::spawn(async move {
        println!("watcher started");
        let _ = tokio::signal::ctrl_c().await;
        let r = tx.send(());
        println!("got ctrl+C {r:?}");
    });
}

要看到这个问题,首先运行cargo run,然后在另一个终端运行cargo run --release -- c,现在按Ctrl+C在第一个终端的服务器应该停止,但它不工作。再次调用cargo run --release -- c在第二个终端,现在该程序停止在第一个终端。
你能帮我找到阻止(Ctrl+C)事件立即停止代码的问题吗?

vltsax25

vltsax251#

默认情况下,套接字是阻塞的,因此recv_from()阻塞,select!不工作。
应该调用socket.set_nonblocking(true)将套接字标记为非阻塞,这样I/O将是异步的。

相关问题