你的位置:首页 > Java教程

[Java教程]编写高质量代码:改善Java程序的151个建议(第8章:多线程和并发___建议126~128)


建议126:适时选择不同的线程池来实现

  Java的线程池实现从根本上来说只有两个:ThreadPoolExecutor类和ScheduledThreadPoolExecutor类,这两个类还是父子关系,但是Java为了简化并行计算,还提供了一个Exceutors的静态类,它可以直接生成多种不同的线程池执行器,比如单线程执行器、带缓冲功能的执行器等,但归根结底还是使用ThreadPoolExecutor类或ScheduledThreadPoolExecutor类的封装类。

  为了理解这些执行器,我们首先来看看ThreadPoolExecutor类,其中它复杂的构造函数可以很好的理解线程池的作用,代码如下:  

public class ThreadPoolExecutor extends AbstractExecutorService {  // 最完整的构造函数  public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,      long keepAliveTime, TimeUnit unit,      BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,      RejectedExecutionHandler handler) {    // 检验输入条件    if (corePoolSize < 0 || maximumPoolSize <= 0        || maximumPoolSize < corePoolSize || keepAliveTime < 0)      throw new IllegalArgumentException();    // 检验运行环境    if (workQueue == null || threadFactory == null || handler == null)      throw new NullPointerException();    this.corePoolSize = corePoolSize;    this.maximumPoolSize = maximumPoolSize;    this.workQueue = workQueue;    this.keepAliveTime = unit.toNanos(keepAliveTime);    this.threadFactory = threadFactory;    this.handler = handler;  }}

  这是ThreadPoolExecutor最完整的构造函数,其他的构造函数都是引用该构造函数实现的,我们逐步来解释这些参数的含义。

  1. corePoolSize:最小线程数。线程启动后,在池中保持线程的最小数量。需要说明的是线程数量是逐步到达corePoolSize值的,例如corePoolSize被设置为10,而任务数量为5,则线程池中最多会启动5个线程,而不是一次性的启动10个线程。
  2. maximumPoolSize:最大线程数量。这是池中最大能容纳的最大线程数量,如果超出,则使用RejectedExecutionHandler 拒绝策略处理。
  3. keepAliveTime:线程最大生命周期。这里的生命周期有两个约束条件,一是该参数针对的是超过corePoolSize数量的线程。二是处于非运行状态的线程。这么说吧,如果corePoolSize为10,maximumPoolSize为20,此时线程池中有15个线程正在运行,一段时间后,其中有3个线程处于等待状态的时间超过了keepAliveTime指定的时间,则结束这3个线程,此时线程池中还有12个线程正在运行。
  4. unit:时间单位。这是keepAliveTime的时间单位,可以是纳秒、毫秒、秒、分等选项。
  5. workQuene:任务队列。当线程池中的线程都处于运行状态,而此时任务数量继续增加,则需要一个容器来容纳这些任务,这就是任务队列。
  6. threadFactory:线程工厂。定义如何启动一个线程,可以设置线程名称,并且可以确认是否是后台线程等。
  7. handler:拒绝任务处理器。由于超出线程数量和队列容量而对继续增加的任务进行处理的程序。

  线程池的管理是这样一个过程:首先创建线程池,然后根据任务的数量逐步将线程增大到corePoolSize数量,如果此时仍有任务增加,则放置到workQuene中,直到workQuene爆满为止,然后继续增加池中的数量(增强处理能力),最终达到maximumPoolSize,那如果此时还有任务增加进来呢?这就需要handler处理了,或者丢弃任务,或者拒绝新任务,或者挤占已有任务等。

  在任务队列和线程池都饱和的情况下,一但有线程处于等待(任务处理完毕,没有新任务增加)状态的时间超过keepAliveTime,则该线程终止,也就说池中的线程数量会逐渐降低,直至为corePoolSize数量为止。

  我们可以把线程池想象为这样一个场景:在一个生产线上,车间规定是可以有corePoolSize数量的工人,但是生产线刚建立时,工作不多,不需要那么多的人。随着工作数量的增加,工人数量也逐渐增加,直至增加到corePoolSize数量为止。此时还有任务增加怎么办呢?

