我有一个使用组合器的流,我需要运行它直到完成。我可以使用while
循环或for_each
组合器。它们都可以工作,但我认为一定有更好的方法。Sink
看起来像我正在寻找的,尤其是sink::drain()
,但我还不知道如何使用它。
使用while循环
use futures::{StreamExt, TryStreamExt}; // 0.3.6
use tokio; // 0.3.0
#[tokio::main]
async fn main() {
let mut stream = Box::pin(
futures::stream::iter(0..20)
.map(foo)
.map_ok(|x| x * 10)
.and_then(bar)
.filter(|x| futures::future::ready(x.is_ok())),
);
while let Some(_) = stream.next().await {
// Nothing to do here. I just need to run stream.
}
}
fn foo(x: i32) -> Result<i32, String> {
if x != 10 {
Ok(x)
} else {
Err("eeer".to_string())
}
}
async fn bar(x: i32) -> Result<(), String> {
async {
if x == 13 {
Err("errr".to_string())
} else {
Ok(())
}
}
.await
}
使用for_each
:
use futures::{StreamExt, TryStreamExt}; // 0.3.6
use tokio; // 0.3.0
#[tokio::main]
async fn main() {
futures::stream::iter(0..20)
.map(foo)
.map_ok(|x| x * 10)
.and_then(bar)
.filter(|x| futures::future::ready(x.is_ok()))
.for_each(|_| futures::future::ready(())) // Nothing to do here, just to run stream
.await;
}
fn foo(x: i32) -> Result<i32, String> {
if x != 10 {
Ok(x)
} else {
Err("eeer".to_string())
}
}
async fn bar(x: i32) -> Result<(), String> {
async {
if x == 13 {
Err("errr".to_string())
} else {
Ok(())
}
}
.await
}
我希望有如下的代码,不需要使用drain
组合子,只需要使用一些组合子来运行流:
use futures::{StreamExt, TryStreamExt}; // 0.3.6
use tokio; // 0.3.0
#[tokio::main]
async fn main() {
futures::stream::iter(0..20)
.map(foo)
.map_ok(|x| x * 10)
.and_then(bar)
.filter(|x| futures::future::ready(x.is_ok()))
.forward(futures::sink::drain())
.await;
}
fn foo(x: i32) -> Result<i32, String> {
if x != 10 {
Ok(x)
} else {
Err("eeer".to_string())
}
}
async fn bar(x: i32) -> Result<(), String> {
async {
if x == 13 {
Err("errr".to_string())
} else {
Ok(())
}
}
.await
}
这不起作用,可能是因为drain在Error
类型上设置了一些界限:
error[E0271]: type mismatch resolving `<futures::sink::Drain<()> as futures::Sink<()>>::Error == std::string::String`
--> src/main.rs:11:10
|
11 | .forward(futures::sink::drain())
| ^^^^^^^ expected enum `std::convert::Infallible`, found struct `std::string::String`
error[E0271]: type mismatch resolving `<futures::stream::Filter<futures::stream::AndThen<futures::stream::MapOk<futures::stream::Map<futures::stream::Iter<std::ops::Range<i32>>, fn(i32) -> std::result::Result<i32, std::string::String> {foo}>, [closure@src/main.rs:8:17: 8:27]>, impl futures::Future, fn(i32) -> impl futures::Future {bar}>, futures::future::Ready<bool>, [closure@src/main.rs:10:17: 10:54]> as futures::Stream>::Item == std::result::Result<(), std::convert::Infallible>`
--> src/main.rs:6:5
|
6 | / futures::stream::iter(0..20)
7 | | .map(foo)
8 | | .map_ok(|x| x * 10)
9 | | .and_then(bar)
10 | | .filter(|x| futures::future::ready(x.is_ok()))
11 | | .forward(futures::sink::drain())
12 | | .await;
| |______________^ expected struct `std::string::String`, found enum `std::convert::Infallible`
|
= note: expected enum `std::result::Result<_, std::string::String>`
found enum `std::result::Result<_, std::convert::Infallible>`
= note: required because of the requirements on the impl of `futures::Future` for `futures_util::stream::stream::forward::Forward<futures::stream::Filter<futures::stream::AndThen<futures::stream::MapOk<futures::stream::Map<futures::stream::Iter<std::ops::Range<i32>>, fn(i32) -> std::result::Result<i32, std::string::String> {foo}>, [closure@src/main.rs:8:17: 8:27]>, impl futures::Future, fn(i32) -> impl futures::Future {bar}>, futures::future::Ready<bool>, [closure@src/main.rs:10:17: 10:54]>, futures::sink::Drain<()>, ()>`
2条答案
按热度按时间lf5gs5x21#
Sink
特征是可出错的(不存在TrySink
),但是drain()
返回Error
是Infallible
的Drain
。而
Stream::forward()
要求流是可出错的(实际上是TryStream
),并且与给定接收器具有相同的错误类型。您的代码失败是因为您的错误类型是String
,而这不能被 * 排出 *。由于您要过滤
is_ok
结果,因此解决方案是展开并重新 Package 值:我觉得应该有一种更简单的方法来构建
Result<_, Infallible>
,但我不知道该怎么做,你可以写map_err(|_| panic!())
,但那也好不到哪里去。unftdfkk2#
您可以使用
collect::<()>()
来运行流直到完成。尽管
collect::<()>()
有collect这个词,但它并不收集任何东西或构建任何数据结构,它只是在流上循环并执行到完成。需要注意的是,要使用
collect::<()>()
,流的Item
必须是()
。换句话说,在使用这个方法之前,必须处理结果和错误。我认为这是非常有意义的。