假设我们有n个独立的阻塞IO任务,例如对另一个服务器进行rest-call的任务。然后我们需要合并所有的响应。每个任务可以在10秒内处理。
1.我们可以按顺序处理它,最后花了~n*10秒:
Task1Ans task1 = service1.doSomething();
Task2Ans task2 = service2.doSomething()
...
return result;
1.另一种策略是使用CompletableFuture以并行方式处理它,并在所有任务上花费约10秒:
CompletableFuture<Task1Ans> task1Cs = CompletableFuture.supplyAsync(() -> service1.doSomething(), bestExecutor);
CompletableFuture<Task2Ans> task2Cs = CompletableFuture.supplyAsync(() -> service2.doSomething(), bestExecutor);
return CompletableFuture.allOf(task1Cs, task2Cs)
.thenApply(nothing -> {
...
// combine task1, task2 into result object
return result;
}).join();
第二种方法有好处,但我不明白哪种类型的线程池最适合这种任务:
ExecutorService bestExecutor = Executors.newFixedThreadPool(30) /// or Executors.newCachedThreadPool() or Executors.newWorkStealingPool()
我的问题是哪个ExecutorService最适合处理n并行阻塞IO任务。
4条答案
按热度按时间bfhwhh0e1#
在完全受CPU限制的任务中,使用比CPU内核更多的线程不会获得额外的性能。因此,在这种情况下,8核/ 8线程CPU只需要8个线程就可以最大限度地提高性能,而使用更多的线程会降低性能。IO任务通常通过使用比CPU内核更多的线程来获得性能。因为CPU时间可用于在等待IO时做其他事情。但是即使每个线程的CPU开销很低,由于每个线程占用内存,并且引起缓存/上下文切换,因此扩展也有限制。
假设你的任务是IO受限的,并且你没有提供任何其他约束,你可能应该为每个IO任务运行不同的线程。你可以通过使用固定或缓存线程池来实现这一点。
如果IO任务的数量非常大(数千+),则应该限制线程池的最大大小,因为可能会有太多线程。
如果你的任务是CPU受限的,你应该再次限制线程池的大小,甚至更小。内核的数量可以通过使用动态获取:
此外,就像CPU有扩展限制一样,IO设备通常也有扩展限制。您不应该超过该限制,但如果没有测量,很难说限制在哪里。
niwlg2el2#
Project Loom
您的情况适合使用为Java的未来版本提出的新功能:virtual threads和structured concurrency。它们是Project Loom的一部分。
今天的Java线程是一对一Map到主机操作系统线程的。当Java代码阻塞时,主机线程也会阻塞。主机操作系统线程处于空闲状态,等待执行恢复。主机操作系统线程是重量级的,在CPU和内存方面都是昂贵的。因此这种空闲不是最佳的。
相反,Project Loom中的虚拟线程被Map到主机OS线程上的多对一。当虚拟线程中的代码阻塞时,该任务被“停放”,留出一些执行时间以允许另一个虚拟线程的任务。虚拟线程的这种停放在JVM中进行管理,因此它在CPU和内存中都是高度优化的,非常快,非常有效。因此,在普通硬件上运行的Java应用程序可以同时支持数千甚至数百万个虚拟线程。
ExecutorService
是Loom中的AutoCloseable
。因此,我们可以使用try-with-resources将整批任务包含在try ( ExecutorService es = Executors.newVirtualThreadPerTaskExecutor() ) { … submit tasks … }
中。一旦完成,控制流将从try-with-resources块中退出,并且您知道您的任务已经完成。访问为您提交的每个任务返回的Future
对象。不需要CompletableFuture
。Loom特性现在正在Java 19中预览和孵化。
有关更多信息,请参阅几篇文章,演示文稿和对Project Loom团队成员的采访。这些成员包括罗恩Pressler和Alan Bateman。
4c8rllxm3#
如果我正确理解了你的问题,对于上述行为,无论选择
executorService
,如何调用你的executorService
更重要。例如:
现在这里,
invokeAll(..)
将阻塞,直到所有提供的任务完成。所以我觉得选择任何ExecutorService并调用invokeAll(..)
将适合您的要求。另外,请看看这个SE Question,它讨论了新的Java 8引入的ExecutorCompletionService和
invokeAll
。velaa5lx4#
我找到了这类任务的最佳解决方案,我所需要的就是查看Executors.newCachedThreadPool()或Executors.newFixedThreadPool(30)的实现
我的决定是直接示例化ThreadPoolExecutor,并设置线程池可以创建的线程的上限,并设置超时时间,让未使用的线程可以终止