Java-多线程-CompletionService(优先处理)

x33g5p2x  于2022-08-17 转载在 Java  
字(3.0k)|赞(0)|评价(0)|浏览(621)

java.util.concurrent.CompletionService 是对 ExecutorService 的一个功能增强封装,优化了获取异步操作结果的接口。

假设我们要向线程池提交一批任务,并获取任务结果。一般的方式是提交任务后,从线程池得到一批 Future 对象集合,然后依次调用其 get() 方法。进行阻塞所有线程执行完毕,然后按线程执行的顺序获取到结果

这里有个问题:因为我们会要按固定的顺序来遍历 Future 元素,而 get() 方法又是阻塞的,因此如果某个 Future 对象执行时间太长,会使得我们的遍历过程阻塞在该元素上,无法及时从后面早已完成的 Future 当中取得结果。

CompletionService 解决了这个问题。下面介绍如何创建和使用 CompletionService。

CompletionService 本身不包含线程池,创建它的实例之前,先要创建一个 ExecutorService。下面是一个例子:

ExecutorService executor = Executors.newFixedThreadPool(4);
CompletionService<String> completionService = new ExecutorCompletionService<>(executor);

向 CompletionService 提交任务的方式与 ExecutorService 一样:completionService.submit(() -> "Hello");

两者的区别在于取结果的方式。有了 CompletionService,你不需要再持有 Future 集合。如果要得到最早的执行结果,只需要像下面这样:
String result = completionService.take().get(); 这个 take() 方法返回的是最早完成的任务的结果,这个就解决了一个任务被另一个任务阻塞的问题。下面是一个封装好的例子:

//不用等待所有线程执行完毕,而是谁先执行完毕,就返回谁的结果 ,以此类推,等待全部线程执行完毕
    public  static void createCompletionServicesAll(String key, List<Callable> callables, Consumer< Future> consumer)  {
        CompletionService<?> completionService = new ExecutorCompletionService<>(getExecutor(key));
        for (Callable callable : callables) {
            completionService.submit(callable);
        }
        try {
            for (int i = 0; i < callables.size(); i++) {
                consumer.accept(completionService.take());
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    //一堆线程同时执行,谁先执行完毕那么就采用谁的结果,其余线程结果不管
    public  static Object createCompletionServicesOne(String key, List<Callable> callables) throws ExecutionException {
        CompletionService<?> completionService = new ExecutorCompletionService<>(getExecutor(key));
        for (Callable callable : callables) {
            completionService.submit(callable);
        }
        Object result=null;
        try {
            result=completionService.take().get();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        return result;
    }

测试

@Test
    public  void show(){

        List<Callable> callables=new LinkedList<>();
        for (int i = 0; i < 100; i++) {
            int finalI = i;
            callables.add(()->{
//                throw  new Exception("---");
                int i1 = new Random().nextInt(100);
                Thread.sleep(i1 );
                System.out.println("睡眠:"+i1+"返回结果:"+finalI);
                return finalI;
            });
        }
//        Collections.shuffle(callables);

        ExecutorUtils.createCompletionServicesAll("test",callables,(o)->{
            try {
                System.out.println(o.get()); //如果某个线程出现异常则抛出异常我们这里可以捕捉到
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        });
    }

    @Test
    public  void show1(){

        List<Callable> callables=new LinkedList<>();
        for (int i = 0; i < 100; i++) {
            int finalI = i;
            callables.add(()->{
//                throw  new Exception("---");
                int i1 = new Random().nextInt(1000);
                Thread.sleep(i1);
                System.out.println("睡眠:"+i1+"返回结果:"+finalI);
                return finalI;
            });
        }

        try {
            //获取第一个执行完毕的结果
            Integer test = (Integer)ExecutorUtils.createCompletionServicesOne("test", callables);
            System.out.println("result:"+test);
        } catch (ExecutionException e) {
            e.printStackTrace();
        }

    }

点赞 -收藏-关注-便于以后复习和收到最新内容有其他问题在评论区讨论-或者私信我-收到会在第一时间回复在本博客学习的技术不得以任何方式直接或者间接的从事违反中华人民共和国法律,内容仅供学习、交流与参考免责声明:本文部分素材来源于网络,版权归原创者所有,如存在文章/图片/音视频等使用不当的情况,请随时私信联系我、以迅速采取适当措施,避免给双方造成不必要的经济损失。感谢,配合,希望我的努力对你有帮助^_^

相关文章