Rust线程池收集结果

ajsxfq5m  于 2023-08-05  发布在  其他
关注(0)|答案(1)|浏览(100)

我只是拿起Rust,因为我必须做一些需要速度的文件处理。
我写了代码,但现在是时候让它并行运行了。我在Rust中使用线程池时遇到了问题。我在人造丝中找到了一个实现,并决定尝试一下。然而,并行代码运行速度比串行代码慢。
我创建了一个例子,它更简单:

use std::time::Instant;
use std::{thread, time};

fn do_stuff(i: u64) -> u64 {
    let seconds = time::Duration::from_secs(i);
    thread::sleep(seconds);
    println!("{:?}", i);
    return i * i;
}

fn parallel() {
    let mut x = Vec::new();

    let pool = rayon::ThreadPoolBuilder::new()
        .num_threads(8)
        .build()
        .unwrap();

    for i in 1..10 {
        pool.install(|| {
            let result = do_stuff(i);
            x.push(result);
        });
    }

    println!("{:?}", x);
}

fn serial() {
    let mut x = Vec::new();

    for i in 1..10 {
        let result = do_stuff(i);
        x.push(result);
    }

    println!("{:?}", x);
}

fn main() {
    let start = Instant::now();
    serial();
    let duration = start.elapsed();
    println!("Time elapsed (serial): {:.2}s", duration.as_secs_f64());

    let start = Instant::now();
    parallel();
    let duration = start.elapsed();
    println!("Time elapsed (parallel): {:.2}s", duration.as_secs_f64());
}

字符串
结果是:

1
2
3
4
5
6
7
8
9
[1, 4, 9, 16, 25, 36, 49, 64, 81]
Time elapsed (serial): 45.00s
1
2
3
4
5
6
7
8
9
[1, 4, 9, 16, 25, 36, 49, 64, 81]
Time elapsed (parallel): 45.01s


在这里,时间是一样的,但是当运行我的实际代码时,并行版本要慢得多(即,30 s vs 50 s-所以我不认为这是线程创建开销的问题)。
基本上,我想实现的是:

  • 我有一个由许多元素组成的可迭代对象(这里是9个,但没关系)
  • 我想遍历它,把每个元素传递给一个耗时的函数来计算东西
  • 这个函数返回一个结果,我想把它附加到启动线程之前定义的向量中
  • 我想在线程池中执行(我不想为每个元素生成单独的线程)

很简单我来自Python,在Python中,像ProcessPoolExecutor这样将结果返回到单个列表非常简单。然而,在这里,我没有看到添加线程池的改进。
我用错了吗?有没有办法在Rust中做到这一点?
我不太明白为什么我能在parallel函数中把.push转换成x向量。我是说,它看起来不安全。

i7uq4tfw

i7uq4tfw1#

你让事情变得更复杂了。
rayon已经处理好了一切,不需要创建一个花哨的线程池或类似的东西。简单地使用并行迭代器。它将在全局rayon线程池中执行,该线程池的大小根据系统的内核数量自动调整。

fn parallel() {
    let x: Vec<u64> = (1..10).into_par_iter().map(do_stuff).collect();
    println!("{:?}", x);
}

个字符
如果你必须因为某种原因使用自定义线程池,不要对每个元素都使用install()install()不并行运行-它只在函数完成后返回。
相反,输入线程池上下文install,然后在其中输入scope + spawn
我试着跟你的榜样保持距离;注意在Vec周围添加的Mutex,因为如果你实际上并行运行,你不能简单地同时从所有线程访问Vec。(我不是说你不应该这样做;没有unsafe是不可能的。编译器不会让你这样做)。

fn parallel() {
    let x = Mutex::new(Vec::new());

    let pool = rayon::ThreadPoolBuilder::new()
        .num_threads(8)
        .build()
        .unwrap();

    pool.install(|| {
        rayon::scope(|s| {
            for i in 1..10 {
                let x = &x;
                s.spawn(move |_| {
                    let result = do_stuff(i);
                    x.lock().unwrap().push(result);
                });
            }
        });
    });

    println!("{:?}", x.lock().unwrap());
}
[1, 4, 9, 16, 25, 36, 49, 81, 64]
Time elapsed (parallel): 9.05s

的字符串
当然,通过并行迭代器也可以写得更简单:

fn parallel() {
    let pool = rayon::ThreadPoolBuilder::new()
        .num_threads(8)
        .build()
        .unwrap();

    let x: Vec<u64> = pool.install(|| (1..10).into_par_iter().map(do_stuff).collect());

    println!("{:?}", x);
}
[1, 4, 9, 16, 25, 36, 49, 64, 81]
Time elapsed (parallel): 10.03s

的字符串

相关问题