我有一个需要并行运行的繁重任务列表。我喜欢java Stream API,不想直接接触Executors
,因此我编写了以下代码:
Set<Runnable> tasks = Set.of(heavyTask(),…);
tasks.parallelStream().forEach(Runnable::run);
我的并发测试经常(并不总是!)在“繁重任务”执行期间失败。好吧,这可能是一个竞争条件。我直接使用Executors
重写了代码:
try {
Set<Callable<Object>> tasks = Set.of(heavyTask(),…);
Executors.newFixedThreadPool(4)
.invokeAll(tasks).forEach(future->{
try {
future.get();
} catch (InterruptedException | ExecutionException ignore) {
}
});
} catch (InterruptedException ignore) {
}
任务繁重的问题没有了。我真的很困惑。我以为parallelStream()
在引擎盖下使用Executors
,几乎是一样的。.parallelStream()
和ExecutorService
有什么区别吗?或者forEach
在第一个代码示例中不是正确的终止操作?
2条答案
按热度按时间lfapxunr1#
parallelStream
使用的fork/join common pool是一个执行器,所以你是对的,它几乎是一样的。fork/join池用于各种各样的事情,所以可能有一些其他的、不相关的任务在干扰。通过自己声明Executor,你保证了4个专用线程。
forEach
是第一个示例的精细终止操作。要避免的操作是forEachOrdered
,它会破坏并行度。unftdfkk2#
ExecutorService
与parallelStream()
的比较乍一看,它们是可互换的方法,因为
parallelStream()
使用ForkJoinPool
,而ForkJoinPool
又使用Executors
。如果你想的话,语法很简单。但这并不总是正确的。ForkJoinPool
(因此parallelStream()
),因为Java 9返回Executor
,其中ClassLoader
与您可能从中派生的主ClassLoader
不同。这可能会导致奇怪的问题:ForkJoinPool
中的ClassLoader
没有加载库,但我得到了ClassNotFoundException
,可能的解决方案之一是:但对我来说-它看起来并不那么好,因此,对于我的特殊情况,我决定使用纯
Executors
,没有任何副作用。你可以在这个问题中找到更多的解决方案。我希望这能帮助一些人。