  好办,任务排队,corePoolSize数量的工人不停歇的处理任务,新增加的任务按照一定的规则存放在仓库中(也就是我们的workQuene中),一旦任务增加的速度超过了工人处理的能力,也就是说仓库爆满时,车间就会继续招聘工人(也就是扩大线程数),直至工人数量到达maximumPoolSize为止,那如果所有的maximumPoolSize工人都在处理任务时,而且仓库也是饱和状态,新增任务该怎么处理呢?这就会扔一个叫handler的专门机构去处理了,它要么丢弃这些新增的任务,要么无视,要么替换掉别的任务。

  过了一段时间后,任务的数量逐渐减少,导致一部分工人处于待工状态,为了减少开支(Java是为了减少系统的资源消耗),于是开始辞退工人,直至保持corePoolSize数量的工人为止,此时即使没有工作,也不再辞退工人(池中的线程数量不再减少),这也是保证以后再有任务时能够快速的处理。

  明白了线程池的概念,我们再来看看Executors提供的几个线程创建线程池的便捷方法:

  • newSingleThreadExecutor:单线程池。顾名思义就是一个池中只有一个线程在运行,该线程永不超时,而且由于是一个线程,当有多个任务需要处理时,会将它们放置到一个无界阻塞队列中逐个处理,它的实现代码如下:  
 public static ExecutorService newSingleThreadExecutor() {    return new FinalizableDelegatedExecutorService      (new ThreadPoolExecutor(1, 1,                  0L, TimeUnit.MILLISECONDS,                  new LinkedBlockingQueue<Runnable>()));  }

  它的使用方法也很简单,下面是简单的示例:

public static void main(String[] args) throws ExecutionException,      InterruptedException {    // 创建单线程执行器    ExecutorService es = Executors.newSingleThreadExecutor();    // 执行一个任务    Future<String> future = es.submit(new Callable<String>() {      @Override      public String call() throws Exception {        return "";      }    });    // 获得任务执行后的返回值    System.out.println("返回值:" + future.get());    // 关闭执行器    es.shutdown();  }

  • newCachedThreadPool:缓冲功能的线程。建立了一个线程池,而且线程数量是没有限制的(当然,不能超过Integer的最大值),新增一个任务即有一个线程处理,或者复用之前空闲的线程,或者重亲启动一个线程,但是一旦一个线程在60秒内一直处于等待状态时(也就是一分钟无事可做),则会被终止,其源码如下: 
 public static ExecutorService newCachedThreadPool() {    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,                   60L, TimeUnit.SECONDS,                   new SynchronousQueue<Runnable>());  }

  这里需要说明的是,任务队列使用了同步阻塞队列,这意味着向队列中加入一个元素,即可唤醒一个线程(新创建的线程或复用空闲线程来处理),这种队列已经没有队列深度的概念了.

  • newFixedThreadPool:固定线程数量的线程池。 在初始化时已经决定了线程的最大数量,若任务添加的能力超出了线程的处理能力,则建立阻塞队列容纳多余的任务,其源码如下: 
public static ExecutorService newFixedThreadPool(int nThreads) {    return new ThreadPoolExecutor(nThreads, nThreads,                   0L, TimeUnit.MILLISECONDS,                   new LinkedBlockingQueue<Runnable>());  }

  上面返回的是一个ThreadPoolExecutor,它的corePoolSize和maximumPoolSize是相等的,也就是说,最大线程数量为nThreads。如果任务增长的速度非常快,超过了LinkedBlockingQuene的最大容量(Integer的最大值),那此时会如何处理呢?会按照ThreadPoolExecutor默认的拒绝策略(默认是DiscardPolicy,直接丢弃)来处理。

  以上三种线程池执行器都是ThreadPoolExecutor的简化版,目的是帮助开发人员屏蔽过得线程细节,简化多线程开发。当需要运行异步任务时,可以直接通过Executors获得一个线程池,然后运行任务,不需要关注ThreadPoolExecutor的一系列参数是什么含义。当然,有时候这三个线程不能满足要求,此时则可以直接操作ThreadPoolExecutor来实现复杂的多线程计算。可以这样比喻,newSingleThreadExecutor、newCachedThreadPool、newFixedThreadPool是线程池的简化版,而ThreadPoolExecutor则是旗舰版___简化版容易操作,需要了解的知识相对少些,方便使用,而旗舰版功能齐全,适用面广,难以驾驭。

建议127:Lock与synchronized是不一样的

