前言
在使用CompletableFuture之前,我一直在使用Future处理多线程的一些业务场景。在查jdk一些api时发现了了CompletableFuture。
其中,它的前言部分就吸引到了我:
A Future that may be explicitly completed (setting its value and status), and may be used as a CompletionStage, supporting dependent functions and actions that trigger upon its completion. When two or more threads attempt to complete, completeExceptionally, or cancel a CompletableFuture, only one of them succeeds.
Completable是可以明确的完成(设置其值和状态)并可以用作CompletionStage的Future,支持函数调用,以及触发线程完成后的操作。
首先对比 Future,CompletableFuture 优点在于:
- 不需要手工分配线程,JDK 自动分配
- 代码语义清晰,异步任务链式调用
- 支持编排异步任务
CompletableFuture api图关系:
1.创建CompletableFuture
public static <U> CompletableFuture<U> completedFuture(U value)
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
第一个方法创建一个具有默认结果的 CompletableFuture,这个没啥好讲。我们重点讲述下下面四个异步方法。
前两个方法 runAsync 不支持返回值,而 supplyAsync可以支持返回结果。
这个两个方法默认将会使用公共的 ForkJoinPool 线程池执行,这个线程池默认线程数是 CPU 的核数。
可以设置 JVM option:-Djava.util.concurrent.ForkJoinPool.common.parallelism 来设置 ForkJoinPool 线程池的线程数
使用共享线程池将会有个弊端,一旦有任务被阻塞,将会造成其他任务没机会执行。所以强烈建议使用后两个方法,根据任务类型不同,主动创建线程池,进行资源隔离,避免互相干扰。
2.设置任务结果
CompletableFuture 提供以下方法,可以主动设置任务结果。
boolean complete(T value)
boolean completeExceptionally(Throwable ex)
第一个方法,主动设置 CompletableFuture 任务执行结果,若返回 true,表示设置成功。如果返回 false,设置失败,这是因为任务已经执行结束,已经有了执行结果。
示例代码如下:
// 执行异步任务
CompletableFuture cf = CompletableFuture.supplyAsync(() -> {
System.out.println("cf 任务执行开始");
sleep(10, TimeUnit.SECONDS);
System.out.println("cf 任务执行结束");
return "楼下小姐姐";
});
//
Executors.newSingleThreadScheduledExecutor().execute(() -> {
sleep(5, TimeUnit.SECONDS);
System.out.println("主动设置 cf 任务结果");
// 设置任务结果,由于 cf 任务未执行结束,结果返回 true
cf.complete("mavic");
});
// 由于 cf 未执行结束,将会被阻塞。5 秒后,另外一个线程主动设置任务结果
System.out.println("get:" + cf.get());
// 等待 cf 任务执行结束
sleep(10, TimeUnit.SECONDS);
// 由于已经设置任务结果,cf 执行结束任务结果将会被抛弃
System.out.println("get:" + cf.get());
/***
* cf 任务执行开始
* 主动设置 cf 任务结果
* get:mavic
* cf 任务执行结束
* get:mavic
*/
这里需要注意一点,一旦 complete 设置成功,CompletableFuture 返回结果就不会被更改,即使后续 CompletableFuture 任务执行结束。
第二个方法,给 CompletableFuture 设置异常对象。若设置成功,如果调用 get 等方法获取结果,将会抛错。
示例代码如下:
// 执行异步任务
CompletableFuture cf = CompletableFuture.supplyAsync(() -> {
System.out.println("cf 任务执行开始");
sleep(10, TimeUnit.SECONDS);
System.out.println("cf 任务执行结束");
return "楼下小姐姐";
});
//
Executors.newSingleThreadScheduledExecutor().execute(() -> {
sleep(5, TimeUnit.SECONDS);
System.out.println("主动设置 cf 异常");
// 设置任务结果,由于 cf 任务未执行结束,结果返回 true
cf.completeExceptionally(new RuntimeException("啊,挂了"));
});
// 由于 cf 未执行结束,前 5 秒将会被阻塞。后续程序抛出异常,结束
System.out.println("get:" + cf.get());
/***
* cf 任务执行开始
* 主动设置 cf 异常
* java.util.concurrent.ExecutionException: java.lang.RuntimeException: 啊,挂了
* ......
*/
3.CompletionStage
CompletableFuture 分别实现两个接口 Future与 CompletionStage。
Future 接口都比较熟悉,这里主要讲讲 CompletionStage。
CompletableFuture 大部分方法来自CompletionStage 接口,正是因为这个接口,CompletableFuture才有如此强大功能。
想要理解 CompletionStage 接口,我们需要先了解任务的时序关系的。我们可以将任务时序关系分为以下几种:
- 串行执行关系
- 并行执行关系
- AND 汇聚关系
- OR 汇聚关系
4.串行执行关系
任务串行执行,下一个任务必须等待上一个任务完成才可以继续执行。
CompletionStage 有四组接口可以描述串行这种关系,分别为:
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
public CompletableFuture<Void> thenAccept(Consumer<? super T> action)
public CompletableFuture<Void> thenRun(Runnable action)
public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action)
public CompletableFuture<Void> thenRunAsync(Runnable action)
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn)
thenApply 方法需要传入核心参数为 Function<T,R>类型。这个类核心方法为:
R apply(T t)
所以这个接口将会把上一个任务返回结果当做入参,执行结束将会返回结果。
thenAccept 方法需要传入参数对象为 Consumer类型,这个类核心方法为:
void accept(T t)
返回值 void 可以看出,这个方法不支持返回结果,但是需要将上一个任务执行结果当做参数传入。
thenRun 方法需要传入参数对象为 Runnable 类型,这个类比较熟悉,核心方法既不支持传入参数,也不会返回执行结果。
thenCompose 方法作用与 thenApply 一样,只不过 thenCompose 需要返回新的 CompletionStage。这么理解比较抽象,可以集合代码一起理解。
CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> "hello, 楼下小姐姐");
cf.thenApply(String::toLowerCase);
cf.thenCompose(s -> CompletableFuture.supplyAsync(s::toLowerCase));
方法中带有 Async ,代表可以异步执行,这个系列还有重载方法,可以传入自定义的线程池,上图未展示,读者只可以自行查看 API。
最后我们通过代码展示 thenApply 使用方式:
CompletableFuture<String> cf
= CompletableFuture.supplyAsync(() -> "hello,楼下小姐姐")// 1
.thenApply(s -> s + "@mavic") // 2
.thenApply(String::toUpperCase); // 3
System.out.println(cf.join());
// 输出结果 HELLO,楼下小姐姐@mavic
这段代码比较简单,首先我们开启一个异步任务,接着串行执行后续两个任务。任务 2 需要等待任务1 执行完成,任务 3 需要等待任务 2。
上面方法,需要记住 Function<T,R>,Consumer,Runnable 三者区别,根据场景选择使用。
5.AND 汇聚关系
AND 汇聚关系代表所有任务完成之后,才能进行下一个任务。
如上所示,只有任务 A 与任务 B 都完成之后,任务 C 才会开始执行。
CompletionStage 有以下接口描述这种关系。
public <U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
public <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action)
public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other, Runnable action)
public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action)
public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other, Runnable action)
thenCombine 方法核心参数 BiFunction ,作用与 Function一样,只不过 BiFunction 可以接受两个参数,而 Function 只能接受一个参数。
thenAcceptBoth 方法核心参数BiConsumer 作用也与 Consumer一样,不过其需要接受两个参数。
runAfterBoth 方法核心参数最简单,上面已经介绍过,不再介绍。
这三组方法只能完成两个任务 AND 汇聚关系,如果需要完成多个任务汇聚关系,需要使用 CompletableFuture#allOf,不过这里需要注意,这个方法是不支持返回任务结果。
6.OR 汇聚关系
有 AND 汇聚关系,当然也存在 OR 汇聚关系。OR 汇聚关系代表只要多个任务中任一任务完成,就可以接着接着执行下一任务。
CompletionStage 有以下接口描述这种关系:
public <U> CompletableFuture<U> applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn)
public CompletableFuture<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action)
public CompletableFuture<Void> runAfterEither(CompletionStage<?> other, Runnable action)
public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn)
public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action)
public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other, Runnable action)
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)
前面三组接口方法传参与 AND 汇聚关系一致,这里也不再详细解释了。
当然 OR 汇聚关系可以使用 CompletableFuture#anyOf 执行多个任务。
下面示例代码展示如何使用 applyToEither 完成 OR 关系。
CompletableFuture<String> cf
= CompletableFuture.supplyAsync(() -> {
sleep(5, TimeUnit.SECONDS);
return "hello,楼下小姐姐";
});// 1
CompletableFuture<String> cf2 = cf.supplyAsync(() -> {
sleep(3, TimeUnit.SECONDS);
return "hello,mavic";
});
// 执行 OR 关系
CompletableFuture<String> cf3 = cf2.applyToEither(cf, s -> s);
// 输出结果,由于 cf2 只休眠 3 秒,优先执行完毕
System.out.println(cf2.join());
// 结果:hello,mavic
7.异常处理
CompletableFuture 方法执行过程若产生异常,当调用 get,join获取任务结果才会抛出异常。
CompletionStage 提供几个方法,可以优雅处理异常。
public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn)
public CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action)
public <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action)
public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn)
exceptionally 使用方式类似于 try..catch 中 catch代码块中异常处理。
whenComplete 与 handle 方法就类似于 try..catch..finanlly 中 finally 代码块。无论是否发生异常,都将会执行的。这两个方法区别在于 handle 支持返回结果。
下面示例代码展示 handle 用法:
CompletableFuture<Integer>
f0 = CompletableFuture.supplyAsync(() -> (7 / 0))
.thenApply(r -> r * 10)
.handle((integer, throwable) -> {
// 如果异常存在,打印异常,并且返回默认值
if (throwable != null) {
throwable.printStackTrace();
return 0;
} else {
// 如果
return integer;
}
});
System.out.println(f0.join());
/**
* java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
* .....
*
* 0
*/
总结
JDK8 提供 CompletableFuture 功能非常强大,可以编排异步任务,完成串行执行,并行执行,AND 汇聚关系,OR 汇聚关系。
不过这个类方法实在太多,且方法还需要传入各种函数式接口,新手刚开始使用会直接会被弄懵逼。这里再总结一下三类核心参数的作用:
- Function 这类函数接口既支持接收参数,也支持返回值
- Consumer 这类接口函数只支持接受参数,不支持返回值
- Runnable 这类接口不支持接受参数,也不支持返回值
搞清楚函数参数作用以后,然后根据串行,AND 汇聚关系,OR 汇聚关系归纳一下相关方法,这样就比较好理解了