Rust:将select!与Futures组合无序.缓冲(N)

k0pti3hp  于 2022-11-12  发布在  其他
关注(0)|答案(1)|浏览(156)

我正在使用FuturesUnordered将异步工作负载排队到多线程时雄runner上。这些future返回各种不同类型的结果。为了区分它们,我将每个future的结果Map到一个自定义的Event类型。

enum Event {
   ResultTypeA {...},
   ResultTypeB {...},
   ResultTypeC {...},
   ResultTypeD {...}
}

let pending_futures: FuturesUnordered<Pin<Box<dyn Future<Output = Event> + Send>>> = FuturesUnordered::default()

loop {
    tokio::select! {
        Some(future) = workload_receiver.recv() => {
            pending_futures.push(future.boxed());
        },
        Some(event) = pending_futures.next() => process_event(event),
        else => break,
   }
}

上面的代码运行良好,但是,我想限制并行处理的pending_futures的数量。这就是buffered_unordered的用武之地。我的天真方法是:

loop {
    tokio::select! {
        Some(future) = workload_receiver.recv() => {
            pending_futures.push(future.boxed());
        },
        Some(event) = pending_futures.buffered(10).next() => process_event(event),
        else => break,
   }
}

这会掷回下列编译错误:

--> src/main.rs
     |
257  |     Some(event) = pending_futures.buffered(10).next() => process_event(event),
     |                                   ^^^^^^^^^^^^ `Event` is not a future
     |
     = help: the trait `futures::Future` is not implemented for `Event`
     = note: Event must be a future or must implement `IntoFuture` to be awaited
note: required by a bound in `buffered`
    --> futures-util-0.3.24/src/stream/stream/mod.rs:1359:21
     |
1359 |         Self::Item: Future,
     |                     ^^^^^^ required by this bound in `buffered`

如何限制FuturesUnordered在同一时间只处理其底层队列的N个future,但仍然允许动态地将新的future入队?

ccrfmcuu

ccrfmcuu1#

如果你想限制并发性,你不想使用FuturesUnordered,它会一直运行所有包含的任务。使用.buffered()不会有帮助,因为它实现的Stream是任务完成后的结果。
如果你的workload_receivertokio::sync::mpsc::Receiver,那你就走运了!你可以通过ReceiverStream直接把它从tokio-stream箱子里转换成Stream(也有其他东西的 Package )。这对.buffered().buffered_unordered()来说是完美的,因为你收到的东西看起来是Future

相关问题