Java CompletableFuture

异步执行框架

Future虽然提供了新开线程运行异步任务的模式,但是无法很好的做好多个任务间的协调,要想知道一个异步任务何时完成,要么轮询isDone,要么调用get阻塞等待
为了提供异步任务间的流程控制CompletableFuture提供了触发机制,可以在异步任务执行完成后进行后续操作

接口及使用


CompletableFuture是以工具类的形式提供功能扩展,作为异步任务的运行器
为了能够后续串联逻辑,首先要对异步任务进行包装,作为源头,如果异步任务不产生结果,那么就runAsync,如果产生需要下游处理的结果,那么就supplyAsync
可以看到每个方法提供两种版本,传入指定线程池或者就用内置的ForkJoinPool,内置实现使用守护线程执行
如果无需运行,只需要包装一个值,使用completeFuture

CompletableFuture.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
private static final boolean useCommonPool = (ForkJoinPool.getCommonPoolParallelism() > 1);
private static final Executor asyncPool = useCommonPool ? ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();

static final AltResult NIL = new AltResult(null);
//...
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
return asyncSupplyStage(asyncPool, supplier);
}
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor) {
return asyncSupplyStage(screenExecutor(executor), supplier);
}

public static CompletableFuture<Void> runAsync(Runnable runnable) {
return asyncRunStage(asyncPool, runnable);
}
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor) {
return asyncRunStage(screenExecutor(executor), runnable);
}

public static <U> CompletableFuture<U> completedFuture(U value) {
return new CompletableFuture<U>((value == null) ? NIL : value);
}
}

CompletableFuture可以进行集合操作, 综合多个任务

CompletableFuture.java
1
2
3
4
5
6
7
8
//所有完成后返回完成
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {
return andTree(cfs, 0, cfs.length - 1);
}
//任意一个执行成功就返回其结果
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) {
return orTree(cfs, 0, cfs.length - 1);
}

CompletableFuture协调异步任务的链式调用能力来自于CompletionStage接口,所有方法都返回自身接口,确保可以链式串联下去,自身任务完成后进行后续处理

CompletionStage.java
1
2
3
4
public <U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn);
public CompletionStage<Void> thenAccept(Consumer<? super T> action);
public CompletionStage<Void> thenRun(Runnable action);
public <U> CompletionStage<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn);

和另一个Stage都完成后进行后续处理

CompletionStage.java
1
2
3
public <U,V> CompletionStage<V> thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
public <U> CompletionStage<Void> thenAcceptBoth(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action);
public CompletionStage<Void> runAfterBoth(CompletionStage<?> other,Runnable action);

和另一Stage任一完成后进行后续处理

CompletionStage.java
1
2
3
public <U> CompletionStage<U> applyToEither(CompletionStage<? extends T> other,Function<? super T, U> fn);
public CompletionStage<Void> acceptEither(CompletionStage<? extends T> other,Consumer<? super T> action);
public CompletionStage<Void> runAfterEither(CompletionStage<?> other,Runnable action);

异常处理

CompletionStage.java
1
2
3
public CompletionStage<T> exceptionally(Function<Throwable, ? extends T> fn);
public CompletionStage<T> whenComplete(BiConsumer<? super T, ? super Throwable> action);
public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);

上述除了execptionally单纯的处理异常外,都提供三个版本方法。

  • 默认无后缀版同步执行, 如果上游任务来源线程池也可能直接在线程池中执行
  • 带Async后缀版使用内置线程池执行后续任务
  • 还可以自己传Executor自定义
Sample
1
2
3
4
5
6
7
8
System.out.println(Thread.currentThread()); //1
CompletableFuture.completedFuture("test").thenAccept(s -> {
System.out.println(Thread.currentThread()); //1
}).thenRunAsync(() -> {
System.out.println(Thread.currentThread()); //10
}).thenRun(() -> {
System.out.println(Thread.currentThread()); //1 or 10
}).get();

结果获取
get方法响应中断,抛出受检异常;join方法不响应中断,抛出非检查异常

CompletableFuture.java
1
2
3
4
5
6
7
8
public T get() throws InterruptedException, ExecutionException {
Object r;
return reportGet((r = result) == null ? waitingGet(true) : r);
}
public T join() {
Object r;
return reportJoin((r = result) == null ? waitingGet(false) : r);
}

实现原理


CompletionStage维护两个变量,一个是当前阶段的计算结果,还有一个栈结构

CompletionStage.java
1
2
volatile Object result;
volatile Completion stack;

