我想使用rayon的par_bridge()
来并行化一个Iterator,但我不能,因为错误“the method par_bridge
exists for struct MyReader
,but its trait bounds were not satisfied the following trait bounds were not satisfied:MyReader: Send
..."。
下面是工作的串行版本:
use std::fs::File;
use std::io::{BufRead, BufReader};
use std::path::Path;
struct MyReader {
buf: Box<dyn BufRead>,
}
impl MyReader {
fn new(filename: &str) -> Result<MyReader, Box<dyn std::error::Error>> {
let path = Path::new(filename);
let file = File::open(path)?;
let buf = Box::new(BufReader::new(file));
Ok(MyReader { buf })
}
}
impl Iterator for MyReader {
type Item = String;
fn next(&mut self) -> Option<Self::Item> {
let mut line = String::new();
if self.buf.read_line(&mut line).unwrap() > 0 {
return Some(line.trim_end().to_owned());
} else {
return None;
}
}
}
fn main() -> Result<(), Box<dyn std::error::Error>>{
let filename = "test/lines.txt";
let my_reader = MyReader::new(filename)?;
// serial version
my_reader.for_each(|x| println!("{}", x));
Ok(())
}
结果:
$ cat test/lines.txt
01
02
03
$ cargo run
01
02
03
下面是失败的一个:
... // same as the serial
fn main() -> Result<(), Box<dyn std::error::Error>>{
let filename = "test/lines.txt";
let my_reader = MyReader::new(filename)?;
// parallel version
my_reader.par_bridge().for_each(|x| println!("{}", x));
Ok(())
}
PS.我知道上面的例子很笨拙,但实际上我处理的数据有多行作为记录。这就是为什么我必须实现自己的迭代器,而不是使用BufRead
的lines()
来创建一个。
目标是并行读取一个非常大的文件,并独立处理每个记录。
PS3.我正在尝试人造丝的ParallelIterator
只是为了简单起见。如果有人能告诉我其他的方法,特别是那些只使用std
库的方法,我也会很感激。
谢谢。
1条答案
按热度按时间5t7ly7z51#
问题是
dyn BufRead
不是Send
。Trait对象必须被显式地标记以实现trait。这将起作用,但并不理想。它既不符合性能也不符合习惯用法。Trait对象必须在运行时确定它们是什么类型,而泛型在编译时这样做,这意味着它们有更多的优化机会。你应该在这里使用泛型。
然后修改你的impl块。
因为
BufReader<File>
是Send
,所以MyReader<BufReader<File>>
也是。然后,我建议将所有的处理逻辑放入一个泛型函数中。既然你手头已经有了一个结构体,这可以是一个方法,但是要做最适合组织的事情。
然后,您可以在想要创建的每种
MyReader
上调用它。请注意,如果需要,您仍然可以创建
MyReader<Box<dyn BufRead + Send>>
,因此使用泛型纯粹是灵活性的升级。(playground)