星空网 > 软件开发 > Java

揭密FutureTask

      在java多线程编程中,我们经常使用线程池提交任务,并且通过Future来获取任务执行的结果,以此达到异步或者并行执行的效果。在jdk1.7以前,FutureTask是Future唯一的实现类,1.7后加入了ForkJoinTask类。本文主要总结一下我对FutureTask的理解。

Future类

  Future接口定义了5个方法,分别是 

 boolean cancel(boolean mayInterruptIfRunning); boolean isCancelled(); boolean isDone(); V get() throws InterruptedException, ExecutionException; V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;

    分别介绍一下这五个接口的用途:

  • boolean cancel(boolean mayInterruptInRunning) 取消一个正在执行中的任务,并且返回调用结果。如果取消成功则返回true,反之返回false。这里要注意,即使方法返回true,当前任务也未必真的被取消了,后面会介绍。
  • boolean isCancelled() 返回当前任务是否被取消。
  • Boolean isDone() 返回当前任务是否执行完毕。这里done的概念比较广,包括了futureTask被执行后的任意状态,例如正常执行完毕、执行异常或者任务被取消。
  • V get() 这个接口就是用来获取futureTask执行结果,调用这个接口时会被阻塞,直到拿到结果或者异常。
  • V get(long timeout, TimeUnit unit) 这个接口多了一个超时时间,如果过了这个时间task仍然没有结果返回,则抛出timeout异常

    写个demo便于理解

 1 public class FutureDemo { 2   public static void main(String[] args) { 3     ExecutorService executorService = Executors.newCachedThreadPool(); 4     Future future = executorService.submit(new Callable<Object>() { 5       @Override 6       public Object call() throws Exception { 7         Long start = System.currentTimeMillis(); 8         while (true) { 9           Long current = System.currentTimeMillis();10           if ((current - start) > 1000) {11             return 1;12           }13         }14       }15     });16 17     try {18       Integer result = (Integer)future.get();19       System.out.println(result);20     }catch (Exception e){21       e.printStackTrace();22     }23   }24 }

   这里我们模拟了1s钟的CPU空转,当执行future.get()的时候,主线程阻塞了大约一秒后,把结果打印出来:1。

        当然我们也可以使用V get(long timeout, TimeUnit unit),这个方法提供了一个超时时间的设置,如果超过当前时间任务线程还未返回,那么就会停止阻塞状态,并且抛出一个timeout异常。如下

1     try {2       Integer result = (Integer) future.get(500, TimeUnit.MILLISECONDS);3       System.out.println(result);4     } catch (Exception e) {5       e.printStackTrace();6     }

        这里我们设置的超时时间是500毫秒,由于一开始我们模拟了1s的CPU计算时间,这里便会抛出超时异常,打印出堆栈信息

     揭密FutureTask

      当然,如果我们把超时时间设置的长一些,还是可以得到预期的结果的。

FutureTask内部实现机制

  刚我们测试了最常用的两个方法,接下来我们来探一探FutureTask的内部实现机制。首先我们看一下FutureTask的继承结构:

          揭密FutureTask

      FutureTask实现了RunnableFuture接口,而RunnableFuture继承了Runnable和Future,也就是说FutureTask既可以当做一个Runnable,也可以当做一个Future。

  FutureTask内部定义了7个状态,代表了FutureTask当前所处状态。如下

  private volatile int state;  private static final int NEW     = 0;  private static final int COMPLETING  = 1;  private static final int NORMAL    = 2;  private static final int EXCEPTIONAL = 3;  private static final int CANCELLED  = 4;  private static final int INTERRUPTING = 5;  private static final int INTERRUPTED = 6;

      当一个任务刚提交的时候,状态为NEW,由FutureTask的构造器可知:

