java executorservice,可动态扩展线程数

vhipe2zx  于 2021-07-03  发布在  Java
关注(0)|答案(2)|浏览(455)

我有一个工作单元列表,我想并行处理它们。每个单元工作8-15秒,完全计算时间,无i/o阻塞。我想要的是拥有一个 ExecutorService 那就是:
在没有工作要做时示例化零个线程
如果需要,可以动态扩展到20个线程
允许我一次添加所有工作单元(不阻止提交)
比如:

Queue<WorkResult> queue = new ConcurrentLinkedDeque<>();
ExecutorService service = ....
for(WorkUnit unit : list) {
    service.submit(() -> {
        .. do some work ..
        queue.offer(result);
    );
}
while(queue.peek() != null) {
    ... process results while they arrive ...
}

我尝试过但没有成功的是:
使用 newCachedThreadPool() 创建的线程太多
然后我用了它的内部调用 new ThreadPoolExecutor(0, 20, 60L, SECONDS, new SynchronousQueue<>()) ,但我注意到submit()由于同步队列而阻塞
所以我用了 new LinkedBlockingQueue() ,只是为了发现threadpoolexecutor只生成一个线程
我确信有官方的实现来处理这个非常基本的并发用例。有人能给点建议吗?

frebpwbc

frebpwbc1#

创建 ThreadPoolExecutor 使用 LinkedBlockingQueue 以及 20 作为 corePoolSize (构造函数中的第一个参数): new ThreadPoolExecutor(20, 20, 60L, SECONDS, new LinkedBlockingQueue<>()); 如果你使用 LinkedBlockingQueue 如果没有预定义的容量 Pool :
不会检查的 maxPoolSize .
不会创建比 corePoolSize 的指定号码。
在您的情况下,将只执行一个线程。你很幸运得到一个,因为你把它设置为 0 如果 corePoolSize 已设置为 0 (他们怎么敢?)。
进一步的版本确实会创建一个新线程,即使 corePoolSize0 ,看起来。。。一个修复是。。。一只虫子。。。变化。。。合乎逻辑的行为?。
线程池执行器
使用无界队列(例如 LinkedBlockingQueue 如果没有预定义的容量,当所有corepoolsize线程都忙时,将导致新任务在队列中等待。因此,创建的线程不会超过corepoolsize(因此,maximumpoolsize的值没有任何影响。)
关于缩小
为了在没有工作要做的情况下移除所有线程,您必须关闭 coreThreads 特别是(它们不会默认终止)。为此,请设置 allowCoreThreadTimeOut(true) 启动前 Pool .
注意设置正确的 keep-alive 超时:例如,如果平均每6秒接收一个新任务,那么将保持活动时间设置为5秒可能会导致不必要的擦除+创建操作(哦,亲爱的线程,您只需等待一秒钟!)。根据任务接收速度设置此超时。 allowCoreThreadTimeOut 设置控制核心线程是否可以超时并在保持活动状态时间内没有任务到达时终止的策略,如果需要,在新任务到达时替换。如果为false,则不会由于缺少传入任务而终止核心线程。如果为true,则应用于非核心线程的保持活动状态策略也适用于核心线程。为避免连续替换线程,设置true时保持活动时间必须大于零。通常,应该在主动使用池之前调用此方法。
tl/dr公司
无界的 LinkedBloquingQueue 作为任务队列。 corePoolSize 更换 maxPoolSize 的意思。 allowCoreThreadTimeOut(true) 为了让 Pool 使用基于超时的机制缩小 coreThreads . keep-alive 值设置为基于任务接收延迟的逻辑值。
这种新的混合会导致 ExecutorService 99999999%的时间不会阻塞提交者(要做到这一点,排队的任务数应该是2.147.483.647),并且可以根据工作负载有效地扩展线程数,在两个方向上波动 { 0 <--> corePoolSize } 并发线程。
作为一个建议,应该监视队列的大小,因为非阻塞行为有一个代价:获得的概率 OOM 除非它在没有控制的情况下继续增长,直到 INTEGER.MAX_VALUE 满足(例如:当提交者不断插入任务时,线程一整天都处于死锁状态)。即使任务在内存中的大小可能很小,2.147.483.647对象及其相应的链接 Package 器等。。。有很多额外的负担。

js5cn81o

js5cn81o2#

最简单的方法就是使用

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)

类的执行者。这将为您提供一个简单的现成解决方案。你得到的游泳池将根据需要扩大和缩小。你可以用处理核心线程超时等的方法来进一步配置它。
scheduledexecutorservice是executorservice类的扩展,是唯一一个可以动态扩展和收缩的现成服务。

相关问题