这两个变量的更新都通过CAS的方式更新,result为null表示未执行

CompletableFuture.java
1
2
3
4
5
6
7
8
9
10
11
12
//如果结果是null, 为了避免和未执行语义冲突,把result设为包装后的NIL类
final boolean completeNull() {
return UNSAFE.compareAndSwapObject(this, RESULT, null, NIL);
}
//正常情况下,result就是运行结果
final boolean completeValue(T t) {
return UNSAFE.compareAndSwapObject(this, RESULT, null, (t == null) ? NIL : t);
}
//如果遇到异常,把result设为异常
final boolean completeThrowable(Throwable x) {
return UNSAFE.compareAndSwapObject(this, RESULT, null, encodeThrowable(x));
}

从最简单的入手,发现一个普通的Runnable被包装成AsyncRun

CompletionStage.java
1
2
3
4
5
6
7
8
9
public static CompletableFuture<Void> runAsync(Runnable runnable) {
return asyncRunStage(asyncPool, runnable);
}
static CompletableFuture<Void> asyncRunStage(Executor e, Runnable f) {
if (f == null) throw new NullPointerException();
CompletableFuture<Void> d = new CompletableFuture<Void>();
e.execute(new AsyncRun(d, f));
return d;
}

可以发现AsyncRun构建了基本的运行框架,在Runnable任务执行后,写入完成后的结果
关键在postComplete,也就是说在自身任务完成后,从栈内取出后续任务执行

CompletionStage.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
static final class AsyncRun extends ForkJoinTask<Void> implements Runnable, AsynchronousCompletionTask {
CompletableFuture<Void> dep; Runnable fn;
//...
public void run() {
CompletableFuture<Void> d; Runnable f;
if ((d = dep) != null && (f = fn) != null) {
dep = null; fn = null;
//还没有结果,说明没运行过
if (d.result == null) {
try {
//执行后写入结果
f.run();
d.completeNull();
} catch (Throwable ex) {
d.completeThrowable(ex);
}
}
//触发下游任务
d.postComplete();
}
}
}

了解触发后,尝试观察下游任务如何注册

CompletableFuture.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public CompletableFuture<Void> thenAccept(Consumer<? super T> action) {
return uniAcceptStage(null, action);
}
private CompletableFuture<Void> uniAcceptStage(Executor e, Consumer<? super T> f) {
if (f == null) throw new NullPointerException();
//生成一个新的CompletableFuture
CompletableFuture<Void> d = new CompletableFuture<Void>();
//this当前对象表示上游,没有线程池时直接尝试当前线程执行uniAccept
if (e != null || !d.uniAccept(this, f, null)) {
//有线程池时,或者没有线程池但是上游还没完成,构建任务进栈
UniAccept<T> c = new UniAccept<T>(e, d, this, f);
push(c);
c.tryFire(SYNC);
}
return d;
}

final <S> boolean uniAccept(CompletableFuture<S> a, Consumer<? super S> f, UniAccept<S> c) {
Object r; Throwable x;
//上游结果是null,说明上游没结束,直接返回false
if (a == null || (r = a.result) == null || f == null)
return false;
tryComplete: if (result == null) {
if (r instanceof AltResult) {
if ((x = ((AltResult)r).ex) != null) {
completeThrowable(x, r);
break tryComplete;
}
r = null;
}
try {
if (c != null && !c.claim())
return false;
//执行当前
@SuppressWarnings("unchecked") S s = (S) r;
f.accept(s);
completeNull();
} catch (Throwable ex) {
completeThrowable(ex);
}
}
return true;
}

应用


检查异常要包裹为无需检查的CompletionException,避免在链路内部try-catch
如果timeOutPrint中直接输出内容,那么最终都会输出,无法实现二选一的效果
因为CompletableFuture只是决定后续执行时机,并不会取消先前的任务,先前任务继续运行只是不作为后续的条件

Sample
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public static String timeOutPrint(String content) {
try {
Random random = new Random();
TimeUnit.SECONDS.sleep(random.nextInt(5));
return content;
} catch (InterruptedException e) {
throw new CompletionException(e);
}
}

public static void main(String[] args) throws InterruptedException, ExecutionException {
CompletableFuture.supplyAsync(() -> {
return timeOutPrint("Chicken");
}).acceptEither(CompletableFuture.supplyAsync(() -> {
return timeOutPrint("Duck");
}), (s) -> {
System.out.println(s + " is a bird.");
}).get();
}