Java并发编程之 Excutor

x33g5p2x  于2021-10-04 转载在 Java  
字(10.1k)|赞(0)|评价(0)|浏览(474)

在Java中,使用线程来异步执行任务。Java线程的创建与销毁需要一定的开销,如果我们为每一个任务创建一个新线程来执行,这些线程的创建与销毁将消耗大量的计算资源。同时,为每一个任务创建一个新线程来执行,这种策略可能会使处于高负荷状态的应用最终崩溃。

Java的线程既是工作单元,也是执行机制。从JDK 5开始,把工作单元与执行机制分离开来。工作单元包括RunnableCallable,而执行机制由Executor框架提供

在HotSpot VM的线程模型中,Java线程(java.lang.Thread)被一对一映射为本地操作系统线程。Java线程启动时会创建一个本地操作系统线程;当该Java线程终止时,这个操作系统线程也会被回收。

在上层,Java多线程程序通常把应用分解为若干个任务,然后使用用户级的调度器(Executor框架)将这些任务映射为固定数量的线程;在底层,操作系统内核将这些线程映射到硬件处理器上:

Executor框架的结构

任务: 包括被执行任务需要实现的接口:Runnable接口或Callable接口。

任务的执行: 包括任务执行机制的核心接口Executor,以及继承自ExecutorExecutorService接口。ExecutorService接口有两个关键的实现类:ThreadPoolExecutorScheduledThreadPoolExecutor

异步计算的结果: 包括接口Future和实现Future接口的FutureTask类。

  • Executor是一个接口,它是Executor框架的基础,它将任务的提交与任务的执行分离开来。
  • ThreadPoolExecutor是线程池的核心实现类,用来执行被提交的任务。
  • ScheduledThreadPoolExecutor是一个实现类,可以在给定的延迟后运行命令,或者定期执行命令。ScheduledThreadPoolExecutorTimer更灵活,功能更强大。
  • Future接口和实现Future接口的FutureTask类,代表异步计算的结果。
  • Runnable接口和Callable接口的实现类,都可以被ThreadPoolExecutorScheduledThreadPoolExecutor执行。

Executor框架的执行流程

  1. 主线程首先要创建实现Runnable或者Callable接口的任务对象。工具类Executors可以把一个Runnable对象封装为一个Callable对象(Executors.callable(Runnable task)Executors.callable(Runnable task, Object resule))。
  2. 然后可以把Runnable对象直接交给ExecutorService执行(ExecutorService.execute(Runnable command)),或者也可以把Runnable对象或Callable对象提交给ExecutorService执行(ExecutorService.submit(Runnable task)ExecutorService.submit(Callable<T>task))。
  3. 如果执行的是ExecutorServicesubmit()方法,ExecutorService将返回一个实现Future接口的对象(FutureTask对象)。由于FutureTask实现了Runnable,程序员也可以创建FutureTask,然后直接交给ExecutorService执行。
  4. 最后,主线程可以执行FutureTask.get()方法来等待任务执行完成。主线程也可以执行FutureTask.cancel(boolean mayInterruptIfRunning)来取消此任务的执行。

Executor框架的成员

线程池ThreadPoolExecutor

三大方法七大参数四种拒绝策略

