java CompletionService 介绍及使用

  • 2016-04-15
  • 浏览 (81)

如果你向Executor提交了一个批处理任务,并且希望在它们完成后获得结果。为此你可以保存与每个任务相关联的Future,然后不断地调用timeout为零的get,来检验Future是否完成。这样做固然可以,但却相当乏味。幸运的是,还有一个更好的方法:CompletionService。

CompletionService整合了Executor和BlockingQueue的功能。你可以将Callable任务提交给它去执行,然后使用类似于队列中的take和poll方法,在结果完整可用时获得这个结果,像一个打包的Future。

ExecutorCompletionService是实现CompletionService接口的一个类,并将计算任务委托给一个Executor。

ExecutorCompletionService的实现相当直观。它在构造函数中创建一个BlockingQueue,用它去保持完成的结果。计算完成时会调用FutureTask中的done方法。当提交一个任务后,首先把这个任务包装为一个QueueingFuture,它是FutureTask的一个子类,然后覆写done方法,将结果置入BlockingQueue中,take和poll方法委托给了BlockingQueue,它会在结果不可用时阻塞。

CompletionService中的Future是按照完成的顺序排列的,就是哪个线程先完成,CompletionService就先获取到哪个的返回结果。其实也可以不使用CompletionService,可以先创建一个装Future类型的集合,用Executor提交的任务返回值添加到集合中,最后遍历集合取出数据。考虑如下场景:多线程下载,结果用Future返回。第一个文件特别大,后面的文件很小。用CompletionService,能很快知道已经下载完文件的结果; 而用Future类型的集合,必须等第一个文件下载结束后,才会获得其它文件的下载结果。

下面是使用 CompletionService 和 Future类型集合的区别:
public class CallableAndFuture2 {
	private final int total = 5;

	public static void main(String[] args) {
		CallableAndFuture2 cf2 = new CallableAndFuture2();
		cf2.method0();
		cf2.method1();
	}

	public void method0() {
		System.out.println("****************************************************************");
		System.out.println("方法一执行结果:");
		System.out.println("****************************************************************");
		ExecutorService es = Executors.newCachedThreadPool();
		CompletionService<Integer> cs = new ExecutorCompletionService<Integer>(es);
		for (int i = 0; i < total; i++) {
			final int taskId = i;
			cs.submit(new Callable<Integer>() {
				public Integer call() throws Exception {
					if (taskId == 3) {
						Thread.sleep(2000);
					}
					return taskId;
				}
			});
		}

		es.shutdown();

		// 可能做一些事情

		for (int i = 0; i < total; i++) {
			try {
				System.out.println(cs.take().get());
			} catch (InterruptedException e) {
				e.printStackTrace();
			} catch (ExecutionException e) {
				e.printStackTrace();
			}
		}
	}

	public void method1() {
		System.out.println("****************************************************************");
		System.out.println("方法二执行结果:");
		System.out.println("****************************************************************");
		ExecutorService es = Executors.newCachedThreadPool();
		List<Future<Integer>> futureList = new ArrayList<Future<Integer>>();
		for (int i = 0; i < total; i++) {
			final int taskId = i;
			Future<Integer> future = es.submit(new Callable<Integer>() {
				@Override
				public Integer call() throws Exception {
					if (taskId == 3) {
						Thread.sleep(2000);
					}
					return taskId;
				}
			});
			futureList.add(future);
		}

		es.shutdown();

		for (int i = 0; i < total; i++) {
			try {
				System.out.println(futureList.get(i).get());
			} catch (InterruptedException e) {
				e.printStackTrace();
			} catch (ExecutionException e) {
				e.printStackTrace();
			}
		}

	}
}
执行的结果:
****************************************************************
方法一执行结果:
****************************************************************
0
1
2
4
3
****************************************************************
方法二执行结果:
****************************************************************
0
1
2
3
4
方法一中3总是最后的,方法二中总是按顺序显示结果。
正文到此结束