你的位置:首页 > Java教程

[Java教程]第十三章 ThreadPoolExecutor源码解析


ThreadPoolExecutor使用方式、工作机理以及参数的详细介绍,请参照《第十二章 ThreadPoolExecutor使用与工作机理 》

1、源代码主要掌握两个部分

  • 线程池的创建:构造器
  • 提交任务到线程池去执行:execute()

 

2、构造器

2.1、一些属性:

  /**   * runState provides the main lifecyle control, taking on values:   *   * RUNNING -> SHUTDOWN   *  On invocation of shutdown(), perhaps implicitly in finalize()   * (RUNNING or SHUTDOWN) -> STOP   *  On invocation of shutdownNow()   * SHUTDOWN -> TERMINATED   *  When both queue and pool are empty   * STOP -> TERMINATED   *  When pool is empty   */  volatile int runState;  static final int RUNNING  = 0;//接收新的任务,会处理队列中的任务  static final int SHUTDOWN  = 1;//不接收新的任务,但是会处理队列中的任务  static final int STOP    = 2;//不接收新的任务,也不会处理队列中的任务,而且还会中断正在执行的任务  static final int TERMINATED = 3;//STOP+中止所有线程  private final BlockingQueue<Runnable> workQueue;//队列  /**   * 对poolSize, corePoolSize, maximumPoolSize, runState, and workers set上锁   */  private final ReentrantLock mainLock = new ReentrantLock();  /**   * 支持awaitTermination的等待条件   */  private final Condition termination = mainLock.newCondition();  /**   * pool中的所有工作线程集合;仅仅在持有mainLock的时候才允许被访问   */  private final HashSet<Worker> workers = new HashSet<Worker>();  private volatile long keepAliveTime;  /**   * false(默认):当核心线程处于闲置状态时,也会存活   * true:核心线程使用keepAliveTime来决定自己的存活状态   */  private volatile boolean allowCoreThreadTimeOut;  /**   * Core pool size,仅仅在持有mainLock的时候才允许被更新,   * 因为是volatile允许并发读(即使是在更新的过程中)   */  private volatile int  corePoolSize;  /**   * Maximum pool size, 其他同上   */  private volatile int  maximumPoolSize;  /**   * Current pool size, 其他同上   */  private volatile int  poolSize;  /**   * 回绝处理器   */  private volatile RejectedExecutionHandler handler;  /**   * 所有的线程都通过这个线程工厂的addThread方法来创建。   */  private volatile ThreadFactory threadFactory;  /**   * Tracks largest attained pool size.   */  private int largestPoolSize;  /**   * 已经完成的任务数.仅仅在工作线程被终结的时候这个数字才会被更新   */  private long completedTaskCount;  /**   * 默认的回绝处理器(回绝任务并抛出异常)   */  private static final RejectedExecutionHandler defaultHandler =    new AbortPolicy();

View Code

说明:因为属性不多,这里列出了全部属性。

 

