如何在Rust中使用组合符(而不是for_each)并且不使用while循环来运行流完成?

px9o7tmv  于 2023-01-09  发布在  其他
关注(0)|答案(2)|浏览(106)

我有一个使用组合器的流,我需要运行它直到完成。我可以使用while循环或for_each组合器。它们都可以工作,但我认为一定有更好的方法。
Sink看起来像我正在寻找的,尤其是sink::drain(),但我还不知道如何使用它。
使用while循环

use futures::{StreamExt, TryStreamExt}; // 0.3.6
use tokio; // 0.3.0

#[tokio::main]
async fn main() {
    let mut stream = Box::pin(
        futures::stream::iter(0..20)
            .map(foo)
            .map_ok(|x| x * 10)
            .and_then(bar)
            .filter(|x| futures::future::ready(x.is_ok())),
    );

    while let Some(_) = stream.next().await {
        // Nothing to do here. I just need to run stream.
    }
}

fn foo(x: i32) -> Result<i32, String> {
    if x != 10 {
        Ok(x)
    } else {
        Err("eeer".to_string())
    }
}

async fn bar(x: i32) -> Result<(), String> {
    async {
        if x == 13 {
            Err("errr".to_string())
        } else {
            Ok(())
        }
    }
    .await
}

使用for_each

use futures::{StreamExt, TryStreamExt}; // 0.3.6
use tokio; // 0.3.0

#[tokio::main]
async fn main() {
    futures::stream::iter(0..20)
        .map(foo)
        .map_ok(|x| x * 10)
        .and_then(bar)
        .filter(|x| futures::future::ready(x.is_ok()))
        .for_each(|_| futures::future::ready(())) // Nothing to do here, just to run stream
        .await;
}

fn foo(x: i32) -> Result<i32, String> {
    if x != 10 {
        Ok(x)
    } else {
        Err("eeer".to_string())
    }
}

async fn bar(x: i32) -> Result<(), String> {
    async {
        if x == 13 {
            Err("errr".to_string())
        } else {
            Ok(())
        }
    }
    .await
}

我希望有如下的代码,不需要使用drain组合子,只需要使用一些组合子来运行流:

use futures::{StreamExt, TryStreamExt}; // 0.3.6
use tokio; // 0.3.0

#[tokio::main]
async fn main() {
    futures::stream::iter(0..20)
        .map(foo)
        .map_ok(|x| x * 10)
        .and_then(bar)
        .filter(|x| futures::future::ready(x.is_ok()))
        .forward(futures::sink::drain())
        .await;
}

fn foo(x: i32) -> Result<i32, String> {
    if x != 10 {
        Ok(x)
    } else {
        Err("eeer".to_string())
    }
}

async fn bar(x: i32) -> Result<(), String> {
    async {
        if x == 13 {
            Err("errr".to_string())
        } else {
            Ok(())
        }
    }
    .await
}

这不起作用,可能是因为drain在Error类型上设置了一些界限:

error[E0271]: type mismatch resolving `<futures::sink::Drain<()> as futures::Sink<()>>::Error == std::string::String`
  --> src/main.rs:11:10
   |
11 |         .forward(futures::sink::drain())
   |          ^^^^^^^ expected enum `std::convert::Infallible`, found struct `std::string::String`

error[E0271]: type mismatch resolving `<futures::stream::Filter<futures::stream::AndThen<futures::stream::MapOk<futures::stream::Map<futures::stream::Iter<std::ops::Range<i32>>, fn(i32) -> std::result::Result<i32, std::string::String> {foo}>, [closure@src/main.rs:8:17: 8:27]>, impl futures::Future, fn(i32) -> impl futures::Future {bar}>, futures::future::Ready<bool>, [closure@src/main.rs:10:17: 10:54]> as futures::Stream>::Item == std::result::Result<(), std::convert::Infallible>`
  --> src/main.rs:6:5
   |
6  | /     futures::stream::iter(0..20)
7  | |         .map(foo)
8  | |         .map_ok(|x| x * 10)
9  | |         .and_then(bar)
10 | |         .filter(|x| futures::future::ready(x.is_ok()))
11 | |         .forward(futures::sink::drain())
12 | |         .await;
   | |______________^ expected struct `std::string::String`, found enum `std::convert::Infallible`
   |
   = note: expected enum `std::result::Result<_, std::string::String>`
              found enum `std::result::Result<_, std::convert::Infallible>`
   = note: required because of the requirements on the impl of `futures::Future` for `futures_util::stream::stream::forward::Forward<futures::stream::Filter<futures::stream::AndThen<futures::stream::MapOk<futures::stream::Map<futures::stream::Iter<std::ops::Range<i32>>, fn(i32) -> std::result::Result<i32, std::string::String> {foo}>, [closure@src/main.rs:8:17: 8:27]>, impl futures::Future, fn(i32) -> impl futures::Future {bar}>, futures::future::Ready<bool>, [closure@src/main.rs:10:17: 10:54]>, futures::sink::Drain<()>, ()>`
lf5gs5x2

lf5gs5x21#

Sink特征是可出错的(不存在TrySink),但是drain()返回ErrorInfallibleDrain
Stream::forward()要求流是可出错的(实际上是TryStream),并且与给定接收器具有相同的错误类型。您的代码失败是因为您的错误类型是String,而这不能被 * 排出 *。
由于您要过滤is_ok结果,因此解决方案是展开并重新 Package 值:

#[tokio::main]
async fn main() {
    futures::stream::iter(0..20)
        .map(foo)
        .map_ok(|x| x * 10)
        .and_then(bar)
        .filter(|x| futures::future::ready(x.is_ok()))
        .map(|x| Ok(x.unwrap())) // <---- rewrap!
        .forward(futures::sink::drain())
        .await.unwrap();
}

我觉得应该有一种更简单的方法来构建Result<_, Infallible>,但我不知道该怎么做,你可以写map_err(|_| panic!()),但那也好不到哪里去。

unftdfkk

unftdfkk2#

您可以使用collect::<()>()来运行流直到完成。

use futures::StreamExt;

#[tokio::main]
async fn main() {
    futures::stream::iter(0..20)
        .map(|i| async move {
            // Do something here
            println!("{}", i);
        })
        .buffer_unordered(4)
        .collect::<()>()
        .await;
}

尽管collect::<()>()有collect这个词,但它并不收集任何东西或构建任何数据结构,它只是在流上循环并执行到完成。
需要注意的是,要使用collect::<()>(),流的Item必须是()。换句话说,在使用这个方法之前,必须处理结果和错误。我认为这是非常有意义的。

相关问题