你的位置:首页 > Java教程

[Java教程]第十四章 Executors源码解析


前边两章介绍了基础线程池ThreadPoolExecutor的使用方式、工作机理、参数详细介绍以及核心源码解析。

具体的介绍请参照:

第十二章 ThreadPoolExecutor使用与工作机理

第十三章 ThreadPoolExecutor源码解析

1、Executors与ThreadPoolExecutor

  • ThreadPoolExecutor
    • 可以灵活的自定义的创建线程池,可定制性很高
    • 想创建好一个合适的线程池比较难
    • 使用稍微麻烦一些
    • 实际中很少使用
  • Executors
    • 可以创建4种线程池,这四种线程池基本上已经包含了所有需求,将来根据业务特点选用就好
    • 使用非常简单
    • 实际中很常用

 使用方法:

package com.collection.test;import java.util.concurrent.Executor;import java.util.concurrent.Executors;public class ThreadPoolExecutorTest {  //private static ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 30, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(10));  //private static Executor executor = Executors.newFixedThreadPool(5);  //private static Executor executor = Executors.newSingleThreadExecutor();  //private static Executor executor = Executors.newCachedThreadPool();  private static Executor executor = Executors.newScheduledThreadPool(5);    public void executeTask(){    Task1 task1 = new Task1();//构建任务1    Task2 task2 = new Task2();//构建任务2    executor.execute(task1);//执行任务1    executor.execute(task2);//执行任务2  }    /*   * 基本任务2   */  class Task1 implements Runnable{    public void run() {      //具体任务的业务      for(int i=0;i<1000;i++){        System.out.println("hello xxx!!!");      }    }  }    /*   * 基本任务2   */  class Task2 implements Runnable{    public void run() {      //具体任务的业务      for(int i=0;i<5;i++){        System.out.println("hello world2!!!");      }    }  }    public static void main(String[] args) {    ThreadPoolExecutorTest test = new ThreadPoolExecutorTest();    test.executeTask();  }}

View Code

 

2、Executors可以创建的几种线程池简介

  • newFixedThreadPool(int corePoolSize)
    • 创建一个线程数固定(corePoolSize==maximumPoolSize)的线程池
    • 核心线程会一直运行
    • 无界队列LinkedBlockingQueue
  • newSingleThreadExecutor
    • 创建一个线程数固定(corePoolSize==maximumPoolSize==1)的线程池
    • 核心线程会一直运行
    • 无界队列LinkedBlockingQueue
    • 所有task都是串行执行的(即同一时刻只有一个任务在执行)
  • newCachedThreadPool
    • corePoolSize==0
    • maximumPoolSize==Integer.MAX_VALUE
    • 队列:SynchronousQueue
    • 创建一个线程池:当池中的线程都处于忙碌状态时,会立即新建一个线程来处理新来的任务
    • 这种池将会在执行许多耗时短的异步任务的时候提高程序的性能
    • 6秒钟内没有使用的线程将会被中止,并且从线程池中移除,因此几乎不必担心耗费资源
  • newScheduledThreadPool(int corePoolSize)
    • 用于执行定时或延迟执行的任务,最典型的:异步操作时的超时回调

注意:对于定时任务的执行,在实际使用中,会去使用spring定时器,非常方便

 

3、newFixedThreadPool(int corePoolSize)

源代码:

  /**   * 1、创建一个线程数固定(corePoolSize==maximumPoolSize)的线程池,   * 2、核心线程会一直运行   * 3、无界队列LinkedBlockingQueue   */  public static ExecutorService newFixedThreadPool(int nThreads) {    return new ThreadPoolExecutor(nThreads, nThreads,                   0L, TimeUnit.MILLISECONDS,                   new LinkedBlockingQueue<Runnable>());  }

View Code

说明:execute()的源代码查看第十三章 ThreadPoolExecutor源码解析

 

4、newSingleThreadExecutor()

源代码:

  /**   * 1、创建一个线程数固定(corePoolSize==maximumPoolSize==1)的线程池   * 2、核心线程会一直运行   * 3、无界队列LinkedBlockingQueue   * 注意:所有task都是串行执行的   */  public static ExecutorService newSingleThreadExecutor() {    return new FinalizableDelegatedExecutorService      (new ThreadPoolExecutor(1, 1,                  0L, TimeUnit.MILLISECONDS,                  new LinkedBlockingQueue<Runnable>()));  }

View Code

说明:execute()的源代码查看第十三章 ThreadPoolExecutor源码解析

 

5、newCachedThreadPool()

源代码:

  /**   * 1、创建一个线程池:当池中的线程都处于忙碌状态时,会立即新建一个线程来处理新来的任务   * 2、这种池将会在执行许多耗时短的异步任务的时候提高程序的性能。   * 3、6秒钟内没有使用的线程将会被中止,并且从线程池中移除,因此几乎不必担心耗费资源   * 4、队列:SynchronousQueue   * 5、maximumPoolSize为Integer.MAX_VALUE   */  public static ExecutorService newCachedThreadPool() {    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,                   60L, TimeUnit.SECONDS,                   new SynchronousQueue<Runnable>());  }

View Code

说明:execute()的源代码查看第十三章 ThreadPoolExecutor源码解析

 

6、newScheduledThreadPool(int corePoolSize)

源代码:

Executors:newScheduledThreadPool(int corePoolSize)

  /**   * 创建一个线程池:该线程池可以用于执行延时任务或者定时任务   */  public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {    return new ScheduledThreadPoolExecutor(corePoolSize);  }

View Code

ScheduledThreadPoolExecutor:ScheduledThreadPoolExecutor(int corePoolSize)

  /**   * 创建一个线程池:   * corePoolSize==我们指定   * maximumPoolSize==Integer.MAX_VALUE   * keepAliveTime==0纳秒(即不回收闲置线程)   * 队列: DelayedWorkQueue   */  public ScheduledThreadPoolExecutor(int corePoolSize) {    super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,       new DelayedWorkQueue());  }

View Code

说明:ScheduledThreadPoolExecutor是ThreadPoolExecutor的子类,其中调用的super构造器就是ThreadPoolExecutor的构造器。

ScheduledThreadPoolExecutor:execute(Runnable command)

  public void execute(Runnable command) {    if (command == null)      throw new NullPointerException();    schedule(command, 0, TimeUnit.NANOSECONDS);  }

View Code

ScheduledThreadPoolExecutor:schedule(Runnable command, long delay, TimeUnit unit)

  /**   * 这个方法:其实就是将task封装一下,然后加入到DelayedWorkQueue中   * 1、DelayedWorkQueue其实就是一个DelayQueue   * 2、当有新的task加入时,DelayQueue会将其加入内部的数组对象中,并对其进行排序,在这里,排序的规则就是执行的时间,执行时间越近的排在越前   * 3、线程池中的线程在执行task时,获取最近要执行的task,然后唤醒所有等待available条件的线程来执行该任务   */  public ScheduledFuture<?> schedule(Runnable command,                    long delay,                    TimeUnit unit) {    if (command == null || unit == null)      throw new NullPointerException();    RunnableScheduledFuture<?> t = decorateTask(command,                          new ScheduledFutureTask<Void>(command, null, triggerTime(delay, unit)));    delayedExecute(t);    return t;  }

View Code

注意:这里的注释就是整个ScheduledThreadPoolExecutor的执行机理。

 

下面说一下其中调用到的一些方法。

第一部分:封装ScheduledFutureTask任务

ScheduledThreadPoolExecutor:triggerTime(long delay, TimeUnit unit)

  /**   * 返回一个delayed action(延时任务)的触发时间   */  private long triggerTime(long delay, TimeUnit unit) {     return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));  }  /**   * Returns the trigger time of a delayed action.   */  long triggerTime(long delay) {     return now() +       ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));  }

