在我看来,并行调用太多tokio::time:sleep会产生死锁。在下面的示例代码中,我想做的是模拟使用事件总线来使用时雄运行时并行执行代码。
use eventador::{Eventador, SinkExt};
use tokio::time::{sleep, Duration, Instant};
use once_cell::sync::Lazy;
static INSTANT: Lazy<Instant> = Lazy::new(|| Instant::now());
pub struct Subscriber {
eventbus: Eventador,
i: u16
}
impl Subscriber {
pub fn new(i: u16, eventbus: Eventador) -> Self {
Self {
i, eventbus
}
}
pub async fn start(self) {
let subscription = self.eventbus.subscribe::<Event>();
let value = subscription.recv().value;
println!("pre sleep {} - {}ms since start", self.i, INSTANT.elapsed().as_millis());
let now = Instant::now();
sleep(Duration::from_millis(1000)).await;
println!("{}: {:?} - {}ms - {}ms since start", self.i, value, now.elapsed().as_millis(), INSTANT.elapsed().as_millis());
}
}
#[derive(Debug)]
pub struct Event {
pub value: u16
}
#[tokio::main]
async fn main() {
let eventbus = Eventador::new(1024).unwrap();
let mut publisher = eventbus.async_publisher::<Event>(512);
for i in 1..8 {
let subscriber = Subscriber::new(i, eventbus.clone());
println!("spawn {}", i);
tokio::spawn(subscriber.start());
}
println!("sending at {}", INSTANT.elapsed().as_millis());
publisher.send(Event { value: 1234 }).await.expect("Something went wrong");
println!("send finished");
sleep(Duration::from_millis(10000)).await;
println!("sleep finished");
}
上面的代码将产生以下输出:
spawn 1
spawn 2
spawn 3
spawn 4
spawn 5
spawn 6
spawn 7
sending at 0
pre sleep 3 - 1ms since start
pre sleep 4 - 1ms since start
pre sleep 1 - 1ms since start
pre sleep 5 - 1ms since start
send finished
pre sleep 2 - 1ms since start
pre sleep 6 - 1ms since start
pre sleep 7 - 1ms since start
5: 1234 - 1000ms - 1002ms since start
1: 1234 - 1000ms - 1002ms since start
4: 1234 - 1000ms - 1002ms since start
3: 1234 - 1000ms - 1002ms since start
7: 1234 - 1001ms - 1003ms since start
6: 1234 - 1001ms - 1004ms since start
2: 1234 - 1002ms - 1004ms since start
sleep finished
这就是我想看到的。但是,当我将订阅者的数量增加到10时(在for循环中),我得到以下输出
spawn 1
spawn 2
spawn 3
spawn 4
spawn 5
spawn 6
spawn 7
spawn 8
spawn 9
spawn 10
sending at 0
pre sleep 4 - 3ms since start
pre sleep 3 - 3ms since start
pre sleep 1 - 3ms since start
pre sleep 5 - 3ms since start
pre sleep 6 - 3ms since start
pre sleep 2 - 3ms since start
pre sleep 7 - 3ms since start
send finished
pre sleep 8 - 3ms since start
7: 1234 - 1001ms - 1004ms since start
3: 1234 - 1001ms - 1005ms since start
6: 1234 - 1001ms - 1005ms since start
4: 1234 - 1002ms - 1005ms since start
1: 1234 - 1002ms - 1005ms since start
2: 1234 - 1002ms - 1005ms since start
8: 1234 - 1001ms - 1005ms since start
5: 1234 - 1001ms - 1005ms since start
sleep finished
此外,该计划从未停止。为什么这很重要,当我只是运行一个模拟?我想确保在生产环境中,假设我同时读取100个文件,我不会遇到这种死锁--这将使我的想法完全无效。
1条答案
按热度按时间wfsdck301#
您正在使用
eventador
的同步API,阻塞了运行时,而 this 会导致死锁,而不是睡眠。使用异步版本(async_subscriber()
和async_publisher()
)。