异步读取的Rust通用函数尝试

3hvapo4f  于 2022-12-19  发布在  其他
关注(0)|答案(1)|浏览(155)

我的目标是将下面的read_stream_***()函数简化为一个泛型函数,可以传递给不同的流。

use async_std::net::TcpStream;
use async_std::{ task };
use async_std::io::{ stdin, BufReader, Stdin };
use async_std:: { prelude::* };
use futures::{select, FutureExt, AsyncRead };

pub async fn read_stream_stdin(streem:Stdin) -> Result<(), std::io::Error> 
{
  let mut lines_from_stream = BufReader::new(streem).lines().fuse();
  loop {
    select! {
      line = lines_from_stream.next().fuse() => match line {
        Some(line) => {
           println!("{:?}",line?);
        }
        None => break,
      }
    }
  }
  Ok(())
}

pub async fn read_stream_tcp(streem:TcpStream) -> Result<(), std::io::Error> 
{
  let mut lines_from_stream = BufReader::new(streem).lines().fuse();
  loop {
    select! {
      line = lines_from_stream.next().fuse() => match line {
        Some(line) => {
           println!("{:?}",line?);
        }
        None => break,
      }
    }
  }
  Ok(())
}

pub async fn connect_tcp_server(host_port:&str) -> Result<(), std::io::Error>
{
  let streem = TcpStream::connect(host_port).await;
  let _result = task::block_on(read_stream_tcp(streem?));

  Ok(())
}

fn main() -> Result<(), std::io::Error> {

  task::spawn( connect_tcp_server("127.0.0.1:8081") );
  task::block_on(read_stream_stdin(stdin()))

}

一般尝试:

pub async fn read_stream<T>(streem:T) -> Result<(), std::io::Error>
{
  let mut lines_from_stream = BufReader::new(streem).lines().fuse();
  loop {
    select! {
      line = lines_from_stream.next().fuse() => match line {
        Some(line) => {
           println!("{:?}",line?);
        }
        None => break,
      }
    }
  }
  Ok(())
}

货物

[package]
name = "gen_func"
version = "0.1.0"
edition = "2021"

[dependencies]
async-std = "1.9.0"
futures = "0.3.21"

我尝试〈T:async_std::io::Read〉但未实现fuse()和lines()。在async_std::io中未找到AsyncRead。我在futures crate中找到AsyncRead,但未实现fuse()和lines()。我未设置读取模式。我是Rust新手,正在尝试构建自己的源库以解决未来的编程任务。

4c8rllxm

4c8rllxm1#

首先,正如kmdreko所指出的,函数的逻辑可以大大简化(至少基于给定的信息):

pub async fn read_stream_tcp(stream: TcpStream) -> Result<(), std::io::Error> {
    let mut lines = BufReader::new(stream).lines();
    while let Some(line) = lines.next().await {
        println!("{line:?}");
    }
}
Ok(())

然后,为了弄清楚如何使这个泛型化,你可以让编译器告诉你它需要什么:

pub async fn read_stream<T>(stream: T) -> Result<(), std::io::Error>
{
    let mut lines = BufReader::new(stream).lines();
    while let Some(line) = lines.next().await {
        println!("{line:?}");
    }
    Ok(())
}

注意where子句的缺失或T上的其他约束,编译器现在会抱怨:

error[E0277]: the trait bound `T: async_std::io::Read` is not satisfied  --> src/main.rs:15:36   |
15 |     let mut lines = BufReader::new(stream).lines();
   |                     -------------- ^^^^^^ the trait `async_std::io::Read` is not implemented for `T`
   |                     |
   |                     required by a bound introduced by this call
   |
note: required by a bound in `async_std::io::BufReader::<R>::new`
  --> /home/lucas/.cargo/registry/src/github.com-1ecc6299db9ec823/async-std-1.12.0/src/io/buf_reader.rs:55:9
   |
55 | impl<R: io::Read> BufReader<R> {
   |         ^^^^^^^^ required by this bound in `async_std::io::BufReader::<R>::new`
help: consider restricting type parameter `T`
   |
13 | pub async fn read_stream<T: async_std::io::Read>(stream: T) -> Result<(), std::io::Error>
   |                           +++++++++++++++++++++

应用编译器的建议(上面的建议将导致后续错误)将生成T: async_std::io::Read + std::marker::Unpin的完整where子句:

pub async fn read_stream<T>(stream: T) -> Result<(), std::io::Error>
where
    T: Read + std::marker::Unpin,
{
    let mut lines = BufReader::new(stream).lines();
    while let Some(line) = lines.next().await {
        println!("{line:?}");
    }
    Ok(())
}

async fn try_it() {
    // These will now compile just fine
    read_stream(async_std::io::stdin()).await.unwrap();
    read_stream(TcpStream::connect("127.0.0.1:8080").await.unwrap()).await.unwrap();
}

我尝试〈T:async_std::io::Read〉但未实现fuse()和lines()
这表明您试图同时替换BufReader::new(stream)。您可以这样做,但您需要告诉编译器您需要实现lines()方法的内容。要么将参数设置为固定类型BufReader<T>,要么将where子句设置为泛型类型T: async_std::io::BufRead + std::marker::Unpin

相关问题