rust同时接收来自多个通道消息

o4hqfura  于 2022-12-29  发布在  其他
关注(0)|答案(1)|浏览(122)

我有一个Vec<tokio::sync::broadcast::Receiver<String>>(基本上是一个向量的通道接收器),我想订阅所有的人,并从他们所有的消息。我该怎么做呢?

yc0p9oo0

yc0p9oo01#

broadcast::Receiver还不是一个流,它只是一个有recv()函数的对象,要合并多个,必须先将它们转换为流。
幸运的是,tokio-streams机箱正好可以做到这一点。
接收器转换为流后,可以使用futures::stream::select_all合并它们:

use futures::stream::select_all;
use futures::StreamExt;
use tokio::time::{sleep, Duration};
use tokio_stream::wrappers::BroadcastStream;

#[tokio::main]
async fn main() {
    let (sender1, receiver1) = tokio::sync::broadcast::channel(5);
    let (sender2, receiver2) = tokio::sync::broadcast::channel(5);

    let receivers = vec![receiver1, receiver2];

    // Send on all channels
    tokio::spawn(async move {
        for i in 0..5 {
            sleep(Duration::from_millis(50)).await;
            sender1.send(format!("A{i}")).unwrap();
            sleep(Duration::from_millis(50)).await;
            sender2.send(format!("B{i}")).unwrap();
        }
    });

    // Receive on all channels simultaneously
    let mut fused_streams = select_all(receivers.into_iter().map(BroadcastStream::new));
    while let Some(value) = fused_streams.next().await {
        println!("Got value: {}", value.unwrap());
    }
}
Got value: A0
Got value: B0
Got value: A1
Got value: B1
Got value: A2
Got value: B2
Got value: A3
Got value: B3
Got value: A4
Got value: B4

相关问题