我正在尝试“连接”多个对等体并同时“处理”它们。对于单个进程(命名为“tasks”)这可以正常工作。但是,在下面的设置中,我有3个任务,其中我希望第二个任务在一个对等体完成后为所有其他对等体中止。这应包括来自create_new_conn_fut
未来的任何未来对等方。
use futures::stream::StreamExt;
use rand::Rng;
pub async fn process(peer: &str, duration: core::time::Duration, task_id: &str) {
// simulate processing by sleeping
tokio::time::sleep(duration).await;
println!("task #{} done for {}", task_id, peer);
}
# [tokio::main]
async fn main() {
let peers = vec!["peer A", "peer B", "peer C"];
let peers = futures::stream::iter(peers);
let (tx, rx) = tokio::sync::mpsc::channel(100);
let rx = tokio_stream::wrappers::ReceiverStream::new(rx);
let rx = peers.chain(rx);
let handle_conn_fut = rx.for_each_concurrent(0,
|peer| async move {
let mut rng = rand::thread_rng();
println!("connecting to {}", peer);
process(peer, core::time::Duration::from_secs(1), "1").await;
process(peer, core::time::Duration::from_secs(rng.gen_range(5..15)), "2").await;
process(peer, core::time::Duration::from_secs(1), "3").await;
}
);
let create_new_conn_fut = async move {
for peer in ["peer D", "peer E"] {
tx.send(peer).await.unwrap();
}
};
// awaits all futures in parallell
futures::future::join(handle_conn_fut, create_new_conn_fut).await;
}
输出:
connecting to peer A
connecting to peer B
connecting to peer C
connecting to peer D
connecting to peer E
task #1 done for peer A
task #1 done for peer B
task #1 done for peer C
task #1 done for peer D
task #1 done for peer E
task #2 done for peer C
task #3 done for peer C
task #2 done for peer D
task #2 done for peer A
task #2 done for peer B
task #3 done for peer D
task #3 done for peer A
task #3 done for peer B
task #2 done for peer E
task #3 done for peer E
我宁愿任务#2被放弃,为所有的对等体一旦一个对等体已经完成了它,并重新定向所有未来的对等体只做任务#1和任务#3。
我有以下几个例子来说明这一点
A B .... E
↓ ↓ ↓
async task #1 async task #1 async task #1
↓ ↓ ↓
async task #2 async task #2 async task #2
↓ ↓ ↓
async task #3 async task #3 async task #3
↓ ↓ ↓
done done done
我想在任何一个对等体(A-E)完成它之后短路async task #1
。
A B .... E .... F (future peer)
↓ ↓ ↓ ↓
async task #1 async task #1 async task #1 async task #1
↓ ↓ ↓ ↓
↓ async task #2 ↓ ↓
↓ ↓ ↓ ↓
async task #3 async task #3 async task #3 async task #3
↓ ↓ ↓ ↓
done done done done
所以我想要的输出是:
connecting to peer A
connecting to peer B
connecting to peer C
connecting to peer D
connecting to peer E
task #1 done for peer A
task #1 done for peer B
task #1 done for peer C
task #1 done for peer D
task #1 done for peer E
task #2 done for peer C <- will abort all other task #2
task #3 done for peer A
task #3 done for peer B
task #3 done for peer C
task #3 done for peer D
task #3 done for peer E
我已经研究了futures::future::AbortHandle
,但我认为这只适用于单一的未来-因为futures::stream::AbortRegistration
没有克隆特性?
如何实现这样的事情?
1条答案
按热度按时间nxagd54h1#
一个想法是使用带分支的
tokio::select
来检查任务是否完成或是否被取消:输出:
然而,代码变得有点冗长。有没有更好的方法来编写它?
人们可能会使用
futures::future::Either
或使用shared
未来。