你的位置:首页 > Java教程

[Java教程]Java并发编程总结5——ThreadPoolExecutor


一、ThreadPoolExecutor介绍

在jdk1.8中,构造函数有4个。以

ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)为例:

1、corePoolSize: 核心线程池大小

2、maximumPoolSize: 最大线程池大小

3、keepAliveTime: 当线程池中的线程数大于corePoolSize时, 多余空闲线程等待新任务的最长时间, 超过这个时间后多余线程终止

4、unit: 时间单位, 比如毫秒, 纳秒等

5、workQueue: 阻塞队列

6、threadFactory: 创建线程工厂, 可以方便的创建线程, 可以自定义; 默认为Executors.DefaultThreadFactory

7、handler: 饱和策略, 默认为AbortPolicy

 

定义一个完整的线程池可以这么写:

    private ThreadPoolExecutor defaultThreadPool = new ThreadPoolExecutor(10, 100, 100, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(1000), Executors.defaultThreadFactory(), new ThreadPoolExecutor.DiscardPolicy());

 

线程池的主要处理流程如下:

1、提交任务到线程池,判断核心线程池(corePoolSize)是否已满? 没满,创建工作线程执行任务;满了,进入下个流程。

2、线程池判断工作队列(workQueue)是否已满?没满,则将新提交的任务存储在工作队列里。满了,则进入下个流程。

3、判断整个线程池(maximumPoolSize)是否已满?没满,则创建一个新的工作线程来执行任务,满了,则交给饱和策略(RejectedExecutionHandler)来处理这个任务。

 

二、饱和策略 RejectedExecutionHandler

在ThreadPoolExecutor类中定义了4种RejectedExecutionHandler类型:

1、AbortPolicy: 默认策略,直接抛出RejectedExecutionException异常。

2、CallerRunsPolicy:只用调用者所在线程来运行任务。

3、DiscardOldestPolicy:丢弃队列里最近的一个任务,并执行当前任务。

4、DiscardPolicy:不处理,丢弃掉。

也可以自定义饱和策略,比如将无法处理的新任务加入日志等,只需要实现RejectedExecutionHandler接口即可。

 

三、阻塞队列BlockingQueue

  /**   * ArrayBlockingQueue: 由数组结构组成的有界阻塞队列, 队列遵循FIFO   */  private BlockingQueue<String> abq = new ArrayBlockingQueue<String>(10);  /**   * LinkedBlockingQueue: 由链表结构组成的阻塞队列, FIFO   * LinkedBlockingDeque: 由链表结构组成的双向阻塞队列, 构造方法和LinkedBlockingQueue类似   * public LinkedBlockingQueue(int capacity)   //有界阻塞队列,队列最大值为capacity   * public LinkedBlockingQueue() { this(Integer.MAX_VALUE); }    //无界阻塞队列,队列最大值为Integer.MAX_VALUE   * 通过Executors.newFixedThreadPool(int nThreads)创建的线程池中采用的是无界LinkedBlockingQueue   * 对于put和take操作, 内部采用了不同的锁: putLock, takeLock, 而ArrayBlockingQueue内部只有一把锁   */  private BlockingQueue<Thread> lbq = new LinkedBlockingQueue<Thread>(10);  private ExecutorService fixedThreadPool = Executors.newFixedThreadPool(10);  private BlockingQueue<String> lbd = new LinkedBlockingDeque<String>();  /**   * PriorityBlockingQueue: 支持优先级排序的有界阻塞队列   * public PriorityBlockingQueue(int initialCapacity, Comparator<? super E> comparator)   */  private BlockingQueue<String> pbq = new PriorityBlockingQueue(100, new Comparator<String>() {    public int compare(String o1, String o2) {      return o1.compareTo(o2);    //升序排列    }  });  /**   * DelayQueue: 一个使用优先级队列实现的无界阻塞队列, 支持延时获取元素, 适用于缓存系统的设计以及定时任务调度。   * 内部队列采用PriorityQueue, private final PriorityQueue<E> q = new PriorityQueue<E>();   * 队列元素需要实现Delayed接口, class DelayQueue<E extends Delayed> extends AbstractQueue<E>   */  /**   * SynchronousQueue: 不存储元素的阻塞队列   */  /**   * LinkedTransferQueue: 由链表结构组成的实现了TransferQueue接口的无界阻塞队列   * transfer方法: 如果当前消费者正在等待接收元素(take()或poll(long timeout, TimeUnit unit))方法, 生产者传入的元素可以直接传递给消费者, 而不放入队列   */  BlockingQueue<String> ltq = new LinkedTransferQueue<String>();

 

四、通过Executors创建线程池

    /**     * 创建单个线程, 适用于需要保证顺序执行任务的场景     * return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()));     * BlockingQueue采用无界LinkedBlockingQueue, 因此maximumPoolSize、keepAliveTime参数无意义     */    Executors.newSingleThreadExecutor();
/** * 创建固定线程, 适用于需要限制当前线程数量, 负载比较重的服务器 * return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); * 采用无界LinkedBlockingQueue, 因此maximumPoolSize、keepAliveTime参数无意义 */ Executors.newFixedThreadPool(10);
/** * 根据需要创建线程, 适用于执行很多的短期异步任务的小程序, 或者负载较轻的服务器 * return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); * 初始线程为0, 采用SynchronousQueue阻塞队列, 如果生产任务的速度低于消费的速度, 空闲60s的线程会被终止; 如果生产任务的速度持续高于消费速度, 则会不断创建新线程 */ Executors.newCachedThreadPool();
/** * 在给定的延迟之后运行任务, 或者定期执行任务 * super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue()); * 采用无界DelayQueue队列, 因此maximumPoolSize、keepAliveTime参数无意义 */ Executors.newScheduledThreadPool(10);