rust 从闭包调用异步函数

unftdfkk  于 2022-12-29  发布在  其他
关注(0)|答案(1)|浏览(258)

我想在一个迭代器中使用的闭包中await一个async函数。需要闭包的函数在一个结构实现中被调用。我不知道怎么做。
这段代码模拟了我正在尝试做的事情:

struct MyType {}

impl MyType {
    async fn foo(&self) {
        println!("foo");

        (0..2).for_each(|v| {
            self.bar(v).await;
        });
    }

    async fn bar(&self, v: usize) {
        println!("bar: {}", v);
    }
}

#[tokio::main]
async fn main() {
    let mt = MyType {};
    mt.foo().await;
}

显然,这是行不通的,因为闭包不是async,给我:

error[E0728]: `await` is only allowed inside `async` functions and blocks
 --> src/main.rs:8:13
  |
7 |         (0..2).for_each(|v| {
  |                         --- this is not `async`
8 |             self.bar(v).await;
  |             ^^^^^^^^^^^^^^^^^ only allowed inside `async` functions and blocks

在寻找如何从非async函数调用async函数的答案之后,我不得不这样做:

tokio::spawn(async move {
    self.bar(v).await;
});

但现在我却遇到了人生的问题:

error[E0759]: `self` has an anonymous lifetime `'_` but it needs to satisfy a `'static` lifetime requirement
 --> src/main.rs:4:18
  |
4 |     async fn foo(&self) {
  |                  ^^^^^
  |                  |
  |                  this data with an anonymous lifetime `'_`...
  |                  ...is captured here...
...
8 |             tokio::spawn(async move {
  |             ------------ ...and is required to live as long as `'static` here

这并不让我感到惊讶,因为据我所知,Rust编译器无法知道线程的寿命,因此,由tokio::spawn派生的线程可能比MyType类型的线程寿命长。
我想到的第一个修正是把bar变成一个关联函数,复制闭包中需要的所有东西,并把它作为一个值传递给bar,然后用MyType::bar(copies_from_self)调用它,但这变得越来越糟糕,因为有很多复制。
我尝试使用futures::executor::block_on,它可以处理简单的任务,比如本文中的任务:

(0..2).for_each(|v| {
    futures::executor::block_on(self.bar(v));
});

但是当我把它放到我的真实的例子中时,我使用了一个同样使用tokio的第三方库1,事情就不再起作用了。我意识到#[tokio::main]是一个宏,它最终将所有内容都 Package 在block_on中,因此通过这样做,将有嵌套的block_on。这可能是bar中调用的async方法之一停止工作而没有任何错误或日志记录的原因(没有block_on也能工作,所以代码应该没有任何问题)我联系了作者,他们说我可以使用for_each(|i| async move { ... }),这让我更加困惑。

(0..2).for_each(|v| async move {
    self.bar(v).await;
});

将导致编译错误

expected `()`, found opaque type`

我认为这是有意义的,因为我现在返回的是future,而不是()。我天真的方法是尝试使用以下代码等待future:

(0..2).for_each(|v| {
    async move {
        self.bar(v).await;
    }
    .await
});

但这又把我带回了原点,导致了下面的编译错误,我也认为这是有道理的,因为我现在又回到了在闭包sync中使用await

only allowed inside `async` functions and blocks` since the

这一发现也使我很难利用这里和here找到的答案。
在所有这些cargo cult编程之后,问题基本上是,这是可能的吗?如果是,我如何在迭代器中从闭包调用我的async函数(最好不产生线程,以避免生存期问题)?如果这是不可能的,那么它的惯用实现应该是什么样的?
1这是使用的库/方法

oymdgrw7

oymdgrw71#

Iterator::for_each需要一个同步闭包,因此不能在其中使用.await(至少不能直接使用),也不能从其中返回future。
一种解决方案是只使用for循环而不是.for_each

for v in 0..2 {
    self.bar(v).await;
}

更一般的方法是使用streams而不是迭代器,因为它们是异步的等价方法(流上的等价方法通常也是异步的),这不仅适用于for_each,也适用于大多数其他迭代器方法:

use futures::prelude::*;

futures::stream::iter(0..2)
    .for_each(|c| async move {
        self.bar(v).await;
    })
    .await;

相关问题