  很多编码者都会说,Lock类和synchronized关键字用在代码块的并发性和内存上时语义是一样的,都是保持代码块同时只有一个线程执行权。这样的说法只说对了一半,我们以一个任务提交给多个线程为例,来看看使用显示锁(Lock类)和内部锁(synchronized关键字)有什么不同,首先定义一个任务:

class Task {  public void doSomething() {    try {      // 每个线程等待2秒钟,注意此时线程的状态转变为Warning状态      Thread.sleep(2000);    } catch (Exception e) {      // 异常处理    }    StringBuffer sb = new StringBuffer();    // 线程名称    sb.append("线程名称:" + Thread.currentThread().getName());    // 运行时间戳    sb.append(",执行时间: " + Calendar.getInstance().get(Calendar.SECOND) + "s");    System.out.println(sb);  }}

  该类模拟了一个执行时间比较长的计算,注意这里是模拟方式,在使用sleep方法时线程的状态会从运行状态转变为等待状态。该任务具备多线程能力时必须实现Runnable接口,我们分别建立两种不同的实现机制,先看显示锁实现:  

class TaskWithLock extends Task implements Runnable {  // 声明显示锁  private final Lock lock = new ReentrantLock();  @Override  public void run() {    try {      // 开始锁定      lock.lock();      doSomething();    } finally {      // 释放锁      lock.unlock();    }  }}

  这里有一点需要说明,显示锁的锁定和释放必须放在一个try......finally块中,这是为了确保即使出现异常也能正常释放锁,保证其它线程能顺利执行。

  内部锁的处理也非常简单,代码如下: 

//内部锁任务class TaskWithSync extends Task implements Runnable{  @Override  public void run() {    //内部锁    synchronized("A"){      doSomething();    }  }  }

  这两个任务看着非常相似,应该能够产生相同的结果吧?我们建立一个模拟场景,保证同时有三个线程在运行,代码如下: 

public class Client127 {  public static void main(String[] args) throws Exception {    // 运行显示任务    runTasks(TaskWithLock.class);    // 运行内部锁任务    runTasks(TaskWithSync.class);  }  public static void runTasks(Class<? extends Runnable> clz) throws Exception {    ExecutorService es = Executors.newCachedThreadPool();    System.out.println("***开始执行 " + clz.getSimpleName() + " 任务***");    // 启动3个线程    for (int i = 0; i < 3; i++) {      es.submit(clz.newInstance());    }    // 等待足够长的时间,然后关闭执行器    TimeUnit.SECONDS.sleep(10);    System.out.println("---" + clz.getSimpleName() + " 任务执行完毕---\n");    // 关闭执行器    es.shutdown();  }}

  按照一般的理解,Lock和synchronized的处理方式是相同的,输出应该没有差别,但是很遗憾的是,输出差别其实很大。输出如下:

        ***开始执行 TaskWithLock 任务***
          线程名称:pool-1-thread-2,执行时间: 55s
          线程名称:pool-1-thread-1,执行时间: 55s
          线程名称:pool-1-thread-3,执行时间: 55s
        ---TaskWithLock  任务执行完毕---

        ***开始执行 TaskWithSync 任务***
          线程名称:pool-2-thread-1,执行时间: 5s
          线程名称:pool-2-thread-3,执行时间: 7s
          线程名称:pool-2-thread-2,执行时间: 9s
        ---TaskWithSync  任务执行完毕---

   注意看运行的时间戳,显示锁是同时运行的,很显然pool-1-thread-1线程执行到sleep时,其它两个线程也会运行到这里,一起等待,然后一起输出,这还具有线程互斥的概念吗?

  而内部锁的输出则是我们预期的结果,pool-2-thread-1线程在运行时其它线程处于等待状态,pool-2-threda-1执行完毕后,JVM从等待线程池中随机获的一个线程pool-2-thread-3执行,最后执行pool-2-thread-2,这正是我们希望的。

