我正在使用FuturesUnordered
将异步工作负载排队到多线程时雄runner上。这些future返回各种不同类型的结果。为了区分它们,我将每个future的结果Map到一个自定义的Event
类型。
enum Event {
ResultTypeA {...},
ResultTypeB {...},
ResultTypeC {...},
ResultTypeD {...}
}
let pending_futures: FuturesUnordered<Pin<Box<dyn Future<Output = Event> + Send>>> = FuturesUnordered::default()
loop {
tokio::select! {
Some(future) = workload_receiver.recv() => {
pending_futures.push(future.boxed());
},
Some(event) = pending_futures.next() => process_event(event),
else => break,
}
}
上面的代码运行良好,但是,我想限制并行处理的pending_futures的数量。这就是buffered_unordered
的用武之地。我的天真方法是:
loop {
tokio::select! {
Some(future) = workload_receiver.recv() => {
pending_futures.push(future.boxed());
},
Some(event) = pending_futures.buffered(10).next() => process_event(event),
else => break,
}
}
这会掷回下列编译错误:
--> src/main.rs
|
257 | Some(event) = pending_futures.buffered(10).next() => process_event(event),
| ^^^^^^^^^^^^ `Event` is not a future
|
= help: the trait `futures::Future` is not implemented for `Event`
= note: Event must be a future or must implement `IntoFuture` to be awaited
note: required by a bound in `buffered`
--> futures-util-0.3.24/src/stream/stream/mod.rs:1359:21
|
1359 | Self::Item: Future,
| ^^^^^^ required by this bound in `buffered`
如何限制FuturesUnordered
在同一时间只处理其底层队列的N个future,但仍然允许动态地将新的future入队?
1条答案
按热度按时间ccrfmcuu1#
如果你想限制并发性,你不想使用
FuturesUnordered
,它会一直运行所有包含的任务。使用.buffered()
不会有帮助,因为它实现的Stream
是任务完成后的结果。如果你的
workload_receiver
是tokio::sync::mpsc::Receiver
,那你就走运了!你可以通过ReceiverStream
直接把它从tokio-stream箱子里转换成Stream
(也有其他东西的 Package )。这对.buffered()
或.buffered_unordered()
来说是完美的,因为你收到的东西看起来是Future
。