java parallelStream()和ExecutorService之间有什么区别吗?

uurity8g  于 2022-12-10  发布在  Java
关注(0)|答案(2)|浏览(318)

我有一个需要并行运行的繁重任务列表。我喜欢java Stream API,不想直接接触Executors,因此我编写了以下代码:

Set<Runnable> tasks = Set.of(heavyTask(),…);
tasks.parallelStream().forEach(Runnable::run);

我的并发测试经常(并不总是!)在“繁重任务”执行期间失败。好吧,这可能是一个竞争条件。我直接使用Executors重写了代码:

try {
    Set<Callable<Object>> tasks = Set.of(heavyTask(),…);
    Executors.newFixedThreadPool(4)
        .invokeAll(tasks).forEach(future->{
            try {
                future.get();
            } catch (InterruptedException | ExecutionException ignore) {
        }
    });
} catch (InterruptedException ignore) {
}

任务繁重的问题没有了。我真的很困惑。我以为parallelStream()在引擎盖下使用Executors,几乎是一样的。.parallelStream()ExecutorService有什么区别吗?或者forEach在第一个代码示例中不是正确的终止操作?

lfapxunr

lfapxunr1#

parallelStream使用的fork/join common pool是一个执行器,所以你是对的,它几乎是一样的。
fork/join池用于各种各样的事情,所以可能有一些其他的、不相关的任务在干扰。通过自己声明Executor,你保证了4个专用线程。
forEach是第一个示例的精细终止操作。要避免的操作是forEachOrdered,它会破坏并行度。

unftdfkk

unftdfkk2#

ExecutorServiceparallelStream()的比较

乍一看,它们是可互换的方法,因为parallelStream()使用ForkJoinPool,而ForkJoinPool又使用Executors。如果你想的话,语法很简单。但这并不总是正确的。ForkJoinPool(因此parallelStream()),因为Java 9返回Executor,其中ClassLoader与您可能从中派生的主ClassLoader不同。这可能会导致奇怪的问题:ForkJoinPool中的ClassLoader没有加载库,但我得到了ClassNotFoundException,可能的解决方案之一是:

final ClassLoader cl = Thread.currentThread().getContextClassLoader();
tasks.parallelStream().forEach(task -> {
    Thread.currentThread().setContextClassLoader(cl);
    task.get();
});

但对我来说-它看起来并不那么好,因此,对于我的特殊情况,我决定使用纯Executors,没有任何副作用。你可以在这个问题中找到更多的解决方案。我希望这能帮助一些人。

相关问题