View Code

说明:用于计算延时任务的触发时间。

注意:在上边的execute()方法中传递的delay是0,根据上边的代码,计算出触发时间就是now()。

ScheduledThreadPoolExecutor:内部类ScheduledFutureTask

  private class ScheduledFutureTask<V>      extends FutureTask<V> implements RunnableScheduledFuture<V> {    private final long sequenceNumber;//用于打破FIFO关系的序列号    private long time;//任务执行的触发时间    /**     * 一个用于重复执行的任务的时间段(单位:纳秒)     * 0-->不重复执行的任务     * 正值:fixed-rate执行     * 负值:fixed-delay执行     */    private final long period;    /**     * 创建一个一次性的action并且指定触发时间     */    ScheduledFutureTask(Runnable r, V result, long ns) {      super(r, result);      this.time = ns;      this.period = 0;      this.sequenceNumber = sequencer.getAndIncrement();    }

View Code

说明:ScheduledFutureTask是FutureTask的子类,上边的构造器中的super(r, result)代码如下:

FutureTask:FutureTask(Runnable runnable, V result)

  private final Sync sync;//控制FutureTask的同步器  public FutureTask(Runnable runnable, V result) {    sync = new Sync(Executors.callable(runnable, result));  }

View Code

Executors:callable(Runnable task, T result)

  public static <T> Callable<T> callable(Runnable task, T result) {    if (task == null)      throw new NullPointerException();    return new RunnableAdapter<T>(task, result);  }

View Code

Executors:内部类RunnableAdapter

  static final class RunnableAdapter<T> implements Callable<T> {    final Runnable task;    final T result;    RunnableAdapter(Runnable task, T result) {      this.task = task;      this.result = result;    }    public T call() {      task.run();//这里是真正的task运行的地方      return result;    }  }

View Code

注意:这里才是task真正去运行的地方。-->task.run()

至此,ScheduledFutureTask任务封装完成。

 

第二部分:修饰任务

ScheduledThreadPoolExecutor:RunnableScheduledFuture

  protected <V> RunnableScheduledFuture<V> decorateTask(Runnable runnable,                              RunnableScheduledFuture<V> task) {    return task;  }

View Code

说明:这里其实就是直接返回了刚刚封装好的任务

 

第三部分:将延时任务加入阻塞队列

ScheduledThreadPoolExecutor:delayedExecute(Runnable command)

  private void delayedExecute(Runnable command) {    if (isShutdown()) {//return runState != RUNNING;线程池状态不是RUNNING      reject(command);//回绝任务      return;    }        if (getPoolSize() < getCorePoolSize())//当前线程池数量少于核心线程数      prestartCoreThread();//创建并启动一个核心线程    super.getQueue().add(command);//获取阻塞队列,并将command加入队列  }

View Code

说明:这样之后,之前封装好的任务就加入了延时队列DelayQueue(阻塞队列的一个子类)

DelayQueue:add(E e)

  public boolean add(E e) {    return offer(e);  }  public boolean offer(E e) {    final ReentrantLock lock = this.lock;    lock.lock();    try {      E first = q.peek();//获取队列头部节点但不删除      q.offer(e);//将e放到q的尾部      //如果队列中只有e或者e的触发时间小于队头结点      if (first == null || e.compareTo(first) < 0)        available.signalAll();      return true;    } finally {      lock.unlock();    }  }

View Code

说明:在该方法中,将上边封装好的任务就加入了DelayQueue,并将该任务置于了队头,然后唤醒所有等待available条件的线程来执行该任务。

 

总结:

  • 四种线程池最常用的就是newCachedThreadPool和newFixedThreadPool(int corePoolSize)
  • 对于newScheduledThreadPool(int corePoolSize)使用比较少,因为在现代开发中,如果用于去开发定时任务程序的话,用spring定时器会非常简单