public FutureTask(Callable<V> callable) {    if (callable == null)      throw new NullPointerException();    this.callable = callable;    this.state = NEW;    // ensure visibility of callable  }

  任务执行正常结束前,state会被设置成COMPLETING,代表任务即将完成,接下来很快就会被设置为NARMAL或者EXCEPTIONAL,这取决于调用Runnable中的call()方法是否抛出了异常。如果没有异常,则state设为NARMAL,反之为EXCEPTIONAL。

  如果任务提交后,在任务执行结束之前调用cancel(boolean mayInterruptIfRunning) 取消任务,那么有可能进入到后3个状态。如果传入的参数是false,state会被置为CANCELLED,反之如果传入true,state先被置为INTERRUPTING,后被置为INTERRUPTED。

     总结下,FutureTask的状态流转过程,可以出现以下三种状态:

        1. 正常执行完毕。 NEW -> COMPLETING -> NORMAL

    2. 执行中出现异常。NEW -> COMPLETING -> EXCEPTIONAL

        3. 任务执行过程中被取消,并且不响应中断。NEW -> CANCELLED

   4. 任务执行过程中被取消,并且响应中断。 NEW -> INTERRUPTING -> INTERRUPTED  

  那么以上状态为什么会这么流转呢?接下来我们一起扒一扒FutureTask的源码。我们从futureTask的方法看起。

1 public void run()

 1 public void run() { 2     if (state != NEW || 3       !UNSAFE.compareAndSwapObject(this, runnerOffset, 4                     null, Thread.currentThread())) 5       return; 6     try { 7       Callable<V> c = callable; 8       if (c != null && state == NEW) { 9         V result;10         boolean ran;11         try {12           result = c.call();13           ran = true;14         } catch (Throwable ex) {15           result = null;16           ran = false;17           setException(ex);18         }19         if (ran)20           set(result);21       }22     } finally {23       // runner must be non-null until state is settled to24       // prevent concurrent calls to run()25       runner = null;26       // state must be re-read after nulling runner to prevent27       // leaked interrupts28       int s = state;29       if (s >= INTERRUPTING)30         handlePossibleCancellationInterrupt(s);31     }32   }

  翻译一下,这个方法经历了以下几步

      1. 校验Task状态和当前线程引用runner,如果state不为NEW或者runner引用为null,直接返回。

  2. 调用runner的call()方法执行主逻辑,并且尝试获得返回值result。

  3. 如果抛出异常,调用setException(Throwable t)方法

  4. 如果没有异常,调用set(V v)方法

  5. 一些扫尾工作

 那么setException(Throwable t)和set(V v)做了什么呢?我们看一下源码

protected void set(V v) {    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {      outcome = v;      UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state      finishCompletion();    }  }

  set(V v) 方法首先做一个CAS操作,将state字段由NEW->COMPLETING,这里的CAS操作读者可以自行百度原理。如果成功,那么把执行结果v赋给成员变量outcome,再把state的值设置为NORMAL,最后做一些清理工作,唤醒所有等待线程并把callable对象置为null。

 protected void setException(Throwable t) {    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {      outcome = t;      UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state      finishCompletion();    }  }

  同理,setException(Throwable t) 方法大同小异,只不过state字段流转为NEW->COMPLETING->EXCEPTION。同时把异常对象赋予v。

  这里我们就清楚了,当一个任务被提交后,状态流转中1、2是怎么来的了。同时我们可以确定,outcome变量,存着是执行结果或者抛出的异常对象。

2  public V get() throws InterruptionException,ExecutionException

    get() 和 get(long timeout, TimeUnit unit)方法是获取执行结果的两个方法,我们这里就看get()方法即可。首先贴源码

  

public V get() throws InterruptedException, ExecutionException {    int s = state;    if (s <= COMPLETING)      s = awaitDone(false, 0L);    return report(s);  }private V report(int s) throws ExecutionException {    Object x = outcome;    if (s == NORMAL)      return (V)x;    if (s >= CANCELLED)      throw new CancellationException();    throw new ExecutionException((Throwable)x);  }

  首先检查state值,如果小于COMPLETING,则阻塞,阻塞时可能会抛出异常,这里我们不纠结这个,往下看。如果没有抛出异常,获取执行后返回的state值,最后调用report(s)方法。接着我们看report方法,如果s为NORMAL,返回执行结果outcome,否则抛出异常。结合之前的run()方法,我们这里可以得出,如果主逻辑正常执行完毕,则返回执行结果,如果抛出异常,那么这里会封装该异常为ExecutionException并且抛出。如果任务执行过程中被取消了,则可能抛出CancellationException()。

3 public boolean cancel(boolean mayInterruptIfRunning)

  这个方法个人认为是最具争议的方法。这里我们先贴个demo

  

 1 public class FutureDemo { 2   public static void main(String[] args) { 3     ThreadPoolExecutor executorService = (ThreadPoolExecutor) Executors.newFixedThreadPool(1); 4     // 预创建线程 5     executorService.prestartCoreThread(); 6  7     Future future = executorService.submit(new Callable<Object>() { 8       @Override 9       public Object call() {10         System.out.println("start to run callable");11         Long start = System.currentTimeMillis();12         while (true) {13           Long current = System.currentTimeMillis();14           if ((current - start) > 1000) {15             System.out.println("当前任务执行已经超过1s");16             return 1;17           }18         }19       }20     });21 22     System.out.println(future.cancel(false));23 24     try {25       Thread.currentThread().sleep(3000);26       executorService.shutdown();27     } catch (Exception e) {28       //NO OP29     }30   }31 }

我们多次测试后发现,出现了2种打印结果,如图

揭密FutureTask

                结果1

揭密FutureTask

                结果2    

咦,两个结果和预期的都好像不太一样?第一种是任务压根没取消,第二种则是任务压根没提交成功,似乎和方法签名cancel不太一致?

我们先看一下方法签名上的作者注释

/**
* Attempts to cancel execution of this task. This attempt will
* fail if the task has already completed, has already been cancelled,
* or could not be cancelled for some other reason. If successful,
* and this task has not started when <tt>cancel</tt> is called,
* this task should never run. If the task has already started,
* then the <tt>mayInterruptIfRunning</tt> parameter determines
* whether the thread executing this task should be interrupted in
* an attempt to stop the task.
*
* <p>After this method returns, subsequent calls to {@link #isDone} will
* always return <tt>true</tt>. Subsequent calls to {@link #isCancelled}
* will always return <tt>true</tt> if this method returned <tt>true</tt>.
*
* @param mayInterruptIfRunning <tt>true</tt> if the thread executing this
* task should be interrupted; otherwise, in-progress tasks are allowed
* to complete
* @return <tt>false</tt> if the task could not be cancelled,
* typically because it has already completed normally;
* <tt>true</tt> otherwise
*/
  这里我们可以看到,"尝试"取消任务的执行,如果当前任务已经结束或者已经取消,则当前取消操作会失败。如果任务还没开始就被取消,那么任务则不会被执行。
这里我们就知道了,如果任务还没开始执行时cancel(false)就被调用,那么这个任务是不会被执行的,这就解释了出现上图结果2的情况。那如果任务已经开始执行,并且
调用cancel(false),是不会终止任务的。我们还是从源码去分析cancel()究竟做了哪些事。
public boolean cancel(boolean mayInterruptIfRunning) {    if (state != NEW)      return false;    if (mayInterruptIfRunning) {      if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, INTERRUPTING))        return false;      Thread t = runner;      if (t != null)        t.interrupt();      UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); // final state    }    else if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, CANCELLED))      return false;    finishCompletion();    return true;  }

  执行逻辑如下

     1. 如果当前futureTask状态不为NEW,直接返回false,表示取消操作失败。

  2. 如果传入true,代表可能会引发线程中断。一个CAS操作,把状态由NEW->INTERRUPTING,如果执行失败则直接返回false。设置当前工作线程中断标识为true,然后把futureTask状态设置为INTERRUPTED。

  3. 如果传入false,把futureTask状态设置为CANCELLED。

  4. 做一些清理工作

    可见,cancel()方法仅仅是改变了futureTask的状态位!如果传入的是false,当前任务是不会被终止的,而是会继续执行,直到异常或者执行完毕。如果传入的是true,会调用当前线程的interrupt()方法,把中断标志位设为true。所以cancel()方法其实个人理解是有歧义的,它并不能真正取消一个任务的执行。事实上,除非线程自己停止自己的任务,或者退出JVM,是没有其他方法完全终止一个线程的任务的。cancel(true)方法也只是希望当前线程可以响应中断而已,当线程被阻塞,抛出InterruptedException。同时,由之前的future.get()方法可知,如果一个futureTask被cancel()了,调用get()方**抛出CancellationException。

总结

  理解FutureTask,我们使用Future类才能更加得心应手。这里也只是作者自己的理解,如有不对之处,还望读者批评指正。

 

 

作者:mayday芋头

出处:http://www.cnblogs.com/maypattis/
本博客中未标明转载的文章归作者mayday芋头和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利

   

 

 



原标题:揭密FutureTask

关键词:

*特别声明:以上内容来自于网络收集,著作权属原作者所有,如有侵权,请联系我们: admin#shaoqun.com (#换成@)。
相关文章
我的浏览记录
最新相关资讯
海外公司注册 | 跨境电商服务平台 | 深圳旅行社 | 东南亚物流