我有一个工作单元列表,我想并行处理它们。每个单元工作8-15秒,完全计算时间,无i/o阻塞。我想要的是拥有一个 ExecutorService
那就是:
在没有工作要做时示例化零个线程
如果需要,可以动态扩展到20个线程
允许我一次添加所有工作单元(不阻止提交)
比如:
Queue<WorkResult> queue = new ConcurrentLinkedDeque<>();
ExecutorService service = ....
for(WorkUnit unit : list) {
service.submit(() -> {
.. do some work ..
queue.offer(result);
);
}
while(queue.peek() != null) {
... process results while they arrive ...
}
我尝试过但没有成功的是:
使用 newCachedThreadPool()
创建的线程太多
然后我用了它的内部调用 new ThreadPoolExecutor(0, 20, 60L, SECONDS, new SynchronousQueue<>())
,但我注意到submit()由于同步队列而阻塞
所以我用了 new LinkedBlockingQueue()
,只是为了发现threadpoolexecutor只生成一个线程
我确信有官方的实现来处理这个非常基本的并发用例。有人能给点建议吗?
2条答案
按热度按时间frebpwbc1#
创建
ThreadPoolExecutor
使用LinkedBlockingQueue
以及20
作为corePoolSize
(构造函数中的第一个参数):new ThreadPoolExecutor(20, 20, 60L, SECONDS, new LinkedBlockingQueue<>());
如果你使用LinkedBlockingQueue
如果没有预定义的容量Pool
:不会检查的
maxPoolSize
.不会创建比
corePoolSize
的指定号码。在您的情况下,将只执行一个线程。你很幸运得到一个,因为你把它设置为
0
如果corePoolSize
已设置为0
(他们怎么敢?)。进一步的版本确实会创建一个新线程,即使
corePoolSize
是0
,看起来。。。一个修复是。。。一只虫子。。。变化。。。合乎逻辑的行为?。线程池执行器
使用无界队列(例如
LinkedBlockingQueue
如果没有预定义的容量,当所有corepoolsize线程都忙时,将导致新任务在队列中等待。因此,创建的线程不会超过corepoolsize(因此,maximumpoolsize的值没有任何影响。)关于缩小
为了在没有工作要做的情况下移除所有线程,您必须关闭
coreThreads
特别是(它们不会默认终止)。为此,请设置allowCoreThreadTimeOut(true)
启动前Pool
.注意设置正确的
keep-alive
超时:例如,如果平均每6秒接收一个新任务,那么将保持活动时间设置为5秒可能会导致不必要的擦除+创建操作(哦,亲爱的线程,您只需等待一秒钟!)。根据任务接收速度设置此超时。allowCoreThreadTimeOut
设置控制核心线程是否可以超时并在保持活动状态时间内没有任务到达时终止的策略,如果需要,在新任务到达时替换。如果为false,则不会由于缺少传入任务而终止核心线程。如果为true,则应用于非核心线程的保持活动状态策略也适用于核心线程。为避免连续替换线程,设置true时保持活动时间必须大于零。通常,应该在主动使用池之前调用此方法。tl/dr公司
无界的
LinkedBloquingQueue
作为任务队列。corePoolSize
更换maxPoolSize
的意思。allowCoreThreadTimeOut(true)
为了让Pool
使用基于超时的机制缩小coreThreads
.keep-alive
值设置为基于任务接收延迟的逻辑值。这种新的混合会导致
ExecutorService
99999999%的时间不会阻塞提交者(要做到这一点,排队的任务数应该是2.147.483.647),并且可以根据工作负载有效地扩展线程数,在两个方向上波动{ 0 <--> corePoolSize }
并发线程。作为一个建议,应该监视队列的大小,因为非阻塞行为有一个代价:获得的概率
OOM
除非它在没有控制的情况下继续增长,直到INTEGER.MAX_VALUE
满足(例如:当提交者不断插入任务时,线程一整天都处于死锁状态)。即使任务在内存中的大小可能很小,2.147.483.647对象及其相应的链接 Package 器等。。。有很多额外的负担。js5cn81o2#
最简单的方法就是使用
类的执行者。这将为您提供一个简单的现成解决方案。你得到的游泳池将根据需要扩大和缩小。你可以用处理核心线程超时等的方法来进一步配置它。
scheduledexecutorservice是executorservice类的扩展,是唯一一个可以动态扩展和收缩的现成服务。