rust 如何使par_bridge()为BufReader工作?

7bsow1i6  于 2023-05-17  发布在  其他
关注(0)|答案(1)|浏览(104)

我想使用rayon的par_bridge()来并行化一个Iterator,但我不能,因为错误“the method par_bridge exists for struct MyReader,but its trait bounds were not satisfied the following trait bounds were not satisfied:MyReader: Send ..."。
下面是工作的串行版本:

use std::fs::File;
use std::io::{BufRead, BufReader};
use std::path::Path;

struct MyReader {
    buf: Box<dyn BufRead>,
}

impl MyReader {
    fn new(filename: &str) -> Result<MyReader, Box<dyn std::error::Error>> {
        let path = Path::new(filename);
        let file = File::open(path)?;
        let buf = Box::new(BufReader::new(file));
        Ok(MyReader { buf })
    }
}

impl Iterator for MyReader {
    type Item = String;
    fn next(&mut self) -> Option<Self::Item> {
        let mut line = String::new();
        if self.buf.read_line(&mut line).unwrap() > 0 {
            return Some(line.trim_end().to_owned());
        } else {
            return None;
        }
    }
}

fn main() -> Result<(), Box<dyn std::error::Error>>{
    let filename = "test/lines.txt";
    let my_reader = MyReader::new(filename)?;
    // serial version
    my_reader.for_each(|x| println!("{}", x));
    Ok(())
}

结果:

$ cat test/lines.txt
01
02
03

$ cargo run
01
02
03

下面是失败的一个:

... // same as the serial
fn main() -> Result<(), Box<dyn std::error::Error>>{
    let filename = "test/lines.txt";
    let my_reader = MyReader::new(filename)?;
    // parallel version
    my_reader.par_bridge().for_each(|x| println!("{}", x));
    Ok(())
}

PS.我知道上面的例子很笨拙,但实际上我处理的数据有多行作为记录。这就是为什么我必须实现自己的迭代器,而不是使用BufReadlines()来创建一个。
目标是并行读取一个非常大的文件,并独立处理每个记录。
PS3.我正在尝试人造丝的ParallelIterator只是为了简单起见。如果有人能告诉我其他的方法,特别是那些只使用std库的方法,我也会很感激。
谢谢。

5t7ly7z5

5t7ly7z51#

问题是dyn BufRead不是Send。Trait对象必须被显式地标记以实现trait。

struct MyReader {
    buf: Box<dyn BufRead + Send>,
}

这将起作用,但并不理想。它既不符合性能也不符合习惯用法。Trait对象必须在运行时确定它们是什么类型,而泛型在编译时这样做,这意味着它们有更多的优化机会。你应该在这里使用泛型。

struct MyReader<R> {
    buf: R,
}

然后修改你的impl块。

/// A generic constructor that handles any implementor of `Read`.
// You may also want a constructor that takes an implementor of `BufRead`.
impl<R: Read> MyReader<BufReader<R>> {
    fn new(read: R) -> Self {
        Self { buf: BufReader::new(read) }
    }
}

impl MyReader<BufReader<File>> {
    fn from_file(filename: &str) -> Result<Self, Box<dyn std::error::Error>> {
        let path = Path::new(filename);
        let file = File::open(path)?;
        Ok(Self::new(file))
    }
}
// This is how you would make another kind of `MyReader`.
// Note that you can't store a `StdinLock` since that does not implement `Send`.
use std::io::{stdin, Stdin};
impl MyReader<BufReader<Stdin>> {
    fn from_stdin() -> Self {
        Self::new(stdin())
    }
}

impl<R: BufRead> Iterator for MyReader<R> { ... }

因为BufReader<File>Send,所以MyReader<BufReader<File>>也是。
然后,我建议将所有的处理逻辑放入一个泛型函数中。既然你手头已经有了一个结构体,这可以是一个方法,但是要做最适合组织的事情。

impl<R: BufRead + Send> MyReader<R> {
    fn process_parallel(self) {
        self.par_bridge().for_each(|x| println!("{}", x));
    }
}

然后,您可以在想要创建的每种MyReader上调用它。

if use_stdin {
    MyReader::from_stdin().process_parallel();
} else {
    MyReader::from_file(filename)?.process_parallel();
}

请注意,如果需要,您仍然可以创建MyReader<Box<dyn BufRead + Send>>,因此使用泛型纯粹是灵活性的升级。
(playground)

相关问题