rust 如何将未来转化为溪流?

x6h2sr28  于 2023-01-21  发布在  其他
关注(0)|答案(2)|浏览(149)

我正在尝试使用async_std从网络接收UDP数据报。
有一个UdpSocket实现了async recv_from,这个方法返回一个future,但是我需要一个async_std::stream::Stream,它给出了一个UDP数据报流,因为它是一个更好的抽象。
我发现tokio::net::UdpFramed正好满足了我的需求,但它在当前版本的时雄中不可用。
一般来说,问题是如何将Future s从给定的异步函数转换为Stream

2wnc66cl

2wnc66cl1#

对于单个项目,请使用FutureExt::into_stream

use futures::prelude::*; // 0.3.1

fn outer() -> impl Stream<Item = i32> {
    inner().into_stream()
}

async fn inner() -> i32 {
    42
}

对于闭包生成的来自多个future的流,使用stream::unfold

use futures::prelude::*; // 0.3.1

fn outer() -> impl Stream<Item = i32> {
    stream::unfold((), |()| async { Some((inner().await, ())) })
}

async fn inner() -> i32 {
    42
}

在您的情况下,可以使用stream::unfold

use async_std::{io, net::UdpSocket}; // 1.4.0, features = ["attributes"]
use futures::prelude::*; // 0.3.1

fn read_many(s: UdpSocket) -> impl Stream<Item = io::Result<Vec<u8>>> {
    stream::unfold(s, |s| {
        async {
            let data = read_one(&s).await;
            Some((data, s))
        }
    })
}

async fn read_one(s: &UdpSocket) -> io::Result<Vec<u8>> {
    let mut data = vec![0; 1024];
    let (len, _) = s.recv_from(&mut data).await?;
    data.truncate(len);
    Ok(data)
}

#[async_std::main]
async fn main() -> io::Result<()> {
    let s = UdpSocket::bind("0.0.0.0:9876").await?;

    read_many(s)
        .for_each(|d| {
            async {
                match d {
                    Ok(d) => match std::str::from_utf8(&d) {
                        Ok(s) => println!("{}", s),
                        Err(_) => println!("{:x?}", d),
                    },
                    Err(e) => eprintln!("Error: {}", e),
                }
            }
        })
        .await;

    Ok(())
}
6tr1vspr

6tr1vspr2#

一般来说,问题是如何将Futures从给定的异步函数转换为Stream
FutureExt::into_stream,但是不要被这个名字欺骗了;它不太适合你情况。
有一个UdpSocket实现了异步recv_from,这个方法返回一个future,但是我需要一个async_std::stream::Stream,它给出了一个UDP数据报流,因为它是一个更好的抽象。
在这里,它 * 不 * 一定是一个更好的抽象。
具体来说,async-stdUdpSocket::recv_from返回一个future,其输出类型为(usize, SocketAddr)--接收到的数据的大小和对等地址,如果你使用into_stream将其转换为流,它只会给予你这个,而不是接收到的数据。
我发现tokio::net::UdpFramed正是我所需要的,但它在当前版本的时雄中不可用。
它已经被移到tokio-util的板条箱中了。不幸的是,你也不能(轻松地)使用它。它需要一个tokio::net::UdpSocket,这和async_std::net::UdpSocket不一样。
当然,您可以使用future实用程序函数(如futures::stream::poll_fnfutures::stream::unfold)为UdpSocket::recv_from提供futures::stream::Stream外观,但是接下来您将如何处理它呢?如果您最终调用StreamExt::next以从其中轮询一个值,则可以直接使用recv_from
只有当您正在使用的某些API需要Stream输入时,才需要达到Stream,例如rusoto

相关问题