java中线程池核心API源码分析

x33g5p2x  于2022-02-07 转载在 Java  
字(5.7k)|赞(0)|评价(0)|浏览(595)

概述

线程池是一种多线程处理形式,处理过程中将任务添加到队列,然后在创建线程后自动启动这些任务。线程池线程都是后台线程。每个线程都使用默认的堆栈大小,以默认的优先级运行,并处于多线程单元中。如果某个线程在托管代码中空闲(如正在等待某个事件),则线程池将插入另一个辅助线程来使所有处理器保持繁忙。如果所有线程池线程都始终保持繁忙,但队列中包含挂起的工作,则线程池将在一段时间后创建另一个辅助线程但线程的数目永远不会超过最大值。超过最大值的线程可以排队,但他们要等到其他线程完成后才启动。

jdk中当然也提供了一些操作线程池的API。

java中操作线程池的api位于java.util.concurrent;包中。

常见的核心接口和实现类包括Executor接口、ExecutorService接口、ScheduledExecutorService接口、ThreadPoolExecutor实现类、ScheduledThreadPoolExecutor实现类等。

Executor接口
最上层的接口,定义了执行任务的execute()方法。

ExecutorService接口
继承了Executor接口,扩展了Callable、Future、关闭方法。

ScheduledExecutorService接口
继承了ExecutorService接口,增加了定时任务相关的方法。


ThreadPoolExecutor实现类
基础、标准的线程池实现。

ScheduledThreadPoolExecutor实现类
继承了ThreadPoolExecutor,实现了ScheduledExecutorService中定时任务相关的方法。

他们之间的继承实现关系图如下:

源码分析

下面对Executor接口、ExecutorService接口、ScheduledExecutorService接口、ThreadPoolExecutor实现类、ScheduledThreadPoolExecutor实现类这几个核心api的源码进行分析。

Executor

public interface Executor {

    /**
     * Executes the given command at some time in the future.  The command
     * may execute in a new thread, in a pooled thread, or in the calling
     * thread, at the discretion of the {@code Executor} implementation.
     *
     * @param command the runnable task
     * @throws RejectedExecutionException if this task cannot be
     * accepted for execution
     * @throws NullPointerException if command is null
     */
    void execute(Runnable command);
}

Executor是最上层的接口,该接口提供了一个execute()方法,用于执行指定的Runnable任务。

ExecutorService

ExecutorService接口继承至Executor接口,扩展了Callable、Future、关闭等方法。

public interface ExecutorService extends Executor

定义的方法

//启动有序关闭,先前已经执行的的任务将会继续执行,但不会执行新的任务。
    void shutdown();

     //尝试停止所有主动执行的任务,停止等待任务的处理,并返回正在等待执行的任务列表。
    List<Runnable> shutdownNow();

     //如果这个Executor已被关闭,则返回 true 。 
    boolean isShutdown();

    //如果所有任务在关闭后完成,则返回true 。 请注意, isTerminated从不true ,除非shutdown或shutdownNow先被执行。 
    boolean isTerminated();

    //阻止所有任务在关闭请求完成后执行,或发生超时,或当前线程中断,以先到者为准。 
    boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException;

    //提交值返回任务以执行,并返回代表任务待处理结果的Future。
    <T> Future<T> submit(Callable<T> task);

    //提交一个可运行的任务执行,并返回一个表示该任务的Future。Future的get方法将在成功完成后返回给定的结果。
    <T> Future<T> submit(Runnable task, T result);

    提交一个可运行的任务执行,并返回一个表示该任务的Future。Future的get方法将在成功完成后返回null。
    Future<?> submit(Runnable task);

    //执行给定的任务,返回持有他们的状态和结果的所有完成的Future列表。
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException;

    //执行给定的任务,返回在所有完成或超时到期时持有其状态和结果的Future列表
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit)
        throws InterruptedException;

    //执行给定的任务,返回一个成功完成的结果(即没有抛出异常),如果有的话。 正常或异常退出后,尚未完成的任务将被取消。
    <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException;

    //执行给定的任务,返回一个已经成功完成的结果(即,不抛出异常),如果有的话。
    <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                    long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;

ScheduledExecutorService

ScheduledExecutorService接口继承了ExecutorService接口,增加了定时任务相关的方法。

public interface ScheduledExecutorService extends ExecutorService

方法

/* 创建并执行在给定延迟后启用的单次操作。

	参数
	    command - 要执行的任务 
	    delay - 从现在开始延迟执行的时间 
	    unit - 延时参数的时间单位 
	结果
	    表示任务等待完成,并且它的ScheduledFuture get()方法将返回 null。
    */
    public ScheduledFuture<?> schedule(Runnable command,
                                       long delay, TimeUnit unit);

    /* 创建并执行在给定延迟后启用的ScheduledFuture。
		参数类型
		    V - 可调用结果的类型 
		参数
		    callable - 执行的功能 
		    delay - 从现在开始延迟执行的时间 
		    unit - 延迟参数的时间单位 
		结果
		    一个可用于提取结果或取消的ScheduledFuture 
     */
    public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                           long delay, TimeUnit unit);

    /*在初始延迟之后周期性的执行command。
     参数
	    command - 要执行的任务 
	    initialDelay - 延迟第一次执行的时间 
	    period - 连续执行之间的时期 
	    unit - initialDelay和period参数的时间单位 
	结果
	    一个ScheduledFuture代表待完成的任务,其 get()方法将在取消时抛出异常 
    */
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit);

    /**
      创建并执行在给定的初始延迟之后首先启用的定期动作,随后在一个执行的终止和下一个执行的开始之间给定的延迟。 如果任务的执行遇到异常,则后续的执行被抑制。 否则,任务将仅通过取消或终止执行人终止。
		参数
		    command - 要执行的任务 
		    initialDelay - 延迟第一次执行的时间 
		    delay - 一个执行终止与下一个执行的开始之间的延迟 
		    unit - initialDelay和delay参数的时间单位 
		结果
		    一个ScheduledFuture代表待完成的任务,其 get()方法将在取消时抛出异常 
     */
    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit);

ThreadPoolExecutor

ThreadPoolExecutor是一个基础、标准的线程池实现。

ThreadPoolExecutor中提供了四种构造函数以创建线程池。

参数的定义如下:

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }
/

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             threadFactory, defaultHandler);
    }

/
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              RejectedExecutionHandler handler) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), handler);
    }

/
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

ScheduledThreadPoolExecutor

public class ScheduledThreadPoolExecutor
        extends ThreadPoolExecutor
        implements ScheduledExecutorService

继承了ThreadPoolExecutor,实现了ScheduledExecutorService中定时任务相关的方法。

相关文章