  现在问题来了:Lock锁为什么不出现互斥情况呢?

  这是因为对于同步资源来说(示例中的代码块)显示锁是对象级别的锁,而内部锁是类级别的锁,也就说说Lock锁是跟随对象的,synchronized锁是跟随类的,更简单的说把Lock定义为多线程类的私有属性是起不到资源互斥作用的,除非是把Lock定义为所有线程的共享变量。都说代码是最好的解释语言,我们来看一个Lock锁资源的代码:  

public static void main(String[] args) {    // 多个线程共享锁    final Lock lock = new ReentrantLock();    // 启动三个线程    for (int i = 0; i < 3; i++) {      new Thread(new Runnable() {        @Override        public void run() {          try {            lock.lock();            // 休眠2秒钟            Thread.sleep(2000);            System.out.println(Thread.currentThread().getName());          } catch (InterruptedException e) {            e.printStackTrace();          } finally {            lock.unlock();          }        }      }).start();    }  }

  执行时,会发现线程名称Thread-0、Thread-1、Thread-2会逐渐输出,也就是一个线程在执行时,其它线程就处于等待状态。注意,这里三个线程运行的实例对象是同一个类。

  除了这一点不同之外,显示锁和内部锁还有什么区别呢?还有以下4点不同:

  1. Lock支持更细精度的锁控制:假设读写锁分离,写操作时不允许有读写操作存在,而读操作时读写可以并发执行,这一点内部锁就很难实现。显示锁的示例代码如下:  
class Foo {  // 可重入的读写锁  private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();  // 读锁  private final Lock r = rwl.readLock();  // 写锁  private final Lock w = rwl.writeLock();  // 多操作,可并发执行  public void read() {    try {      r.lock();      Thread.sleep(1000);      System.out.println("read......");    } catch (InterruptedException e) {      e.printStackTrace();    } finally {      r.unlock();    }  }  // 写操作,同时只允许一个写操作  public void write() {    try {      w.lock();      Thread.sleep(1000);      System.out.println("write.....");    } catch (InterruptedException e) {      e.printStackTrace();    } finally {      w.unlock();    }  }}

  可以编写一个Runnable实现类,把Foo类作为资源进行调用(注意多线程是共享这个资源的),然后就会发现这样的现象:读写锁允许同时有多个读操作但只允许一个写操作,也就是当有一个写线程在执行时,所有的读线程都会阻塞,直到写线程释放锁资源为止,而读锁则可以有多个线程同时执行。

 2.Lock锁是无阻塞锁,synchronized是阻塞锁

    当线程A持有锁时,线程B也期望获得锁,此时,如果程序中使用的显示锁,则B线程为等待状态(在通常的描述中,也认为此线程被阻塞了),若使用的是内部锁则为阻塞状态。

 3.Lock可实现公平锁,synchronized只能是非公平锁

  什么叫非公平锁呢?当一个线程A持有锁,而线程B、C处于阻塞(或等待)状态时,若线程A释放锁,JVM将从线程B、C中随机选择一个持有锁并使其获得执行权,这叫非公平锁(因为它抛弃了先来后到的顺序);若JVM选择了等待时间最长的一个线程持有锁,则为公平锁(保证每个线程的等待时间均衡)。需要注意的是,即使是公平锁,JVM也无法准确做到" 公平 ",在程序中不能以此作为精确计算。

  显示锁默认是非公平锁,但可以在构造函数中加入参数为true来声明出公平锁,而synchronized实现的是非公平锁,他不能实现公平锁。

 4.Lock是代码级的,synchronized是JVM级的

    Lock是通过编码实现的,synchronized是在运行期由JVM释放的,相对来说synchronized的优化可能性高,毕竟是在最核心的部分支持的,Lock的优化需要用户自行考虑。

    显示锁和内部锁的功能各不相同,在性能上也稍有差别,但随着JDK的不断推进,相对来说,显示锁使用起来更加便利和强大,在实际开发中选择哪种类型的锁就需要根据实际情况考虑了:灵活、强大选择lock,快捷、安全选择synchronized.

建议128:预防线程死锁

 线程死锁(DeadLock)是多线程编码中最头疼的问题,也是最难重现的问题,因为Java是单进程的多线程语言,一旦线程死锁,则很难通过外科手术的方法使其起死回生,很多时候只有借助外部进程重启应用才能解决问题,我们看看下面的多线程代码是否会产生死锁:

class Foo implements Runnable {  @Override  public void run() {   fun(10);  }  // 递归方法  public synchronized void fun(int i) {    if (--i > 0) {      for (int j = 0; j < i; j++) {        System.out.print("*");      }      System.out.println(i);      fun(i);    }  }}

  注意fun方法是一个递归函数,而且还加上了synchronized关键字,它保证同时只有一个线程能够执行,想想synchronized关键字的作用:当一个带有synchronized关键字的方法在执行时,其他synchronized方法会被阻塞,因为线程持有该对象的锁,比如有这样的代码:  

class Foo1 {  public synchronized void m1() {    try {      Thread.sleep(1000);    } catch (InterruptedException e) {      // 异常处理    }    System.out.println("m1方法执行完毕");  }  public synchronized void m2() {    System.out.println("m2方法执行完毕");  }}

  相信大家都明白,先输出"m1执行完毕",然后再输出"m2"执行完毕,因为m1方法在执行时,线程t持有foo对象的锁,要想主线程获得m2方法的执行权限就必须等待m1方法执行完毕,也就是释放当前锁。明白了这个问题,我们思考一下上例中带有synchronized的递归方法是否能执行?会不会产生死锁?运行结果如下:

  *********9
  ********8
  *******7
  ******6
  *****5
  ****4
  ***3
  **2
  *1

 一个倒三角形,没有产生死锁,正常执行,这是为何呢?很奇怪,是吗?那是因为在运行时当前线程(Thread-0)获得了Foo对象的锁(synchronized虽然是标注在方法上的,但实际作用是整个对象),也就是该线程持有了foo对象的锁,所以它可以多次重如fun方法,也就是递归了。可以这样来思考该问题,一个包厢有N把钥匙,分别由N个海盗持有 (也就是我们Java的线程了),但是同一时间只能由一把钥匙打开宝箱,获取宝物,只有在上一个海盗关闭了包厢(释放锁)后,其它海盗才能继续打开获取宝物,这里还有一个规则:一旦一个海盗打开了宝箱,则该宝箱内的所有宝物对他来说都是开放的,即使是“ 宝箱中的宝箱”(即内箱)对他也是开放的。可以用如下代码来表示:  

class Foo2 implements Runnable{  @Override  public void run() {    method1();  }  public synchronized void method1(){    method2();  }  public synchronized void method2(){    //doSomething  }}

  方法method1是synchronized修饰的,方法method2也是synchronized修饰的,method1和method2方法重入完全是可行的,此种情况下会不会产生死锁。

  那什么情况下回产生死锁呢?看如下代码: 

class A {  public synchronized void a1(B b) {    String name = Thread.currentThread().getName();    System.out.println(name + " 进入A.a1()");    try {      // 休眠一秒 仍持有锁      Thread.sleep(1000);    } catch (Exception e) {      // 异常处理    }    System.out.println(name + " 试图访问B.b2()");    b.b2();  }  public synchronized void a2() {    System.out.println("进入a.a2()");  }}class B {  public synchronized void b1(A a) {    String name = Thread.currentThread().getName();    System.out.println(name + " 进入B.b1()");    try {      // 休眠一秒 仍持有锁      Thread.sleep(1000);    } catch (Exception e) {      // 异常处理    }    System.out.println(name + " 试图访问A.a2()");    a.a2();  }  public synchronized void b2() {    System.out.println("进入B.b2()");  }}

public static void main(String[] args) throws InterruptedException {    final A a = new A();    final B b = new B();    // 线程A    new Thread(new Runnable() {      @Override      public void run() {        a.a1(b);      }    }, "线程A").start();    // 线程B    new Thread(new Runnable() {      @Override      public void run() {        b.b1(a);      }    }, "线程B").start();  }

  此段程序定义了两个资源A和B,然后在两个线程A、B中使用了该资源,由于两个资源之间交互操作,并且都是同步方法,因此在线程A休眠一秒钟后,它会试图访问资源B的b2方法。但是B线程持有该类的锁,并同时在等待A线程释放其锁资源,所以此时就出现了两个线程在互相等待释放资源的情况,也就是死锁了,运行结果如下:

   线程A  进入A.a1()
      线程B  进入B.b1()
  线程A  试图访问B.b2()
  线程B  试图访问A.a2()

  此种情况下,线程A和线程B会一直等下去,直到有外界干扰为止,比如终止一个线程,或者某一线程自行放弃资源的争抢,否则这两个线程就始终处于死锁状态了。我们知道达到线程死锁需要四个条件:

  1. 互斥条件:一个资源每次只能被一个线程使用
  2. 资源独占条件:一个线程因请求资源在未使用完之前,不能强行剥夺
  3. 不剥夺条件:线程已经获得的资源在未使用完之前,不能强行剥夺
  4. 循环等待条件:若干线程之间形成一种头尾相接的循环等待资源关系

  只有满足了这些条件才能产生线程死锁,这也同时告诫我们如果要解决线程死锁问题,就必须从这四个条件入手,一般情况下可以按照以下两种方案解决:

  (1)、避免或减少资源共享

    一个资源被多个线程共享,若采用了同步机制,则产生死锁的可能性大,特别是在项目比较庞大的情况下,很难杜绝死锁,对此最好的解决办法就是减少资源共享。

    例如一个B/S结构的办公系统可以完全忽略资源共享,这是因为此类系统有三个特征:一是并发访问不会太高,二是读操作多于写操作,三是数据质量要求比较低,因此即使出现数据资源不同步的情况也不可能产生太大影响,完全可以不使用同步技术。但是如果是一个支付清算系统就必须慎重考虑资源同步问题了,因为此系统一是数据质量要求非常高(如果产生数据不同步的情况那可是重大生产事故),二是并发量大,不设置数据同步则会产生非常多的运算逻辑失效的情况,这会导致交易失败,产生大量的"脏数据",系统可靠性大大降低。

  (2)、使用自旋锁

    回到前面的例子,线程A在等待线程B释放资源,而线程B又在等待线程A释放资源,僵持不下,那如果线程B设置了超时时间是不是就可以解决该死锁问题了呢?比如线程B在等待2秒后还是无法获得资源,则自行终结该任务,代码如下:   

public void b2() {    try {      // 立刻获得锁,或者2秒等待锁资源      if (lock.tryLock(2, TimeUnit.SECONDS)) {        System.out.println("进入B.b2()");      }    } catch (InterruptedException e) {      e.printStackTrace();    } finally {      lock.unlock();    }  }

  上面的代码中使用tryLock实现了自旋锁(Spin Lock),它跟互斥锁一样,如果一个执行单元要想访问被自旋锁保护的共享资源,则必须先得到锁,在访问完共享资源后,也必须释放锁。如果在获取自旋锁时,没有任何执行单元保持该锁,那么将立即得到锁;如果在获取自旋锁时已经有保持者,那么获取锁操作将"自旋" 在哪里,直到该自旋锁的保持者释放了锁为止,在我们的例子中就是线程A等待线程B释放锁,在2秒内  不断尝试是否能够获得锁,达到2秒后还未获得锁资源,线程A则结束运行,线程B将获得资源继续执行,死锁解除。

  对于死锁的描述最经典的案例是哲学家进餐(五位哲学家围坐在圆形餐桌旁,人手一根筷子,做一下两件事情:吃饭和思考。要求吃东西的时候停止思考,思考的时候停止吃东西,而且必须使用两根筷子才能吃东西),解决此问题的方法很多,比如引入服务生(资源地调度)、资源分级等方法都可以很好的解决此类死锁问题。在我们Java多线程并发编程中,死锁很难避免,也不容易预防,对付它的最好方法就是测试:提高测试覆盖率,建立有效的边界测试,加强资源监控,这些方法能使得死锁无可遁形,即使发生了死锁现象也能迅速查到原因,提高系统性能。