你的位置:首页 > Java教程

[Java教程]callable和CompletionService接口试用


CompletionService接口定义为Interface CompletionService<V>接口定它在java7中只有一个实现ExecutorCompletionService,这个接口内部集成了一个BlockingQueue,因此可以实现对多线程运行结果的收集工作。为了更好的测试该接口,我使用了两个测试,第一个测试是自己定义一个外部BlockingQueue来接收callable返回的数据。第二个测试是用CompletionService对executor进行装饰,使得返回的CompletionService对象能直接submit任务。

但是我发现它submit的后并没有马上调用executor的submit,而是对它进行了封装,因此出现了一点点延迟。如果在submit之后使用shutdown()命令结束的话,实际上该task可能还没有 放到executor的taskpool中。所以这一点值得注意。

import java.util.Random;import java.util.concurrent.BlockingQueue;import java.util.concurrent.Callable;import java.util.concurrent.CompletionService;import java.util.concurrent.ExecutionException;import java.util.concurrent.ExecutorCompletionService;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.Future;import java.util.concurrent.LinkedBlockingDeque;public class testCallable {  public static void main(String[] args) {    try {      futureCount();      completionServiceCount();    } catch (InterruptedException e) {      e.printStackTrace();    } catch (ExecutionException e) {      e.printStackTrace();    }  }  /**   * 使用自定义阻塞队列得到任务执行结果   *   * @throws InterruptedException   * @throws ExecutionException   */  public static void futureCount() throws InterruptedException,      ExecutionException {    BlockingQueue<Future<Integer>> queue = new LinkedBlockingDeque<Future<Integer>>();    ExecutorService executorService = Executors.newCachedThreadPool();    int threadNum = 5;    for (int i = 0; i < threadNum; i++) {      Future<Integer> future = executorService.submit(getTask());      queue.put(future);    }    int sum = 0;    int temp = 0;    while(!queue.isEmpty()){      temp = queue.take().get();      sum += temp;      System.out.print(temp + "\t");    }    System.out.println("BlockingQueue all is : " + sum);    executorService.shutdown();  }  /**   * 使用completionService收集callable结果   * @throws ExecutionException   * @throws InterruptedException   */  public static void completionServiceCount() throws InterruptedException, ExecutionException {    ExecutorService executorService = Executors.newCachedThreadPool();    CompletionService<Integer> completionService = new ExecutorCompletionService<Integer>(        executorService);    int threadNum = 5;    for (int i = 0; i < threadNum; i++) {      completionService.submit(getTask());    }    int sum = 0;    int temp = 0;    for(int i=0;i<threadNum;i++){      temp = completionService.take().get();      sum += temp;      System.out.print(temp + "\t");    }    System.out.println("CompletionService all is : " + sum);    executorService.shutdown();  }  public static Callable<Integer> getTask() {    final Random rand = new Random();    Callable<Integer> task = new Callable<Integer>() {      @Override      public Integer call() throws Exception {        int num = 0;        for (int i = 0; i < 10; i++) {          num = num + rand.nextInt(10);        }        return num;      }    };    return task;  }}