Executor框架最核心的类是ThreadPoolExecutor,它是线程池的实现类。
三大方法
ThreadPoolExecutor通常使用工厂类Executors来创建。Executors可以创建3种类型的ThreadPoolExecutorSingleThreadExecutorFixedThreadPoolCachedThreadPool

  • Executors.newSingleThreadExecutor()

  • Executors.newFixedThreadPool()

  • Executors.newCachedThreadPool()
    七大参数

  • int corePoolSize核心线程数。(要保留在池中的线​​程数,即使它们处于空闲状态,除非设置了allowCoreThreadTimeOut

  • int maximumPoolSize最大线程数。(如果队列满了,并且已创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务;如果使用了无界的任务队列这个参数是无效的。)

  • 对于CPU密集型任务,可以将该参数设置为 CPU核数 + 1

  • 对于IO密集型任务,可以将该参数设置为 CPU核数 /* 2

  • long keepAliveTime超时释放时间。(当线程数大于核心数时,这是多余空闲线程在终止前等待新任务的最长时间)

  • TimeUnit unit超时释放时间的单位。(枚举类TimeUnit的常量)

  • BlockingQueue<Runnable> workQueue阻塞队列。(用于在执行任务之前保存任务的队列, 这个队列将只保存execute方法提交的Runnable任务)

  • ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列,此队列按FIFO(先进先出)原则对元素进行排序。

  • LinkedBlockingQueue:一个基于链表结构的阻塞队列,是一个容量为Integer.MAX_VALUE的队列(无界队列)。此队列按FIFO排序元素,吞吐量通常要高于ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool()使用了这个队列。

  • SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQueue,静态工厂方法Executors.newCachedThreadPool()使用了这个队列。

  • PriorityBlockingQueue:一个具有优先级的无限阻塞队列。

  • ThreadFactory threadFactory线程工厂。(执行程序创建新线程时使用的工厂)

  • RejectedExecutionHandler handler拒绝策略。(当提交任务数超过maxmumPoolSize + workQueue 之和时,任务会交给handler来处理)
    四种拒绝策略

  • AbortPolicy:抛出RejectedExecutionException异常

  • DiscardPolicy:丢掉任务,不抛异常

  • DiscardOldestPolicy:不抛异常,尝试去和最早的去竞争,竞争失败再丢掉任务

  • CallerRunsPolicy:哪来的回哪里(将被拒绝的任务任务返回给execute()方法的调用线程中运行)

线程池的处理流程

execute()submit()的区别
public interface Executor {
    void execute(Runnable command);
}

public interface ExecutorService extends Executor {
    <T> Future<T> submit(Callable<T> task);
    <T> Future<T> submit(Runnable task, T result);
    Future<?> submit(Runnable task);
}
  1. execute()方法是Executor接口的唯一方法,submit()方法是ExecutorService的,一共有三个重载。
  2. execute()方法只能接收Runnable接口的实现类作为参数,submit()方法可以接收RunnableCallable接口的实现类作为参数。
  3. execute()方法没有返回值,submit()方法用于提交需要返回值的任务。线程池会返回一个Future类型的对象,通过这个对象可以判断任务是否执行成功,并且可以通过get()方法来获取返回值,get()方法会阻塞当前线程直到任务完成,而使用get(long timeout, TimeUnit unit)方法则会阻塞当前线程一段时间后立即返回,这时候有可能任务没有执行完。
  4. execute()方法无法处理异常,只会抛出,submit()方法可以通过返回结果的Future对象的get()方法对异常进行处理。
shutdown()shitdownNow()的区别

可以通过调用线程池的shutdown()shutdownNow()方法来关闭线程池。它们的原理是遍历线程池中的工作线程,然后逐个调用线程的interrupt()方法来中断线程,所以无法响应中断的任务可能永远无法终止。

shutdown()只是将线程池的状态设置为SHUTWDOWN状态,正在执行的任务会继续执行下去,没有被执行的则中断。而shutdownNow()则是将线程池的状态设置为STOP,正在执行的任务则被停止,没被执行任务的则返回。(shutdown()不着急,shutdownNow()很着急

线程池的监控
// 返回正在积极执行任务的线程的大致数量
executor.getActiveCount();
// 返回已安排执行的大致任务总数
// 由于任务和线程的状态在计算过程中可能会动态变化,因此返回值只是一个近似值
executor.getTaskCount();
// 返回已完成执行的大致任务总数
// 由于任务和线程的状态在计算过程中可能会动态变化,因此返回值只是一个近似值,但在连续调用中永远不会减少
executor.getCompletedTaskCount();
// 返回曾经同时进入池中的最大线程数
// 通过这个数据可以知道线程池是否曾经满过:如该数值等于线程池的最大大小,则表示线程池曾经满过。
executor.getLargestPoolSize();
// 返回当前池中的线程数
executor.getPoolSize();

可以通过继承线程池来自定义线程池,重写线程池的beforeExecute()afterExecute()terminated()方法,也可以在任务执行前、执行后和线程池关闭前执行一些代码来进行监控。

protected void beforeExecute(Thread t, Runnable r) { }
protected void afterExecute(Runnable r, Throwable t) { }
protected void terminated() { }
FixedThreadPool

FixedThreadPool被称为可重用固定线程数的线程池

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

FixedThreadPool的核心线程数corePoolSize和最大线程数maximumPoolSize都被设置为创建FixedThreadPool时指定的参数nThreads。

当线程池中的线程数大于核心线程数corePoolSizekeepAliveTime为多余的空闲线程等待新任务的最长时间,超过这个时间后多余的线程将被终止。这里把keepAliveTime设置为0L,意味着多余的空闲线程会被立即终止

  • 如果当前运行的线程数少于corePoolSize,则创建新线程来执行任务
  • 在线程池完成预热之后(当前运行的线程数等于corePoolSize),将任务加入LinkedBlockingQueue
  • 线程执行完任务后,会在循环中反复从LinkedBlockingQueue获取任务来执行
SingleThreadExecutor

SingleThreadExecutor是使用单个worker线程的Executor(corePoolSizemaximumPoolSize被设置为1)。

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

  • 如果当前运行的线程数少于corePoolSize(即线程池中无运行的线程),则创建一个新线程来执行任务
  • 如果当前线程池中有一个运行的线程,则将任务加入LinkedBlockingQueue
  • 核心线程执行完任务后,会在一个无限循环中反复从LinkedBlockingQueue获取任务来执行
CachedThreadPool

CachedThreadPool是一个会根据需要创建新线程的线程池。

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

CachedThreadPoolcorePoolSize被设置为0,即corePool为空;maximumPoolSize被设置为Integer.MAX_VALUE,即 maximumPool是无界的。这里把keepAliveTime设置为60L,意味着CachedThreadPool中的空闲线程等待新任务的最长时间为60秒,空闲线程超过60秒后将会被终止。

SynchronousQueue是一个没有容量的阻塞队列。每个插入操作必须等待另一个线程的对应移除操作,反之亦然。CachedThreadPool使用SynchronousQueue,把主线程提交的任务传递给空闲线程执行:

为什么不要用Executors来创建线程

  • FixedThreadPoolSignalThreadExecutor
    FixedThreadPoolSignalThreadExecutor的阻塞队列都为LinkedBlockingQueueLinkedBlockingQueue的容量为Integer.MAX_VALUE无界队列),由于无界队列的存在,会导致maximumPoolSizekeepAliveTime是无效的,也就是说新的任务会一直添加到无界队列中去,虽然这个“无界队列”其实是有界的,但是还没等这个队列被填满,就OOM了。同时,由于无界队列不可能被正常填满,因此拒绝策略handler也是形同虚设。
  • CachedThreadPool
    CachedThreadPoolmaximumPoolSize被设置为Integer.MAX_VALUE,即 maximumPool是无界的;同时CachedThreadPool的阻塞队列为没有容量的SynchronousQueue。这意味着,如果主线程提交任务的速度高于maximumPool中线程处理任务的速度时,CachedThreadPool会不断创建新线程。极端情况下,CachedThreadPool会因为创建过多线程而耗尽CPU和内存资源(OOM)。

ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutor继承自ThreadPoolExecutor。它主要用来在给定的延迟之后运行任务,或者定期执行任务。

它有三种调度任务的方式:

schedule():延迟多长时间之后只执行一次;

ScheduledThreadPoolExecutor scheduled = new ScheduledThreadPoolExecutor(10);
System.out.println(LocalDateTime.now());
scheduled.schedule(new Runnable() {
    @Override
    public void run() {
        System.out.println(LocalDateTime.now());
    }
}, 4, TimeUnit.SECONDS);

scheduledAtFixedRate():延迟指定时间后执行一次,之后按照固定的时长周期执行;

ScheduledThreadPoolExecutor  scheduled = new ScheduledThreadPoolExecutor(10);
scheduled.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        System.out.println(LocalDateTime.now());
    }
}, 0, 4, TimeUnit.SECONDS);

scheduleWithFixedDelay():延迟指定时间后执行一次,之后按照:上一次任务执行时长 + 周期的时长 的时间去周期执行;

ScheduledThreadPoolExecutor scheduled = new ScheduledThreadPoolExecutor(10);
scheduled.scheduleWithFixedDelay(new Runnable() {
    @SneakyThrows
    @Override
    public void run() {
        Thread.sleep(1000);
        System.out.println(LocalDateTime.now());
    }
}, 0,4, TimeUnit.SECONDS);

ScheduledThreadPoolExecutor有四种构造器,用来指定核心线程数、线程工厂、拒绝策略:

因为ScheduledThreadPoolExecutor继承了ThreadPoolExecutor,所以它的构造器都是通过super调用的父类的构造器:

super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), threadFactory, handler);

DelayedWorkQueue是一个无界队列,因此这里把maximumPoolSize也设为了算是无穷大吧(没什么意义)…

ScheduledThreadPoolExecutor会把待调度的任务封装为一个ScheduledFutureTaskScheduledThreadPoolExecutor的私有内部类)对象放到优先队列DelayedWorkQueueScheduledThreadPoolExecutor的静态内部类)中。

ScheduledFutureTask主要包含3个成员变量:

  • time:表示这个任务将要被执行的具体时间
  • sequenceNumber:表示这个任务被添加到ScheduledThreadPoolExecutor中的序号
  • period:表示任务执行的间隔周期

DelayedWorkQueue会对队列中的ScheduledFutureTask进行排序。排序时,time小的排在前面(时间早的任务将被先执行)。如果两个ScheduledFutureTasktime相同,就比较sequenceNumbersequenceNumber小的排在前面(也就是说,如果两个任务的执行时间相同,那么先提交的任务将被先执行)。

执行流程

  1. 调用DelayedWorkQueuetake()方法获取一个已到期(ScheduledFutureTasktime大于等于当前时间)的ScheduledFutureTask对象
  2. 调用ScheduledFutureTask对象的run()方法执行ScheduledFutureTask对象
  3. 调用ScheduledFutureTask对象的setNextRunTime()方法修改ScheduledFutureTasktime变量为下次将要被执行的时间
  4. 调用ScheduledFutureTask对象的reExecutePeriodic()方法将修改time之后的ScheduledFutureTask放回DelayedWorkQueue

Future

Future接口主要提供了异步返回任务执行结果,取消任务执行,获取任务执行状态的功能,接口定义如下:

public interface Future<V> {
    // 取消任务执行
    // mayInterruptIfRunning用于控制如果任务正在执行,是否中断对应的执行线程来取消该任务
    // 成功cancel,则isCancelled和isDoned都返回true。
    boolean cancel(boolean mayInterruptIfRunning);

    // 任务是否已取消
    boolean isCancelled();

    // 正常执行,被取消,异常退出都返回true
    boolean isDone();

    // 阻塞等待执行结果
    // CancellationException:任务被取消
    // ExecutionException:任务执行异常
    // InterruptedException:该等待结果线程被中断
    V get() throws InterruptedException, ExecutionException;

    // 阻塞等待执行结果指定时间,除了以上异常,
    // TimeoutException:等待超时
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

FutureTask

Future接口和实现Future接口的FutureTask类,代表异步计算的结果

FutureTask除了实现Future接口外,还实现了Runnable接口。因此,FutureTask可以交给Executor执行,也可以由调用线程直接执行(FutureTask.run())。

可以把FutureTask交给Executor执行;也可以通过ExecutorService.submit()方法返回一个FutureTask,然后执行FutureTask.get()方法或FutureTask.cancel()方法。除此以外,还可以单独使用FutureTask

FutureTask的三种状态

根据FutureTask.run()方法被执行的时机,FutureTask可以处于下面3种状态:

  • 未启动:当创建一个FutureTask且没有执行FutureTask.run()方法之前,这个FutureTask处于未启动状态。
  • 已启动FutureTask.run()方法被执行的过程中,FutureTask处于已启动状态。
  • 已完成FutureTask.run()方法执行完后正常结束,或被取消(FutureTask.cancel()),或执行FutureTask.run()方法时抛出异常而异常结束,FutureTask处于已完成状态

FutureTask处于未启动已启动状态时,执行FutureTask.get()方法将导致调用线程阻塞;当FutureTask处于已完成状态时,执行FutureTask.get()方法将导致调用线程立即返回结果或抛出异常

FutureTask处于未启动状态时,执行FutureTask.cancel()方法将导致此任务永远不会被执行;当FutureTask处于已启动状态时,执行FutureTask.cancel(true)方法将以中断执行此任务线程的方式来试图停止任务;当FutureTask处于已启动状态时,执行FutureTask.cancel(false)方法将不会对正在执行此任务的线程产生影响(让正在执行的任务运行完成);当FutureTask处于已完成状态时,执行FutureTask.cancel()方法将返回false

相关文章