本文整理了Java中java.util.concurrent.ThreadPoolExecutor
类的一些代码示例,展示了ThreadPoolExecutor
类的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。ThreadPoolExecutor
类的具体详情如下:
包路径:java.util.concurrent.ThreadPoolExecutor
类名称:ThreadPoolExecutor
[英]An ExecutorService that executes each submitted task using one of possibly several pooled threads, normally configured using Executors factory methods.
Thread pools address two different problems: they usually provide improved performance when executing large numbers of asynchronous tasks, due to reduced per-task invocation overhead, and they provide a means of bounding and managing the resources, including threads, consumed when executing a collection of tasks. Each ThreadPoolExecutor also maintains some basic statistics, such as the number of completed tasks.
To be useful across a wide range of contexts, this class provides many adjustable parameters and extensibility hooks. However, programmers are urged to use the more convenient Executors factory methods Executors#newCachedThreadPool (unbounded thread pool, with automatic thread reclamation), Executors#newFixedThreadPool(fixed size thread pool) and Executors#newSingleThreadExecutor (single background thread), that preconfigure settings for the most common usage scenarios. Otherwise, use the following guide when manually configuring and tuning this class: Core and maximum pool sizes A ThreadPoolExecutor will automatically adjust the pool size (see #getPoolSize) according to the bounds set by corePoolSize (see #getCorePoolSize) and maximumPoolSize (see #getMaximumPoolSize). When a new task is submitted in method #execute(Runnable), and fewer than corePoolSize threads are running, a new thread is created to handle the request, even if other worker threads are idle. If there are more than corePoolSize but less than maximumPoolSize threads running, a new thread will be created only if the queue is full. By setting corePoolSize and maximumPoolSize the same, you create a fixed-size thread pool. By setting maximumPoolSize to an essentially unbounded value such as Integer.MAX_VALUE, you allow the pool to accommodate an arbitrary number of concurrent tasks. Most typically, core and maximum pool sizes are set only upon construction, but they may also be changed dynamically using #setCorePoolSize and #setMaximumPoolSize. On-demand construction By default, even core threads are initially created and started only when new tasks arrive, but this can be overridden dynamically using method #prestartCoreThread or #prestartAllCoreThreads. You probably want to prestart threads if you construct the pool with a non-empty queue. Creating new threads New threads are created using a ThreadFactory. If not otherwise specified, a Executors#defaultThreadFactory is used, that creates threads to all be in the same ThreadGroup and with the same NORM_PRIORITY priority and non-daemon status. By supplying a different ThreadFactory, you can alter the thread's name, thread group, priority, daemon status, etc. If a ThreadFactory fails to create a thread when asked by returning null from newThread, the executor will continue, but might not be able to execute any tasks. Keep-alive times If the pool currently has more than corePoolSize threads, excess threads will be terminated if they have been idle for more than the keepAliveTime (see #getKeepAliveTime(TimeUnit)). This provides a means of reducing resource consumption when the pool is not being actively used. If the pool becomes more active later, new threads will be constructed. This parameter can also be changed dynamically using method #setKeepAliveTime(long,TimeUnit). Using a value of Long.MAX_VALUE TimeUnit#NANOSECONDS effectively disables idle threads from ever terminating prior to shut down. By default, the keep-alive policy applies only when there are more than corePoolSize threads. But method #allowCoreThreadTimeOut(boolean) can be used to apply this time-out policy to core threads as well, so long as the keepAliveTime value is non-zero. Queuing Any BlockingQueue may be used to transfer and hold submitted tasks. The use of this queue interacts with pool sizing:
Direct handoffs. A good default choice for a work queue is a SynchronousQueue that hands off tasks to threads without otherwise holding them. Here, an attempt to queue a task will fail if no threads are immediately available to run it, so a new thread will be constructed. This policy avoids lockups when handling sets of requests that might have internal dependencies. Direct handoffs generally require unbounded maximumPoolSizes to avoid rejection of new submitted tasks. This in turn admits the possibility of unbounded thread growth when commands continue to arrive on average faster than they can be processed.
Unbounded queues. Using an unbounded queue (for example a LinkedBlockingQueue without a predefined capacity) will cause new tasks to wait in the queue when all corePoolSize threads are busy. Thus, no more than corePoolSize threads will ever be created. (And the value of the maximumPoolSize therefore doesn't have any effect.) This may be appropriate when each task is completely independent of others, so tasks cannot affect each others execution; for example, in a web page server. While this style of queuing can be useful in smoothing out transient bursts of requests, it admits the possibility of unbounded work queue growth when commands continue to arrive on average faster than they can be processed.
Bounded queues. A bounded queue (for example, an ArrayBlockingQueue) helps prevent resource exhaustion when used with finite maximumPoolSizes, but can be more difficult to tune and control. Queue sizes and maximum pool sizes may be traded off for each other: Using large queues and small pools minimizes CPU usage, OS resources, and context-switching overhead, but can lead to artificially low throughput. If tasks frequently block (for example if they are I/O bound), a system may be able to schedule time for more threads than you otherwise allow. Use of small queues generally requires larger pool sizes, which keeps CPUs busier but may encounter unacceptable scheduling overhead, which also decreases throughput.
Rejected tasks New tasks submitted in method #execute(Runnable) will be rejected when the Executor has been shut down, and also when the Executor uses finite bounds for both maximum threads and work queue capacity, and is saturated. In either case, the execute method invokes the RejectedExecutionHandler#rejectedExecution(Runnable,ThreadPoolExecutor)method of its RejectedExecutionHandler. Four predefined handler policies are provided:
In the default ThreadPoolExecutor.AbortPolicy, the handler throws a runtime RejectedExecutionException upon rejection.
In ThreadPoolExecutor.CallerRunsPolicy, the thread that invokes execute itself runs the task. This provides a simple feedback control mechanism that will slow down the rate that new tasks are submitted.
In ThreadPoolExecutor.DiscardPolicy, a task that cannot be executed is simply dropped.
In ThreadPoolExecutor.DiscardOldestPolicy, if the executor is not shut down, the task at the head of the work queue is dropped, and then execution is retried (which can fail again, causing this to be repeated.)
It is possible to define and use other kinds of RejectedExecutionHandler classes. Doing so requires some care especially when policies are designed to work only under particular capacity or queuing policies. Hook methods This class provides protected overridable #beforeExecute(Thread,Runnable) and #afterExecute(Runnable,Throwable) methods that are called before and after execution of each task. These can be used to manipulate the execution environment; for example, reinitializing ThreadLocals, gathering statistics, or adding log entries. Additionally, method #terminated can be overridden to perform any special processing that needs to be done once the Executor has fully terminated.
If hook or callback methods throw exceptions, internal worker threads may in turn fail and abruptly terminate.
Queue maintenance Method #getQueue() allows access to the work queue for purposes of monitoring and debugging. Use of this method for any other purpose is strongly discouraged. Two supplied methods, #remove(Runnable) and #purge are available to assist in storage reclamation when large numbers of queued tasks become cancelled. Finalization A pool that is no longer referenced in a program AND has no remaining threads will be shutdown automatically. If you would like to ensure that unreferenced pools are reclaimed even if users forget to call #shutdown, then you must arrange that unused threads eventually die, by setting appropriate keep-alive times, using a lower bound of zero core threads and/or setting #allowCoreThreadTimeOut(boolean).
Extension example. Most extensions of this class override one or more of the protected hook methods. For example, here is a subclass that adds a simple pause/resume feature:
class PausableThreadPoolExecutor extends ThreadPoolExecutor public PausableThreadPoolExecutor(...) { super(...); }
protected void beforeExecute(Thread t, Runnable r)
super.beforeExecute(t, r);
pauseLock.lock();
try
while (isPaused) unpaused.await();
} catch (InterruptedException ie)
t.interrupt();
} finally
pauseLock.unlock();
}
}
public void pause()
pauseLock.lock();
try
isPaused = true;
} finally
pauseLock.unlock();
}
}
public void resume()
pauseLock.lock();
try
isPaused = false;
unpaused.signalAll();
} finally
pauseLock.unlock();
}
}
}}
[中]一种ExecutorService,使用可能的几个池线程之一执行每个提交的任务,通常使用Executors工厂方法进行配置。
线程池解决两个不同的问题:它们通常在执行大量异步任务时提供更好的性能,这是因为减少了每个任务的调用开销;它们还提供了一种方法,用于限制和管理执行任务集合时消耗的资源,包括线程。每个ThreadPoolExecutor还维护一些基本统计信息,例如已完成任务的数量。
为了在广泛的上下文中有用,这个类提供了许多可调整的参数和可扩展性挂钩。然而,程序员们被敦促使用更方便的Executors工厂方法Executors#newCachedThreadPool(无边界线程池,带有自动线程回收)、Executors#newFixedThreadPool(固定大小的线程池)和Executors#newSingleThreadExecutor(单后台线程),为最常见的使用场景预先配置设置。否则,在手动配置和优化此类时,请使用以下指南:Core和maximumPoolSize ThreadPoolExecutor将根据corePoolSize(请参见#getCorePoolSize)和maximumPoolSize(请参见#getMaximumPoolSize)设置的边界自动调整池大小(请参见#getPoolSize)。在方法#execute(Runnable)中提交新任务时,如果运行的线程少于corePoolSize,则会创建一个新线程来处理该请求,即使其他工作线程处于空闲状态。如果运行的线程超过corePoolSize,但小于maximumPoolSize,则仅当队列已满时,才会创建新线程。通过将corePoolSize和maximumPoolSize设置相同,可以创建一个固定大小的线程池。通过将maximumPoolSize设置为本质上无界的值,例如整数。最大值,允许池容纳任意数量的并发任务。最典型的情况是,核心池和最大池大小仅在构建时设置,但也可以使用#setCorePoolSize和#setMaximumPoolSize动态更改。按需构建默认情况下,即使核心线程最初也会创建,并仅在新任务到达时启动,但这可以使用方法#prestartCoreThread或#prestartAllCoreThreads动态覆盖。如果使用非空队列构建池,可能需要预启动线程。创建新线程使用ThreadFactory创建新线程。如果没有另外指定,则使用Executors#defaultThreadFactory,它创建的线程都位于同一线程组中,并且具有相同的NORM_优先级和非守护进程状态。通过提供不同的ThreadFactory,您可以更改线程的名称、线程组、优先级、守护程序状态等。如果ThreadFactory在从newThread返回null时未能创建线程,则执行器将继续,但可能无法执行任何任务。保持活动时间如果池当前有超过corePoolSize的线程,如果多余的线程空闲时间超过keepAliveTime(请参阅#getKeepAliveTime(TimeUnit))。这提供了一种在池未被积极使用时减少资源消耗的方法。如果池稍后变得更活跃,将构造新线程。也可以使用方法#setKeepAliveTime(long,TimeUnit)动态更改此参数。使用Long的值。MAX_VALUE TimeUnit#纳秒有效地禁止空闲线程在关闭之前终止。默认情况下,仅当存在多个corePoolSize线程时,“保持活动”策略才适用。但是方法#allowCoreThreadTimeOut(布尔值)也可以用于将此超时策略应用于核心线程,只要keepAliveTime值不为零。排队任何阻塞队列都可用于传输和保留提交的任务。此队列的使用与池大小交互:
*如果运行的线程少于corePoolSize,那么执行器总是倾向于添加新线程,而不是排队。
*如果corePoolSize或多个线程正在运行,执行者总是倾向于将请求排队,而不是添加新线程。
*如果请求无法排队,则会创建一个新线程,除非该线程超过maximumPoolSize,在这种情况下,任务将被拒绝。
排队的一般策略有三种:
1.*直接切换。*对于工作队列来说,一个很好的默认选择是SynchronousQueue,它将任务交给线程,而无需持有它们。在这里,如果没有线程可以立即运行任务,那么尝试将任务排队将失败,因此将构造一个新线程。在处理可能具有内部依赖关系的请求集时,此策略避免了锁定。直接切换通常需要无限的最大工具大小,以避免拒绝新提交的任务。这反过来又允许了当命令继续以平均比处理速度更快的速度到达时,线程无限增长的可能性。
1.无限队列。使用无界队列(例如,没有预定义容量的LinkedBlockingQueue)将导致新任务在所有corePoolSize线程都繁忙时在队列中等待。因此,创建的线程不会超过corePoolSize。(因此,maximumPoolSize的值没有任何影响。)当每个任务完全独立于其他任务时,这可能是合适的,因此任务不会影响其他任务的执行;例如,在网页服务器中。虽然这种排队方式有助于消除短暂的请求突发,但当命令继续以平均比处理速度更快的速度到达时,工作队列可能会无限增长。
1.有界队列。当与有限的MaximumPoolSize一起使用时,有界队列(例如ArrayBlockingQueue)有助于防止资源耗尽,但可能更难调整和控制。队列大小和最大池大小可以相互权衡:使用大型队列和小型池可以最大限度地减少CPU使用、操作系统资源和上下文切换开销,但可能会导致人为的低吞吐量。如果任务经常阻塞(例如,如果它们是I/O绑定的),系统可能能够为更多的线程安排时间,而不是您允许的时间。使用小队列通常需要更大的池大小,这使CPU更繁忙,但可能会遇到不可接受的调度开销,这也会降低吞吐量。
被拒绝的任务当执行器关闭时,以及当执行器对最大线程和工作队列容量使用有限界限且饱和时,在方法#execute(Runnable)中提交的新任务将被拒绝。在这两种情况下,execute方法都会调用其RejectedExecutionHandler的rejectedExecution#rejectedExecution(Runnable,ThreadPoolExecutor)方法。提供了四个预定义的处理程序策略:
1.在默认ThreadPoolExecutor中。AbortPolicy,处理程序在拒绝时抛出runtime RejectedExecutionException。
1.在线程池执行器中。CallerRunPolicy,调用自身执行的线程运行任务。这提供了一种简单的反馈控制机制,可以降低提交新任务的速度。
1.在线程池执行器中。丢弃策略,无法执行的任务将被简单地丢弃。
1.在线程池执行器中。DiscardolTestPolicy,如果执行器未关闭,则会丢弃工作队列头部的任务,然后重试执行(这可能再次失败,导致重复执行)
可以定义和使用其他类型的RejectedExecutionHandler类。这样做需要一定的谨慎,尤其是当策略设计为仅在特定容量或排队策略下工作时。Hook方法这个类提供了受保护的可重写#beforeExecute(Thread,Runnable)和#afterExecute(Runnable,Throwable)方法,它们在每个任务执行之前和之后都会被调用。这些可用于操纵执行环境;例如,重新初始化ThreadLocals、收集统计信息或添加日志条目。此外,可以重写方法#terminated,以执行执行器完全终止后需要执行的任何特殊处理。
如果钩子或回调方法抛出异常,内部工作线程可能会失败并突然终止。
队列维护方法#getQueue()允许访问工作队列以进行监视和调试。强烈反对将此方法用于任何其他目的。提供的两种方法#remove(Runnable)和#purge可用于在大量排队的任务被取消时帮助进行存储回收。终结程序中不再引用且没有剩余线程的池将自动关闭。如果您希望确保即使用户忘记调用#shutdown,也能回收未引用的池,那么您必须通过设置适当的保持活动时间、使用零核心线程的下限和/或设置#allowCoreThreadTimeOut(布尔值)来安排未使用的线程最终死亡。
扩展示例。此类的大多数扩展都会覆盖一个或多个受保护的钩子方法。例如,下面是一个子类,它添加了一个简单的暂停/恢复功能:
class PausableThreadPoolExecutor extends ThreadPoolExecutor public PausableThreadPoolExecutor(...) { super(...); }
protected void beforeExecute(Thread t, Runnable r)
super.beforeExecute(t, r);
pauseLock.lock();
try
while (isPaused) unpaused.await();
} catch (InterruptedException ie)
t.interrupt();
} finally
pauseLock.unlock();
}
}
public void pause()
pauseLock.lock();
try
isPaused = true;
} finally
pauseLock.unlock();
}
}
public void resume()
pauseLock.lock();
try
isPaused = false;
unpaused.signalAll();
} finally
pauseLock.unlock();
}
}
}}
代码示例来源:origin: wildfly/wildfly
protected ThreadPoolExecutor createThreadPool() {
ThreadPoolExecutor threadPool=new ThreadPoolExecutor(0, max_pool, pool_thread_keep_alive,
TimeUnit.MILLISECONDS, new SynchronousQueue<>());
ThreadFactory factory=new ThreadFactory() {
private final AtomicInteger thread_id=new AtomicInteger(1);
public Thread newThread(final Runnable command) {
return getThreadFactory().newThread(command, "StreamingStateTransfer-sender-" + thread_id.getAndIncrement());
}
};
threadPool.setRejectedExecutionHandler(new ShutdownRejectedExecutionHandler(threadPool.getRejectedExecutionHandler()));
threadPool.setThreadFactory(factory);
return threadPool;
}
代码示例来源:origin: apache/incubator-dubbo
@Override
public void execute(Runnable command) {
if (command == null) {
throw new NullPointerException();
}
// do not increment in method beforeExecute!
submittedTaskCount.incrementAndGet();
try {
super.execute(command);
} catch (RejectedExecutionException rx) {
// retry to offer the task into queue.
final TaskQueue queue = (TaskQueue) super.getQueue();
try {
if (!queue.retryOffer(command, 0, TimeUnit.MILLISECONDS)) {
submittedTaskCount.decrementAndGet();
throw new RejectedExecutionException("Queue capacity is full.", rx);
}
} catch (InterruptedException x) {
submittedTaskCount.decrementAndGet();
throw new RejectedExecutionException(x);
}
} catch (Throwable t) {
// decrease any way
submittedTaskCount.decrementAndGet();
throw t;
}
}
}
代码示例来源:origin: thinkaurelius/titan
@Override
public void close() throws Exception {
processor.shutdown();
processor.awaitTermination(shutdownWaitMS,TimeUnit.MILLISECONDS);
if (!processor.isTerminated()) {
//log.error("Processor did not terminate in time");
processor.shutdownNow();
}
}
}
代码示例来源:origin: apache/incubator-dubbo
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
String msg = String.format("Thread pool is EXHAUSTED!" +
" Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d), Task: %d (completed: %d)," +
" Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s), in %s://%s:%d!",
threadName, e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(), e.getMaximumPoolSize(), e.getLargestPoolSize(),
e.getTaskCount(), e.getCompletedTaskCount(), e.isShutdown(), e.isTerminated(), e.isTerminating(),
url.getProtocol(), url.getIp(), url.getPort());
logger.warn(msg);
dumpJStack();
throw new RejectedExecutionException(msg);
}
代码示例来源:origin: skylot/jadx
@Override
public Boolean call() throws Exception {
runJob();
executor.shutdown();
return executor.awaitTermination(5, TimeUnit.DAYS);
}
});
代码示例来源:origin: google/sagetv
Set<String> noProgramDetails = new HashSet<>();
Set<String> needSeriesDetails = new HashSet<>();
Set<SDPerson> addedPeople = new HashSet<>();
final AtomicInteger totalPeople = new AtomicInteger(0);
BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(9);
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.execute(runnable);
executor.shutdown();
executor.awaitTermination(5, TimeUnit.MINUTES);
remainingImports = queue.size();
peopleSize = totalPeople.get();
if (Sage.DBG) System.out.println("SDEPG Imported images for " +
peopleSize + " pe" + (peopleSize == 1 ? "rson" : "ople") +
代码示例来源:origin: fbacchella/jrds
public void collectAll() {
return;
final AtomicInteger counter = new AtomicInteger(0);
try {
if(isCollectRunning()) {
List<Future<Object>> scheduled = tpool.invokeAll(toSchedule, getStep() - getTimeout() * 2, TimeUnit.SECONDS);
running.addAll(scheduled);
tpool.shutdown();
tpool.awaitTermination(getStep() - getTimeout() * 2, TimeUnit.SECONDS);
if(!tpool.isTerminated()) {
emergencystop = !tpool.awaitTermination(getTimeout(), TimeUnit.SECONDS);
} catch (InterruptedException e) {
log(Level.INFO, "Collect interrupted in last chance");
log(Level.INFO, "Some task still alive, needs to be killed");
tpool.shutdownNow();
dumpCollectHanged();
} finally {
synchronized (running) {
tpool.shutdown();
tpool = null;
代码示例来源:origin: apache/activemq
final void readCheck() {
int currentCounter = next.getReceiveCounter();
int previousCounter = lastReceiveCounter.getAndSet(currentCounter);
if (inReceive.get() || currentCounter != previousCounter) {
LOG.trace("A receive is in progress, skipping read check.");
return;
if (!commandReceived.get() && monitorStarted.get() && !ASYNC_TASKS.isShutdown()) {
LOG.debug("No message received since last read check for {}. Throwing InactivityIOException.", this);
ASYNC_TASKS.execute(new Runnable() {
@Override
public void run() {
});
} catch (RejectedExecutionException ex) {
if (!ASYNC_TASKS.isShutdown()) {
LOG.error("Async read check was rejected from the executor: ", ex);
throw ex;
代码示例来源:origin: hibernate/hibernate-search
@Test
public void testPropertiesIndexing() throws InterruptedException {
SearchIntegrator integrator = sfHolder.getSearchFactory();
ThreadPoolExecutor threadPool = Executors.newFixedThreadPool( THREAD_NUMBER, "ReadWriteParallelismTest" );
for ( int i = 0; i < THREAD_NUMBER; i++ ) {
threadPool.execute( new Task( integrator, i ) );
}
threadPool.shutdown();
//Time to warmup only:
threadPool.awaitTermination( WARM_UP_SECONDS, TimeUnit.SECONDS );
System.out.println( "Warmup complete. Start measuring now.." );
//Start measuring:
cyclesCompleted.set( 0 );
long startMeasurementTime = System.nanoTime();
threadPool.awaitTermination( FULL_RUN_SECONDS, TimeUnit.SECONDS );
int doneCycles = cyclesCompleted.get();
long endMeasurementTime = System.nanoTime();
Assert.assertFalse( "Some failure happened in Task execution", failures.get() );
long totalTime = endMeasurementTime - startMeasurementTime;
long millisecondsElapsedTime = TimeUnit.MILLISECONDS.convert( totalTime, TimeUnit.NANOSECONDS );
System.out.println( "Completed " + doneCycles + " in " + millisecondsElapsedTime + " milliseconds" );
running.set( false );
}
代码示例来源:origin: apache/cxf
ex.execute(r);
if (addWorkerMethod != null
&& !ex.getQueue().isEmpty()
&& this.approxThreadCount < highWaterMark
&& addThreadLock.tryLock()) {
try {
mainLock.lock();
try {
int ps = this.getPoolSize();
int sz = executor.getQueue().size();
int sz2 = this.getActiveCount();
mainLock.unlock();
代码示例来源:origin: pwm-project/pwm
void writeUserOrgChartDetailToCsv(
final CSVPrinter csvPrinter,
final UserIdentity userIdentity,
final int depth
)
{
final Instant startTime = Instant.now();
LOGGER.trace( pwmRequest, () -> "beginning csv export starting with user " + userIdentity.toDisplayString() + " and depth of " + depth );
final ThreadPoolExecutor executor = pwmRequest.getPwmApplication().getPeopleSearchService().getJobExecutor();
final AtomicInteger rowCounter = new AtomicInteger( 0 );
final OrgChartExportState orgChartExportState = new OrgChartExportState(
executor,
csvPrinter,
rowCounter,
Collections.singleton( OrgChartExportState.IncludeData.displayForm )
);
final OrgChartCsvRowOutputJob job = new OrgChartCsvRowOutputJob( orgChartExportState, userIdentity, depth, null );
executor.execute( job );
final TimeDuration maxDuration = peopleSearchConfiguration.getExportCsvMaxDuration();
JavaHelper.pause( maxDuration.asMillis(), 1000, o -> ( executor.getQueue().size() + executor.getActiveCount() <= 0 ) );
final TimeDuration timeDuration = TimeDuration.fromCurrent( startTime );
LOGGER.trace( pwmRequest, () -> "completed csv export of " + rowCounter.get() + " records in " + timeDuration.asCompactString() );
}
代码示例来源:origin: NetApp/NetApp-Hadoop-NFS-Connector
this.readBlockSizeBits = store.getReadSizeBits();
this.splitSize = splitSize;
this.closed = new AtomicBoolean(false);
this.ongoing = new ConcurrentHashMap<>(DEFAULT_PREFETCH_POOL_SIZE);
this.cache = new ConcurrentHashMap<>(DEFAULT_CACHE_SIZE_IN_BLOCKS);
this.statistics
= new StreamStatistics(NFSBufferedInputStream.class + pathString, streamId.getAndIncrement(),
true);
this.executors = new ThreadPoolExecutor(DEFAULT_PREFETCH_POOL_SIZE, MAX_PREFETCH_POOL_SIZE, 5, TimeUnit.SECONDS,
new LinkedBlockingDeque<Runnable>(1024), new ThreadPoolExecutor.CallerRunsPolicy());
代码示例来源:origin: alipay/sofa-rpc
@Override
public void destroy() {
if (!started) {
return;
}
int stopTimeout = serverConfig.getStopTimeout();
if (stopTimeout > 0) { // 需要等待结束时间
AtomicInteger count = boltServerProcessor.processingCount;
// 有正在执行的请求 或者 队列里有请求
if (count.get() > 0 || bizThreadPool.getQueue().size() > 0) {
long start = RpcRuntimeContext.now();
if (LOGGER.isInfoEnabled()) {
LOGGER.info("There are {} call in processing and {} call in queue, wait {} ms to end",
count, bizThreadPool.getQueue().size(), stopTimeout);
}
while ((count.get() > 0 || bizThreadPool.getQueue().size() > 0)
&& RpcRuntimeContext.now() - start < stopTimeout) { // 等待返回结果
try {
Thread.sleep(10);
} catch (InterruptedException ignore) {
}
}
} // 关闭前检查已有请求?
}
// 关闭线程池
bizThreadPool.shutdown();
stop();
}
代码示例来源:origin: net.therore/therore-concurrent
private void addThread(Runnable firstTask) {
FlowControlWrapper w = new FlowControlWrapper(firstTask);
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
activeCount.incrementAndGet();
} finally {
mainLock.unlock();
}
coreExecutorService.execute(w);
}
代码示例来源:origin: com.torodb.torod/db-executor
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
try {
lock.lock();
try {
while (queue.size() >= queueSize) {
insertionsAllowed.await();
}
executor.execute(r);
}
finally {
lock.unlock();
}
}
catch (InterruptedException ex) {
throw new RuntimeException(ex);
}
}
代码示例来源:origin: qunarcorp/qmq
public ActorSystem(String name, int threads, boolean fair) {
this.name = name;
this.actorsCount = new AtomicInteger();
BlockingQueue<Runnable> queue = fair ? new PriorityBlockingQueue<>() : new LinkedBlockingQueue<>();
this.executor = new ThreadPoolExecutor(threads, threads, 60, TimeUnit.MINUTES, queue, new NamedThreadFactory("actor-sys-" + name));
this.actors = Maps.newConcurrentMap();
QMon.dispatchersGauge(name, actorsCount::doubleValue);
QMon.actorSystemQueueGauge(name, () -> (double) executor.getQueue().size());
}
代码示例来源:origin: wycm/zhihu-crawler
Connection cn = getConnection();
if (zhiHuDao1.insertUser(cn, u)){
parseUserCount.incrementAndGet();
if (zhiHuHttpClient.getDetailListPageThreadPool().getQueue().size() > 1000){
continue;
zhiHuHttpClient.getDetailListPageThreadPool().getActiveCount() == 1){
zhiHuHttpClient.getDetailListPageThreadPool().execute(new DetailListPageTask(request, true));
else if(!Config.dbEnable || zhiHuHttpClient.getDetailListPageThreadPool().getActiveCount() == 1){
parseUserCount.incrementAndGet();
for (int j = 0; j < u.getFollowees() / 20; j++){
String nextUrl = String.format(USER_FOLLOWEES_URL, u.getUserToken(), j * 20);
HttpGet request = new HttpGet(nextUrl);
zhiHuHttpClient.getDetailListPageThreadPool().execute(new DetailListPageTask(request, true));
代码示例来源:origin: Netflix/eureka
/* visible for testing */ boolean doWarmUp() {
Future future = null;
try {
future = threadPoolExecutor.submit(updateTask);
future.get(warmUpTimeoutMs, TimeUnit.MILLISECONDS); // block until done or timeout
return true;
} catch (Exception e) {
logger.warn("Best effort warm up failed", e);
} finally {
if (future != null) {
future.cancel(true);
}
}
return false;
}
代码示例来源:origin: weibocom/motan
private void rejectMessage(ChannelHandlerContext ctx, NettyMessage msg) {
if (msg.isRequest()) {
DefaultResponse response = new DefaultResponse();
response.setRequestId(msg.getRequestId());
response.setException(new MotanServiceException("process thread pool is full, reject by server: " + ctx.channel().localAddress(), MotanErrorMsgConstant.SERVICE_REJECT));
sendResponse(ctx, response);
LoggerUtil.error("process thread pool is full, reject, active={} poolSize={} corePoolSize={} maxPoolSize={} taskCount={} requestId={}",
threadPoolExecutor.getActiveCount(), threadPoolExecutor.getPoolSize(), threadPoolExecutor.getCorePoolSize(),
threadPoolExecutor.getMaximumPoolSize(), threadPoolExecutor.getTaskCount(), msg.getRequestId());
rejectCounter.incrementAndGet();
}
}
代码示例来源:origin: apache/rocketmq
public DefaultMQProducerImpl(final DefaultMQProducer defaultMQProducer, RPCHook rpcHook) {
this.defaultMQProducer = defaultMQProducer;
this.rpcHook = rpcHook;
this.asyncSenderThreadPoolQueue = new LinkedBlockingQueue<Runnable>(50000);
this.defaultAsyncSenderExecutor = new ThreadPoolExecutor(
Runtime.getRuntime().availableProcessors(),
Runtime.getRuntime().availableProcessors(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.asyncSenderThreadPoolQueue,
new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "AsyncSenderExecutor_" + this.threadIndex.incrementAndGet());
}
});
}
内容来源于网络,如有侵权,请联系作者删除!