2.2、构造器:

  public ThreadPoolExecutor(int corePoolSize,               int maximumPoolSize,               long keepAliveTime,               TimeUnit unit,               BlockingQueue<Runnable> workQueue) {    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,       Executors.defaultThreadFactory(), defaultHandler);  }  public ThreadPoolExecutor(int corePoolSize,               int maximumPoolSize,               long keepAliveTime,               TimeUnit unit,               BlockingQueue<Runnable> workQueue,               ThreadFactory threadFactory) {    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,       threadFactory, defaultHandler);  }  public ThreadPoolExecutor(int corePoolSize,               int maximumPoolSize,               long keepAliveTime,               TimeUnit unit,               BlockingQueue<Runnable> workQueue,               RejectedExecutionHandler handler) {    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,       Executors.defaultThreadFactory(), handler);  }  public ThreadPoolExecutor(int corePoolSize,               int maximumPoolSize,               long keepAliveTime,               TimeUnit unit,               BlockingQueue<Runnable> workQueue,               ThreadFactory threadFactory,               RejectedExecutionHandler handler) {    /*     * 检查参数     */    if (corePoolSize < 0 ||      maximumPoolSize <= 0 ||      maximumPoolSize < corePoolSize ||      keepAliveTime < 0)      throw new IllegalArgumentException();    if (workQueue == null || threadFactory == null || handler == null)      throw new NullPointerException();    /*     * 初始化值     */    this.corePoolSize = corePoolSize;    this.maximumPoolSize = maximumPoolSize;    this.workQueue = workQueue;    this.keepAliveTime = unit.toNanos(keepAliveTime);//转成纳秒    this.threadFactory = threadFactory;    this.handler = handler;  }

View Code

说明:4个构造器(1个5参+2个6参+1个7参)

注意:默认情况下,构造器只会初始化参数,不会提前构建好线程

建议:构造器参数众多,建议使用构建器模式,关于构建器模式的实际使用范例,请参照《第二章 Google guava cache源码解析1--构建缓存器》

构造器中默认线程工厂的创建:Executors中的方法

  public static ThreadFactory defaultThreadFactory() {    return new DefaultThreadFactory();  }  /**   * 默认的线程工厂   */  static class DefaultThreadFactory implements ThreadFactory {    static final AtomicInteger poolNumber = new AtomicInteger(1);//池数量    final ThreadGroup group;//线程组    final AtomicInteger threadNumber = new AtomicInteger(1);//线程数量    final String namePrefix;    /*     * 创建默认的线程工厂     */    DefaultThreadFactory() {      SecurityManager s = System.getSecurityManager();      group = (s != null)? s.getThreadGroup() :                 Thread.currentThread().getThreadGroup();      namePrefix = "pool-" +             poolNumber.getAndIncrement() +             "-thread-";    }    /*     * 创建一个新的线程     */    public Thread newThread(Runnable r) {      Thread t = new Thread(group, r,                 namePrefix + threadNumber.getAndIncrement(),//新线程的名字                 0);      /*       * 将后台线程设置为应用线程       */      if (t.isDaemon())        t.setDaemon(false);      /*       * 将线程的优先级全部设置为NORM_PRIORITY       */      if (t.getPriority() != Thread.NORM_PRIORITY)        t.setPriority(Thread.NORM_PRIORITY);      return t;    }  }

View Code

说明,其中的newThread()方法会在第三部分用到。

 

3、提交任务的线程池去执行execute(Runnable command)

  public void execute(Runnable command) {    if (command == null)      throw new NullPointerException();    /**     * 这一块儿就是整个工作机理的部分(代码比较精致)     * 1、addIfUnderCorePoolSize     * 1)如果当前线程数poolSize<核心线程数corePoolSize并且pool的状态为RUNNING,     * 1.1)先获取锁     * 1.2)根据传入的任务firstTask创建一个Work对象,在该对象中编写了run()方法,在该run()方法中会真正的去执行firstTask的run()     * 说明:关于Work对象run部分的内容,查看Work内部类的run()方法上边的注释以及与其相关方法的注释     * 1.3)通过线程工厂与上边创建出来的work对象w创建新的线程t,将w加入工作线程集合,     * 然后启动线程t,之后就会自动执行w中的run(),w中的run()又会调用firstTask的run(),即处理真正的业务逻辑     *     * 2、如果poolSize>=corePoolSize或者上边的执行失败了     * 1)如果pool的状态处于RUNNING,将该任务入队(offer(command))     * 如果入队后,pool的状态不是RUNNING了或者池中的线程数为0了,下边的逻辑具体去查看注释     * 2)addIfUnderMaximumPoolSize(同addIfUnderCorePoolSize)     * 如果增加线程也不成功,则回绝任务。     *     */    if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {      if (runState == RUNNING && workQueue.offer(command)) {        if (runState != RUNNING || poolSize == 0)          ensureQueuedTaskHandled(command);      }      else if (!addIfUnderMaximumPoolSize(command))        reject(command); // is shutdown or saturated    }  }

View Code

 

3.1、addIfUnderCorePoolSize(Runnable firstTask)

  /**   * 创建并且启动一个新的线程来处理任务   * 1、其第一个任务就是传入的firstTask参数   * 2、该方法仅仅用于当前线程数小于核心线程数并且pool没有被关掉的时候   */  private boolean addIfUnderCorePoolSize(Runnable firstTask) {    Thread t = null;    final ReentrantLock mainLock = this.mainLock;    mainLock.lock();//获取锁    try {      if (poolSize < corePoolSize && runState == RUNNING)        t = addThread(firstTask);//创建新线程    } finally {      mainLock.unlock();//释放锁    }    return t != null;  }

View Code

addThread(Runnable firstTask)

  private Thread addThread(Runnable firstTask) {    Worker w = new Worker(firstTask);//构造一个work    Thread t = threadFactory.newThread(w);//创建线程    boolean workerStarted = false;    if (t != null) {//      if (t.isAlive()) //如果t线程已经启动了,而且还没有死亡        throw new IllegalThreadStateException();      w.thread = t;      workers.add(w);//将w工作线程加入workers线程池      int nt = ++poolSize;//当前的池数量+1      if (nt > largestPoolSize)        largestPoolSize = nt;      try {        t.start();//启动线程        workerStarted = true;      }      finally {        if (!workerStarted)//启动线程没有成功          workers.remove(w);//将w从workers集合中删除      }    }    return t;  }

View Code

newThread(Runnable r)

该方法在构建上边的默认线程工厂部分已经说过了。

 

Work内部类:

/**   * 工作线程。   */  private final class Worker implements Runnable {    /**     * 在每一个任务的执行前后都会获取和释放runLock。     * 该锁只要是为了防止中断正在执行任务的work线程     */    private final ReentrantLock runLock = new ReentrantLock();    /**     * Initial task to run before entering run loop.     * 1、Possibly null.     */    private Runnable firstTask;    /**     * 每个work线程完成的任务总量     * accumulated into completedTaskCount upon termination.     */    volatile long completedTasks;    Thread thread;    /**     * 该work中的线程是不是确实正在执行了run()     */    volatile boolean hasRun = false;    Worker(Runnable firstTask) {      this.firstTask = firstTask;    }    /*     * true:已经有线程持有了该锁     */    boolean isActive() {      return runLock.isLocked();    }    private void runTask(Runnable task) {      final ReentrantLock runLock = this.runLock;      runLock.lock();//获取锁runLock      try {        /*         * 如果pool状态为STOP或TERMINATED,确保线程被打断;         * 如果不是,确保线程不要被打断         */        if ((runState >= STOP ||          (Thread.interrupted() && runState >= STOP)) &&          hasRun)          thread.interrupt();        /*         * 确保afterExecute会被执行仅仅当任务完成了(try)或抛出了异常(catch)         */        boolean ran = false;        beforeExecute(thread, task);//执行任务的run()方法之前要执行的操作        try {          task.run();//执行线程的run()方法          ran = true;          afterExecute(task, null);//执行任务的run()方法之后要执行的操作          ++completedTasks;        } catch (RuntimeException ex) {          if (!ran)            afterExecute(task, ex);          throw ex;        }      } finally {        runLock.unlock();//释放锁runLock      }    }    /**     * Main run loop     * 运行当前任务task,运行结束后,尝试获取队列中的其他任务,     * 如果最后通过各种方式都获取不到,就回收该线程,如果获取到了,就用该线程继续执行接下来的任务     * 最后,当获取不到任何任务去执行时,就将该线程从works线程集合中删除掉     */    public void run() {      try {        hasRun = true;        Runnable task = firstTask;        firstTask = null;        while (task != null || (task = getTask()) != null) {          runTask(task);//运行该任务          task = null;        }      } finally {        workerDone(this);//将该线程从works集合中删除      }    }  }

View Code

说明:这里列出了该内部类的全部属性和常用方法。

 

getTask()

  /**   * 获取下一个worker线程将要运行的任务   * Gets the next task for a worker thread to run.    */  Runnable getTask() {    for (;;) {//无限循环      try {        int state = runState;        if (state > SHUTDOWN)          return null;        Runnable r;        if (state == SHUTDOWN) // Help drain queue          r = workQueue.poll();//处理queue中的任务        //下面的runState==RUNNING        else if (poolSize > corePoolSize || allowCoreThreadTimeOut)          //从队头获取任务,如果没有任务,等待keepAliveTime的时间          r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);        else          //从队头获取任务,如果没有任务,阻塞等待          r = workQueue.take();        if (r != null)          return r;        if (workerCanExit()) {//允许回收获取任务失败的线程          if (runState >= SHUTDOWN) // Wake up others            interruptIdleWorkers();//中断闲置的work线程          return null;        }        // Else retry      } catch (InterruptedException ie) {        // On interruption, re-check runState      }    }  }

View Code

workerCanExit()

  /**   * 检测一个获取任务失败的work线程是否可以退出了。   * 出现下面三种情况,work线程就会死亡。   * 1、如果pool的状态为STOP或TERMINATED   * 2、队列为空   * 3、允许回收核心线程并且池中的线程数大于1和corePoolSize的最大值   */  private boolean workerCanExit() {    final ReentrantLock mainLock = this.mainLock;    mainLock.lock();    boolean canExit;    try {      canExit = runState >= STOP ||        workQueue.isEmpty() ||        (allowCoreThreadTimeOut &&         poolSize > Math.max(1, corePoolSize));    } finally {      mainLock.unlock();    }    return canExit;  }

View Code

workerDone(Worker w)

  void workerDone(Worker w) {    final ReentrantLock mainLock = this.mainLock;    mainLock.lock();    try {      completedTaskCount += w.completedTasks;      workers.remove(w);//从workers集合中删除该线程      if (--poolSize == 0)//如果池中的线程数为0        tryTerminate();    } finally {      mainLock.unlock();    }  }

View Code

 

3.2、ensureQueuedTaskHandled(Runnable command)

  /**   * 在一个task入队之后重新检查state。   * 当一个task入队后,pool的state发生了变化,该方法就会被调用。   * 如果一个task入队的同时,shutdownNow方法发生了调用,该方法就必须从队列中移除并回绝   * 否则该方法会保证至少有一个线程来处理入队的task   */  private void ensureQueuedTaskHandled(Runnable command) {    final ReentrantLock mainLock = this.mainLock;    mainLock.lock();    boolean reject = false;    Thread t = null;    try {      int state = runState;      if (state != RUNNING && workQueue.remove(command))        reject = true;      else if (state < STOP &&           poolSize < Math.max(corePoolSize, 1) &&           !workQueue.isEmpty())        t = addThread(null);    } finally {      mainLock.unlock();    }    if (reject)      reject(command);  }

View Code

 

3.3、addIfUnderMaximumPoolSize(Runnable firstTask)

  private boolean addIfUnderMaximumPoolSize(Runnable firstTask) {    Thread t = null;    final ReentrantLock mainLock = this.mainLock;    mainLock.lock();    try {      if (poolSize < maximumPoolSize && runState == RUNNING)        t = addThread(firstTask);    } finally {      mainLock.unlock();    }    return t != null;  }

View Code

说明:该方法的其他方法与addIfUnderCorePoolSize(Runnable firstTask)一样。

 

3.4、reject(Runnable command)

  void reject(Runnable command) {    handler.rejectedExecution(command, this);  }

View Code
  public static class AbortPolicy implements RejectedExecutionHandler {        public AbortPolicy() { }    /** 直接抛异常 */    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {      throw new RejectedExecutionException();    }  }

View Code

 

说明:明白了上一章将的线程池机理,按着这个机理去看源代码是非常容易的事情。

总结:

  • 上一章的工作机理
  • 上一章的参数详细说明