文章13 | 阅读 6313 | 点赞0
CompletableFuture
是java8新增的并发工具类,继承了FutureTask的同步任务的特点,同时新增了异步调用的特点(其中异步的方法名称都带有Async
),换而言之同步获取方法的返回值的方式可以用CompletableFuture
完成,与此同时,想要异步获取方法的返回值也可以使用CompletableFuture
来完成。
异步带Async
,并且底层执行的线程由ForkJoinPool
支持。于此同时还多了异常处理(执行任务的时候可能会发生异常,以前使用FutureTask的同步的方式是需要在执行的方法内处理异常的,而使用CompletableFuture
后则可以对异常捕捉,并且修改返回值,受java8的函数式编程的特点)
学习是一个不断改进的过程,看了大佬文章,然后自己跟踪源码过去后验证了文章中内容的正确性
掘金:CompletableFuture异步编程
CompletableFuture
在执行异步任务的时候默认采用的是ForkJoinPoin线程池
(前提是cpu是多核的,也就是Runtime.getRuntime().availableProcessors() - 1
,例如cpu是12核心的,那么就是返回11)
如果想不使用ForkJoinPool线程池
可以通过下面参数设置
-Djava.util.concurrent.ForkJoinPool.common.parallelism=1
为了验证,我通过VMware修改CPU核心数,经过实验线程数<=2都是不启用默认连接池的,也就是说如果服务器是单核或者双核的情况并且没有超线程,那么线程数是<=2的也就是说不会启用默认的连接池,而是采用直接创建线程并启动起来。
那么就会使USE_COMMON_POOL
的值变为false
,表示不使用默认的公共池
源码是这样的,通过判断上面这个值是否大于1来判断是否启用默认ForkJoinPool
池
如果不使用默认的公共池,则默认采用new Thread(任务).start();
方式执行任务。
因为在我debug过程中我发现ForkJoinPool.mode的值是11,而我的电脑上的实际线程数是12,也就是Runtime.getRuntime().availableProcessors()
返回的值是12。相当于进行了-1。而是否启用默认池则是通过ForkJoinPool.getCommonPoolParallelism() > 1
来判断的,那么2个核心数的服务器-1后,也是不大于1的所以应该也是不启用默认ForkJoinPool
池,所以进行了验证证明了我的结论。
博主我在看jdk源码的过程中发现,以及在学习 ForkJoinPool 的过程中了解到,这个是由DougLea大神的研究得出来得结论,也是经过实验过的结论,这篇关于ForkJoin框架的文章应该能在官网中能找到,另外ForkJoinPool的注释中也有说明,意思就是说ForkJoin采用先拆分后合并的思想,让一个大任务能快速的能计算完成。点击上面的ForkJoinPool可以链接到我之前做过的一次实验。如果是单核的情况下就不存在多任务并行执行,很明显采用ForkJoin反而大大降低了执行效率,因为要不断的切换cpu并且拆分也需要时间。
如何使用ForkJoin框架实际上是需要根据实际情况进行调整得,例如拆分任务的颗粒度,cpu核心线程等,只有在拆分所消耗的时间小于多线程并行计算所消耗的时间才使用ForkJoin,用的不好的话反而会使程序效率更低
CompletableFuture
不需要new
得到,起点是由CompletableFuture的public static
方法,当我们调用这些方法时底层就会自动创建一个CompletableFuture
实例,我们只需要把任务
以及自定义的线程
传入即可。所以一般的流程是
调用下面这些静态方法,然后进行链式调用
其它的一些public非静态
方法则是后续处理,例如一个任务可能失败,可能有返回值,可能需要和其它任务组合完成。这里就不贴了,自己看源码
如果想要写好高并发程序,利用CompletableFuture可以快速解决一些开发中常见的并发场景
CompletableFuture可以进行异步调用,类似于js中的Promise
对象
ExecutorService executorService = Executors.newFixedThreadPool(10);//线程池,一般放在静态成员变量中
CompletableFuture<Integer> exceptionally = CompletableFuture.supplyAsync(() -> {
//返回一个数据
return 1;
},executorService).whenComplete((res, throwable) -> {
//完成异步时获取返回值
System.out.println("返回值是:"+res+",异常是:"+throwable);
}).exceptionally((throwable) -> {
//接收异常,修改返回值
System.out.println("获得到的异常是:"+throwable);
return 10;//返回一个值替代原来的返回值
});
handle方法
感知返回值,处理异常,并修改返回值上面的方式通过whenComplete方法
感知结果的产生,并且能接收异常,但是不能修改返回值,如果要修改返回值,就得和上面的案例一样结合exceptionally方法
才能达到效果。
ExecutorService executorService = Executors.newFixedThreadPool(10);//线程池,一般放在静态成员变量中
CompletableFuture<Integer> exceptionally = CompletableFuture.supplyAsync(() -> {
//返回一个数据
return 1;
},executorService).handle((res,throwable)->{
if (res==1){
System.out.println("返回的结果是"+res);
return 1;
}
if (throwable!=null){
System.out.println("异常是"+throwable);
return 0;
}
return 100;
});
runAfterBoth
方法无法感知任务一
和任务二
是否完成ExecutorService executorService = Executors.newFixedThreadPool(10);//线程池,一般放在静态成员变量中
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
System.out.println("任务一开始");
return 1;
},executorService);
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
System.out.println("任务二开始");
return 2;
},executorService);
future1.runAfterBothAsync(future2,()->{
System.out.println("联合任务1和2");
},executorService);
使用thenAcceptBothAsync
会感知任务一和任务二都完成。
ExecutorService executorService = Executors.newFixedThreadPool(10);//线程池,一般放在静态成员变量中
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
System.out.println("任务一开始");
return 1;
},executorService);
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
System.out.println("任务二开始");
return 2;
},executorService);
future1.thenAcceptBothAsync(future2,(result1,result2)->{
System.out.println("感知任务一和任务二完成,执行任务三");
},executorService);
合并多个任务使用thenCombineAsync
在上面的结果,基础上,如果想要有返回值则使用它
ExecutorService executorService = Executors.newFixedThreadPool(10);//线程池
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
System.out.println("任务一开始");
return 1;
},executorService);
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
System.out.println("任务二开始");
return 2;
});
future1.thenCombineAsync(future2,(res1,res2)->{
System.out.println("感知任务完成,合并两个任务,返回一个新的结果");
return res1+","+res2;
},executorService);
两个任务只要有一个完成就执行任务三的方法都是 XXXEither
都带有Either
runAfterEitherAsync
:不感知结果,并且无返回值ExecutorService executorService = Executors.newFixedThreadPool(10);//线程池
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
System.out.println("任务一开始");
try {
TimeUnit.MILLISECONDS.sleep(200);//让任务二晚点执行完,等任务二完成,任务三就会开始
} catch (InterruptedException e) {
e.printStackTrace();
}
return 1;
},executorService);
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
System.out.println("任务二开始");
return 2;
});
future1.runAfterEitherAsync(future2,()->{
System.out.println("任务三开始,任务一任务二只要有一个完成就会开始执行任务三");
},executorService);
acceptEitherAsync
:感知结果,接收返回值并消费掉,不产生返回值ExecutorService executorService = Executors.newFixedThreadPool(10);//线程池
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
System.out.println("任务一开始");
try {
TimeUnit.MILLISECONDS.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 1;
},executorService);
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
System.out.println("任务二开始");
return 2;
});
future1.acceptEitherAsync(future2,(res)->{
System.out.println("感知结果执行任务三");//需要注意使用lambda表达式需要future1和future2返回值类型相同
},executorService);
applyToEitherAsync
:感知返回值,转换返回值得到一个新的结果ExecutorService executorService = Executors.newFixedThreadPool(10);//线程池
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
System.out.println("任务一开始");
try {
TimeUnit.MILLISECONDS.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 1;
},executorService);
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
System.out.println("任务二开始");
return 2;
});
future1.applyToEitherAsync(future2,(res)->{
System.out.println("感知结果执行任务三");//需要注意使用lambda表达式需要future1和future2返回值类型相同
return "新的返回值"+res*2;
},executorService);
allOf
方法介绍,多任务都完成如果使用下面的代码等待多任务完成。
future1.get();
时主线程是堵塞的,因此future2.get();future3.get();
都不会执行需要等待future1先得到返回值,也就是说会加上多余的堵塞时间。虽然任务是异步的(的确任务已经完成了,但是线程没有交还线程池)而我们希望的是对于提前完成任务的线程将他交还给线程池,让它可以再次被其它任务执行
例如:future1 耗时4s,future2耗时3秒,future3耗时5s。如果是直接下面代码,则会导致future2
在3秒的时候就已经可以完成任务了,但是由于没有进行get,线程一直在等待返回数据,那么future2
的线程相当于被占用着。
future1.get();
future2.get();
future3.get();
ExecutorService executorService = Executors.newFixedThreadPool(10);//线程池
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
System.out.println("任务一开始");
return 1;
},executorService);
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
System.out.println("任务二开始");
return 2;
});
CompletableFuture<Integer> future3 = CompletableFuture.supplyAsync(() -> {
System.out.println("任务三开始");
return 3;
}, executorService);
CompletableFuture<Void> allOf = CompletableFuture.allOf(future1, future2, future3);
try {
allOf.get();//等待所有任务都完成
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
如果要获得每一个任务的返回结果还是需要使用future1.get();
,future2.get()
,future3.get()
得到返回结果。
anyOf
ExecutorService executorService = Executors.newFixedThreadPool(10);//线程池
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
System.out.println("任务一开始");
return 1;
}, executorService);
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
System.out.println("任务二开始");
return 2;
});
CompletableFuture<Integer> future3 = CompletableFuture.supplyAsync(() -> {
System.out.println("任务三开始");
return 3;
}, executorService);
CompletableFuture<Object> anyOf = CompletableFuture.anyOf(future1, future2, future3);
try {
anyOf.get();//获得完成的那个任务结果,其它任务的结果就获取不到,想要获取得调用各自得get
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://yumbo.blog.csdn.net/article/details/107926116
内容来源于网络,如有侵权,请联系作者删除!