java 哪个ExecutorService最适合阻塞IO任务

zd287kbt  于 2023-04-19  发布在  Java
关注(0)|答案(4)|浏览(145)

假设我们有n个独立的阻塞IO任务,例如对另一个服务器进行rest-call的任务。然后我们需要合并所有的响应。每个任务可以在10秒内处理。
1.我们可以按顺序处理它,最后花了~n*10秒:

Task1Ans task1 = service1.doSomething();
Task2Ans task2 = service2.doSomething()
...
return result;

1.另一种策略是使用CompletableFuture以并行方式处理它,并在所有任务上花费约10秒:

CompletableFuture<Task1Ans> task1Cs = CompletableFuture.supplyAsync(() -> service1.doSomething(), bestExecutor);
CompletableFuture<Task2Ans> task2Cs = CompletableFuture.supplyAsync(() -> service2.doSomething(), bestExecutor);
return CompletableFuture.allOf(task1Cs, task2Cs)
   .thenApply(nothing -> {
       ...
       // combine task1, task2 into result object
       return result;
   }).join();

第二种方法有好处,但我不明白哪种类型的线程池最适合这种任务:

ExecutorService bestExecutor = Executors.newFixedThreadPool(30)   /// or Executors.newCachedThreadPool() or Executors.newWorkStealingPool()

我的问题是哪个ExecutorService最适合处理n并行阻塞IO任务。

bfhwhh0e

bfhwhh0e1#

在完全受CPU限制的任务中,使用比CPU内核更多的线程不会获得额外的性能。因此,在这种情况下,8核/ 8线程CPU只需要8个线程就可以最大限度地提高性能,而使用更多的线程会降低性能。IO任务通常通过使用比CPU内核更多的线程来获得性能。因为CPU时间可用于在等待IO时做其他事情。但是即使每个线程的CPU开销很低,由于每个线程占用内存,并且引起缓存/上下文切换,因此扩展也有限制。
假设你的任务是IO受限的,并且你没有提供任何其他约束,你可能应该为每个IO任务运行不同的线程。你可以通过使用固定或缓存线程池来实现这一点。
如果IO任务的数量非常大(数千+),则应该限制线程池的最大大小,因为可能会有太多线程。
如果你的任务是CPU受限的,你应该再次限制线程池的大小,甚至更小。内核的数量可以通过使用动态获取:

int cores = Runtime.getRuntime().availableProcessors();

此外,就像CPU有扩展限制一样,IO设备通常也有扩展限制。您不应该超过该限制,但如果没有测量,很难说限制在哪里。

niwlg2el

niwlg2el2#

Project Loom

您的情况适合使用为Java的未来版本提出的新功能:virtual threadsstructured concurrency。它们是Project Loom的一部分。
今天的Java线程是一对一Map到主机操作系统线程的。当Java代码阻塞时,主机线程也会阻塞。主机操作系统线程处于空闲状态,等待执行恢复。主机操作系统线程是重量级的,在CPU和内存方面都是昂贵的。因此这种空闲不是最佳的。
相反,Project Loom中的虚拟线程被Map到主机OS线程上的多对一。当虚拟线程中的代码阻塞时,该任务被“停放”,留出一些执行时间以允许另一个虚拟线程的任务。虚拟线程的这种停放在JVM中进行管理,因此它在CPU和内存中都是高度优化的,非常快,非常有效。因此,在普通硬件上运行的Java应用程序可以同时支持数千甚至数百万个虚拟线程。
ExecutorService是Loom中的AutoCloseable。因此,我们可以使用try-with-resources将整批任务包含在try ( ExecutorService es = Executors.newVirtualThreadPerTaskExecutor() ) { … submit tasks … }中。一旦完成,控制流将从try-with-resources块中退出,并且您知道您的任务已经完成。访问为您提交的每个任务返回的Future对象。不需要CompletableFuture
Loom特性现在正在Java 19中预览和孵化。
有关更多信息,请参阅几篇文章,演示文稿和对Project Loom团队成员的采访。这些成员包括罗恩Pressler和Alan Bateman。

4c8rllxm

4c8rllxm3#

如果我正确理解了你的问题,对于上述行为,无论选择executorService,如何调用你的executorService更重要。
例如:

ExecutorService executorService=Executors.newCachedThreadPool();
executorService.invokeAll(..);

现在这里,invokeAll(..)将阻塞,直到所有提供的任务完成。所以我觉得选择任何ExecutorService并调用invokeAll(..)将适合您的要求。
另外,请看看这个SE Question,它讨论了新的Java 8引入的ExecutorCompletionServiceinvokeAll

velaa5lx

velaa5lx4#

我找到了这类任务的最佳解决方案,我所需要的就是查看Executors.newCachedThreadPool()或Executors.newFixedThreadPool(30)的实现

public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

我的决定是直接示例化ThreadPoolExecutor,并设置线程池可以创建的线程的上限,并设置超时时间,让未使用的线程可以终止

int nThread = 90;
long timeoutSec = 120;
ThreadFactory threadFactory = new ThreadFactoryBuilder()
                .setNameFormat("Executor-Worker-%d")
                .setDaemon(true)
                .build();
Executor delegate = new ThreadPoolExecutor(
    0,  // min number of thread in pool
    nThread, // max number of thread in pool
    timeoutSec, // terminate idle thread after
    TimeUnit.SECONDS,
    new SynchronousQueue<Runnable>(),
    threadFactory
);

相关问题