Rust: aborting an intermediary future while concurrently processing the elements of a futures::stream::Stream

xxe27gdn  posted on 2022-11-12 in Other

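I am trying to "connect" to multiple peers and "process" them concurrently. This works fine with a single processing step (called a "task" below). In the setup below, however, I have 3 tasks, and I want the second task to be aborted for all other peers as soon as one peer has finished it. This should also apply to any future peers produced by the create_new_conn_fut future.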

use futures::stream::StreamExt;
use rand::Rng;

pub async fn process(peer: &str, duration: core::time::Duration, task_id: &str) {
    // simulate processing by sleeping
    tokio::time::sleep(duration).await;
    println!("task #{} done for {}", task_id, peer);
}

#[tokio::main]
async fn main() {
    let peers = vec!["peer A", "peer B", "peer C"];
    let peers = futures::stream::iter(peers);

    let (tx, rx) = tokio::sync::mpsc::channel(100);

    let rx = tokio_stream::wrappers::ReceiverStream::new(rx);
    let rx = peers.chain(rx);

    let handle_conn_fut = rx.for_each_concurrent(0,
        |peer| async move {
            let mut rng = rand::thread_rng();

            println!("connecting to {}", peer);
            process(peer, core::time::Duration::from_secs(1), "1").await;

            process(peer, core::time::Duration::from_secs(rng.gen_range(5..15)), "2").await;

            process(peer, core::time::Duration::from_secs(1), "3").await;
        }
    );

    let create_new_conn_fut = async move {
        for peer in ["peer D", "peer E"] {
            tx.send(peer).await.unwrap();
        }
    };

    // await both futures concurrently
    futures::future::join(handle_conn_fut, create_new_conn_fut).await;
}

Output:

connecting to peer A
connecting to peer B
connecting to peer C
connecting to peer D
connecting to peer E
task #1 done for peer A
task #1 done for peer B
task #1 done for peer C
task #1 done for peer D
task #1 done for peer E
task #2 done for peer C
task #3 done for peer C
task #2 done for peer D
task #2 done for peer A
task #2 done for peer B
task #3 done for peer D
task #3 done for peer A
task #3 done for peer B
task #2 done for peer E
task #3 done for peer E

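I would prefer task #2 to be abandoned for all peers once one peer has completed it, and all future peers to be redirected so that they only do task #1 and task #3.
The following diagrams illustrate this. Currently every peer runs all three tasks: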

     A                B           ....          E
     ↓                ↓                         ↓
async task #1    async task #1             async task #1
     ↓                ↓                         ↓
async task #2    async task #2             async task #2
     ↓                ↓                         ↓
async task #3    async task #3             async task #3
     ↓                ↓                         ↓
    done             done                      done

I want to short-circuit async task #2 once any one of the peers (A-E) has finished it:

     A                B           ....          E          ....           F (future peer)
     ↓                ↓                         ↓                         ↓
async task #1    async task #1             async task #1             async task #1 
     ↓                ↓                         ↓                         ↓
     ↓           async task #2                  ↓                         ↓
     ↓                ↓                         ↓                         ↓
async task #3    async task #3             async task #3             async task #3
     ↓                ↓                         ↓                         ↓
    done             done                      done                      done

So my desired output is:

connecting to peer A
connecting to peer B
connecting to peer C
connecting to peer D
connecting to peer E
task #1 done for peer A
task #1 done for peer B
task #1 done for peer C
task #1 done for peer D
task #1 done for peer E
task #2 done for peer C    <- will abort all other task #2
task #3 done for peer A
task #3 done for peer B
task #3 done for peer C
task #3 done for peer D
task #3 done for peer E

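I have looked into futures::future::AbortHandle, but I think it only works for a single future, because futures::stream::AbortRegistration does not implement Clone?
How can something like this be achieved?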

nxagd54h  #1

One idea is to use tokio::select! with branches that check whether the task has completed or has been cancelled:

use futures::stream::StreamExt;
use rand::Rng;
use futures::future;

pub async fn process(peer: &str, duration: core::time::Duration, task_id: &str) {
    // simulate processing by sleeping
    tokio::time::sleep(duration).await;
    println!("task #{} done for {}", task_id, peer);
}

#[tokio::main]
async fn main() {
    let peers = vec!["peer A", "peer B", "peer C"];
    let peers = futures::stream::iter(peers);

    let (tx, rx) = tokio::sync::mpsc::channel(100);

    let rx = tokio_stream::wrappers::ReceiverStream::new(rx);
    let rx = peers.chain(rx);

    let notify = std::sync::Arc::new(tokio::sync::Notify::new());
    let done_with_task_2 = std::rc::Rc::new(std::cell::RefCell::new(false));

    let handle_conn_fut = rx.for_each_concurrent(0, |peer| {
        let notify = notify.clone();
        let done_with_task_2 = done_with_task_2.clone();

        async move {
            let mut rng = rand::thread_rng();

            println!("connecting to {}", peer);
            process(peer, core::time::Duration::from_secs(1), "1").await;

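            // task #2: run it unless some peer has already finished it
            tokio::select! {
                // resolves immediately for peers that reach this point after
                // task #2 was already finished elsewhere
                _ = async {
                    if *done_with_task_2.borrow() {
                        future::ready(()).await
                    } else {
                        future::pending().await
                    }
                } => {}
                // this peer finished task #2 first: wake the waiting peers and set the flag
                _ = process(peer, core::time::Duration::from_secs(rng.gen_range(5..10)), "2") => {
                    notify.notify_waiters();
                    done_with_task_2.replace(true);
                }
                // another peer finished task #2 while this one was still working on it
                _ = notify.notified() => {}
            }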

            process(peer, core::time::Duration::from_secs(1), "3").await;

            process(peer, core::time::Duration::from_secs(20), "4").await;
        }
    }
    );

    let create_new_conn_fut = async move {
        for peer in ["peer D", "peer E"] {
            tx.send(peer).await.unwrap();
        }
        // a new peer after 15 seconds
        tokio::time::sleep(core::time::Duration::from_secs(15)).await;
        tx.send("peer F").await.unwrap()
    };

    // await both futures concurrently
    futures::future::join(handle_conn_fut, create_new_conn_fut).await;
}

Output:

connecting to peer A
connecting to peer B
connecting to peer C
connecting to peer D
connecting to peer E
task #1 done for peer A
task #1 done for peer B
task #1 done for peer C
task #1 done for peer D
task #1 done for peer E
task #2 done for peer B
task #3 done for peer B
task #3 done for peer E
task #3 done for peer D
task #3 done for peer C
task #3 done for peer A
connecting to peer F
task #1 done for peer F
task #3 done for peer F
task #4 done for peer B
task #4 done for peer E
task #4 done for peer D
task #4 done for peer C
task #4 done for peer A
task #4 done for peer F

However, the code becomes a bit verbose. Is there a better way to write it?
One might use futures::future::Either or a shared future.
