use futures::stream::select_all;
use futures::StreamExt;
use tokio::time::{sleep, Duration};
use tokio_stream::wrappers::BroadcastStream;
#[tokio::main]
async fn main() {
let (sender1, receiver1) = tokio::sync::broadcast::channel(5);
let (sender2, receiver2) = tokio::sync::broadcast::channel(5);
let receivers = vec![receiver1, receiver2];
// Send on all channels
tokio::spawn(async move {
for i in 0..5 {
sleep(Duration::from_millis(50)).await;
sender1.send(format!("A{i}")).unwrap();
sleep(Duration::from_millis(50)).await;
sender2.send(format!("B{i}")).unwrap();
}
});
// Receive on all channels simultaneously
let mut fused_streams = select_all(receivers.into_iter().map(BroadcastStream::new));
while let Some(value) = fused_streams.next().await {
println!("Got value: {}", value.unwrap());
}
}
1条答案
按热度按时间yc0p9oo01#
broadcast::Receiver
还不是一个流,它只是一个有recv()
函数的对象,要合并多个,必须先将它们转换为流。幸运的是,
tokio-streams
机箱正好可以做到这一点。接收器转换为流后,可以使用
futures::stream::select_all
合并它们: