java CompletionService 介绍及使用
如果你向Executor提交了一个批处理任务,并且希望在它们完成后获得结果。为此你可以保存与每个任务相关联的Future,然后不断地调用timeout为零的get,来检验Future是否完成。这样做固然可以,但却相当乏味。幸运的是,还有一个更好的方法:CompletionService。
CompletionService整合了Executor和BlockingQueue的功能。你可以将Callable任务提交给它去执行,然后使用类似于队列中的take和poll方法,在结果完整可用时获得这个结果,像一个打包的Future。
ExecutorCompletionService是实现CompletionService接口的一个类,并将计算任务委托给一个Executor。
ExecutorCompletionService的实现相当直观。它在构造函数中创建一个BlockingQueue,用它去保持完成的结果。计算完成时会调用FutureTask中的done方法。当提交一个任务后,首先把这个任务包装为一个QueueingFuture,它是FutureTask的一个子类,然后覆写done方法,将结果置入BlockingQueue中,take和poll方法委托给了BlockingQueue,它会在结果不可用时阻塞。
CompletionService中的Future是按照完成的顺序排列的,就是哪个线程先完成,CompletionService就先获取到哪个的返回结果。其实也可以不使用CompletionService,可以先创建一个装Future类型的集合,用Executor提交的任务返回值添加到集合中,最后遍历集合取出数据。考虑如下场景:多线程下载,结果用Future返回。第一个文件特别大,后面的文件很小。用CompletionService,能很快知道已经下载完文件的结果; 而用Future类型的集合,必须等第一个文件下载结束后,才会获得其它文件的下载结果。
下面是使用 CompletionService 和 Future类型集合的区别:
CompletionService整合了Executor和BlockingQueue的功能。你可以将Callable任务提交给它去执行,然后使用类似于队列中的take和poll方法,在结果完整可用时获得这个结果,像一个打包的Future。
ExecutorCompletionService是实现CompletionService接口的一个类,并将计算任务委托给一个Executor。
ExecutorCompletionService的实现相当直观。它在构造函数中创建一个BlockingQueue,用它去保持完成的结果。计算完成时会调用FutureTask中的done方法。当提交一个任务后,首先把这个任务包装为一个QueueingFuture,它是FutureTask的一个子类,然后覆写done方法,将结果置入BlockingQueue中,take和poll方法委托给了BlockingQueue,它会在结果不可用时阻塞。
CompletionService中的Future是按照完成的顺序排列的,就是哪个线程先完成,CompletionService就先获取到哪个的返回结果。其实也可以不使用CompletionService,可以先创建一个装Future类型的集合,用Executor提交的任务返回值添加到集合中,最后遍历集合取出数据。考虑如下场景:多线程下载,结果用Future返回。第一个文件特别大,后面的文件很小。用CompletionService,能很快知道已经下载完文件的结果; 而用Future类型的集合,必须等第一个文件下载结束后,才会获得其它文件的下载结果。
下面是使用 CompletionService 和 Future类型集合的区别:
public class CallableAndFuture2 { private final int total = 5; public static void main(String[] args) { CallableAndFuture2 cf2 = new CallableAndFuture2(); cf2.method0(); cf2.method1(); } public void method0() { System.out.println("****************************************************************"); System.out.println("方法一执行结果:"); System.out.println("****************************************************************"); ExecutorService es = Executors.newCachedThreadPool(); CompletionService<Integer> cs = new ExecutorCompletionService<Integer>(es); for (int i = 0; i < total; i++) { final int taskId = i; cs.submit(new Callable<Integer>() { public Integer call() throws Exception { if (taskId == 3) { Thread.sleep(2000); } return taskId; } }); } es.shutdown(); // 可能做一些事情 for (int i = 0; i < total; i++) { try { System.out.println(cs.take().get()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } } public void method1() { System.out.println("****************************************************************"); System.out.println("方法二执行结果:"); System.out.println("****************************************************************"); ExecutorService es = Executors.newCachedThreadPool(); List<Future<Integer>> futureList = new ArrayList<Future<Integer>>(); for (int i = 0; i < total; i++) { final int taskId = i; Future<Integer> future = es.submit(new Callable<Integer>() { @Override public Integer call() throws Exception { if (taskId == 3) { Thread.sleep(2000); } return taskId; } }); futureList.add(future); } es.shutdown(); for (int i = 0; i < total; i++) { try { System.out.println(futureList.get(i).get()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } } }执行的结果:
**************************************************************** 方法一执行结果: **************************************************************** 0 1 2 4 3 **************************************************************** 方法二执行结果: **************************************************************** 0 1 2 3 4方法一中3总是最后的,方法二中总是按顺序显示结果。
0
赞
- 所属分类: 后端技术
- 本文标签: