rust 当发送方分配给静态时,时雄mpsc关闭通道

xcitsw88  于 11个月前  发布在  其他
关注(0)|答案(1)|浏览(90)

我正在努力理解以下片段的行为:

use tokio::sync::mpsc::{error::SendError, unbounded_channel, UnboundedSender};
use std::sync::OnceLock;

static GSENDER: OnceLock<UnboundedSender<&'static str>> = OnceLock::new();

fn main() {
    let (sender, mut channel) = unbounded_channel();
    
    GSENDER.set(sender).unwrap();

    tokio::runtime::Builder::new_multi_thread()
        .worker_threads(1) // on a new thread
        .enable_all()
        .build()
        .unwrap()
        .spawn(async move {
            println!("[{:?}] Starting channel", chrono::Utc::now());

            while let Some(msg) = channel.recv().await {
                println!("[{:?}] Recvd: {msg}", chrono::Utc::now());
            }
            
            println!("[{:?}] Closing channel", chrono::Utc::now());
        });
       
    // Does not help, as it shouldn't anyway
    // std::thread::sleep(std::time::Duration::from_secs(1));
        
    if let Some(channel_in) = GSENDER.get() {
        if let Err(SendError(_)) = channel_in.send("test") {
            println!("[{:?}] Channel down", chrono::Utc::now());
        }
    } else {
        unreachable!()
    }
}

字符串
Link to playground to reproduce
创建新的运行时,产生一个future。然后,recv被轮询。同时,我获得了发送方的一半并尝试发送一条消息。此时,接收方要么被移动到future,要么(添加了sleep)它甚至正在轮询recv
为什么发送方报告通道关闭?

cnh2zyt3

cnh2zyt31#

删除时雄运行时,在该运行时内产生的所有任务都将关闭(在下一个.await点)。此处的运行时是临时的,因此在语句末尾将其删除。该任务将只运行到第一个.await点。
让运行时成为一个活的变量,它就会工作:

use std::sync::OnceLock;
use tokio::sync::mpsc::{error::SendError, unbounded_channel, UnboundedSender};

static GSENDER: OnceLock<UnboundedSender<&'static str>> = OnceLock::new();

fn main() {
    let (sender, mut channel) = unbounded_channel();

    GSENDER.set(sender).unwrap();

    let runtime = tokio::runtime::Builder::new_multi_thread()
        .worker_threads(1) // on a new thread
        .enable_all()
        .build()
        .unwrap();
    runtime.spawn(async move {
        println!("[{:?}] Starting channel", chrono::Utc::now());

        while let Some(msg) = channel.recv().await {
            println!("[{:?}] Recvd: {msg}", chrono::Utc::now());
        }

        println!("[{:?}] Closing channel", chrono::Utc::now());
    });

    if let Some(channel_in) = GSENDER.get() {
        if let Err(SendError(_)) = channel_in.send("test") {
            println!("[{:?}] Channel down", chrono::Utc::now());
        }
    } else {
        unreachable!()
    }
}

字符串
Playground的一个。

相关问题