我正在努力理解以下片段的行为:
use tokio::sync::mpsc::{error::SendError, unbounded_channel, UnboundedSender};
use std::sync::OnceLock;
static GSENDER: OnceLock<UnboundedSender<&'static str>> = OnceLock::new();
fn main() {
let (sender, mut channel) = unbounded_channel();
GSENDER.set(sender).unwrap();
tokio::runtime::Builder::new_multi_thread()
.worker_threads(1) // on a new thread
.enable_all()
.build()
.unwrap()
.spawn(async move {
println!("[{:?}] Starting channel", chrono::Utc::now());
while let Some(msg) = channel.recv().await {
println!("[{:?}] Recvd: {msg}", chrono::Utc::now());
}
println!("[{:?}] Closing channel", chrono::Utc::now());
});
// Does not help, as it shouldn't anyway
// std::thread::sleep(std::time::Duration::from_secs(1));
if let Some(channel_in) = GSENDER.get() {
if let Err(SendError(_)) = channel_in.send("test") {
println!("[{:?}] Channel down", chrono::Utc::now());
}
} else {
unreachable!()
}
}
字符串
Link to playground to reproduce
创建新的运行时,产生一个future。然后,recv
被轮询。同时,我获得了发送方的一半并尝试发送一条消息。此时,接收方要么被移动到future,要么(添加了sleep)它甚至正在轮询recv
。
为什么发送方报告通道关闭?
1条答案
按热度按时间cnh2zyt31#
删除时雄运行时,在该运行时内产生的所有任务都将关闭(在下一个
.await
点)。此处的运行时是临时的,因此在语句末尾将其删除。该任务将只运行到第一个.await
点。让运行时成为一个活的变量,它就会工作:
字符串
Playground的一个。