文章目录
- 背景
- CompletableFuture定义与用法
- Completion
- thenApplyAsync方法
- run()方法
- postComplete()
背景
Java的异步并发Future接口表示异步计算的结果。CompletableFuture是对Future接口的增强,它实现CompletionStage接口,允许链式组合异步操作,组合多个异步任务的结果,处理异常情况,任务结束时执行回调方法。
CompletableFuture定义与用法
public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {volatile Object result; // Either the result or boxed AltResultvolatile Completion stack;
}
CompletableFuture类有2个成员变量,volatile修饰符保证多线程环境下变量可见性。result表示当前任务的结果,可能是正常结果,可能是异常对象。stack是当前任务确定结果后接下来所有执行的任务栈。
CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
常用的创建方法中,supplyAsync有返回值,runAsync没有返回值(返回值类型为Void)。async表示异步,即runnable任务不由调用线程执行,而是由线程池执行。
默认线程池是ForkJoinPool,这是一个全局唯一的线程池。为了管理任务,建议对不同类型任务分别分配线程池。
Completion
abstract static class Completion extends ForkJoinTask<Void>implements Runnable, AsynchronousCompletionTask {volatile Completion next;abstract CompletableFuture<?> tryFire(int mode);abstract boolean isLive();public final void run() { tryFire(ASYNC); }public final boolean exec() { tryFire(ASYNC); return false; }public final Void getRawResult() { return null; }public final void setRawResult(Void v) {}
}
Completion是CompletableFuture的内部类。CompletableFuture强调存储任务结果,而Completion强调计算单个任务以及组合多个任务,强调动作,所以它继承Runnable,可以异步执行。例子:
ExecutorService executor = new ThreadPoolExecutor(5, 5, 5, TimeUnit.SECONDS, new ArrayBlockingQueue<>(5));
CompletableFuture base = new CompletableFuture();
base.thenApply(u -> {return "h";
});
base.thenAcceptAsync(s -> System.out.println(s), executor);
base.thenRunAsync(() -> System.out.println("c"), executor);
// 时间点1
base.complete("s");
程序运行到时间点1,base任务还没完成(等base.complete("s")结束base对象的result=s才算完成),它将三个thenXxx()方法装在栈里,以便将来调用,关系如图。每个thenXxx()方法都对应一个Completition对象,后调用的方法在栈顶,先调用的在栈底。栈的底层是链表。stack表示当前任务完成后将要执行的任务,即栈顶元素。
注意:这里的栈是Treiber stacks,用CAS原子性更改栈顶元素,实现线程安全地更改栈。

源码中thenApply()方法对应的不是Completion类本身,而是它的子类UniApply类。Completion的子类很多,如下图,它们的区别是
- 输入不同。比如
Uni开头的子类表示1个输入,Bi开头的子类表示2个输入。比如UniApply接收Function类型任务,UniRun接受Runnable类型任务。 - 功能不同。比如
BiRun表示两个输入都已运算结束才行,OrRun表示其中1个结束就行。比如Signaller类用于当前线程挂起后(比如get()方法)将来唤醒本线程(就是执行LockSupport.unpark(本线程对象))。

图片来自https://tech.meituan.com/2022/05/12/principles-and-practices-of-completablefuture.html。
thenApplyAsync方法
以CompletableFuture#thenApplyAsync()为例,分析执行流程。如果前置任务已经有结果,那么不加入stack栈,直接运算,如果前置任务还没算完,那么用unipush()方法加入栈。
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor) {// screenExecutor()方法的作用是校验线程池参数return uniApplyStage(screenExecutor(executor), fn);
}
private <V> CompletableFuture<V> uniApplyStage(Executor e, Function<? super T,? extends V> f) {if (f == null) throw new NullPointerException();Object r;if ((r = result) != null) // `result`是前置任务的结果,即本次调用`thenApplyAsync()`方法的`CompletableFuture`对象的`result`变量,即上面代码中base对象的result变量。return uniApplyNow(r, e, f); // `uniApplyNow()`方法的含义是:本线程立即执行,不加入`stack`栈CompletableFuture<V> d = newIncompleteFuture(); // 源码中有非常多`d`,都表示本次计算的结果unipush(new UniApply<T,V>(e, d, this, f));return d;
}
private <V> CompletableFuture<V> uniApplyNow(Object r, Executor e, Function<? super T,? extends V> f) {Throwable x;CompletableFuture<V> d = newIncompleteFuture(); // d表示本次计算任务的返回结果if (r instanceof AltResult) { // 如果前置任务抛出异常if ((x = ((AltResult)r).ex) != null) {d.result = encodeThrowable(x, r); // 当前任务也不执行`Function`方法,直接抛出同样异常return d;}r = null;}try {if (e != null) {e.execute(new UniApply<T,V>(null, d, this, f)); // 交给线程池执行`UniApply`的`run()`方法。第一个参数null是线程池对象,此时已经不需要了。第二个参数`d`是本次任务的结果对象。第三个参数是前置任务的结果对象,即本次调用`thenApplyAsync()`方法的`CompletableFuture`对象。第四个参数是本次任务的方法体。} else {@SuppressWarnings("unchecked") T t = (T) r;d.result = d.encodeValue(f.apply(t)); // 本线程直接执行。t是上次任务的正常结果,d.result是本次任务的正常}} catch (Throwable ex) {d.result = encodeThrowable(ex); // 本次任务如果异常,那么本次任务结果是异常对象}return d;
}
static final class AltResult { // See above // AltResult 是异常包装类final Throwable ex; // null only for NILAltResult(Throwable x) { this.ex = x; }
}
// `UniApply`对象的构造器方法。`src`是前置任务的结果,`dep`(方法中写作`d`)是本次任务的结果,`fn`是本次任务的方法体。
UniApply(Executor executor, CompletableFuture<V> dep,CompletableFuture<T> src,Function<? super T,? extends V> fn) {super(executor, dep, src); this.fn = fn;
}
unipush()方法,不断自旋将任务压入栈。
final void unipush(Completion c) {if (c != null) {while (!tryPushStack(c)) {// 运行至此,入栈失败if (result != null) { // result != null表示前置任务出结果了NEXT.set(c, null); // 取消`tryPushStack()`方法的`NEXT.set(c, h)`break;}// 运行至此,入栈失败,且前置任务没出结果,则不断自旋}// 运行至此,要么入栈成功,要么前置任务出结果了if (result != null)// 运行至此,前置任务出结果,那么不入栈,直接执行本次方法体,这里执行的不是`run()`方法,而是`trynFire(SYNC)`方法。c.tryFire(SYNC); }
}
final boolean tryPushStack(Completion c) {Completion h = stack; // stack是前置任务的栈顶元素,即this.stackNEXT.set(c, h); // CAS piggyback // 这句语义等于`c.next = h`,而`NEXT`是`VarHandle`对象,能线程安全并且高效地更改对象的值return STACK.compareAndSet(this, h, c);// 如果返回true,表示当前`CompletableFuture`对象的`stack`由h,线程安全地改成了c.// stack = h, 本次CAS自旋改成了 stack = c, c.next = h。
}
run()方法
abstract static class Completion extends ForkJoinTask<Void>implements Runnable, AsynchronousCompletionTask {abstract CompletableFuture<?> tryFire(int mode);public final void run() { tryFire(ASYNC); }
}
class CompletableFuture {static final int SYNC = 0;static final int ASYNC = 1;static final int NESTED = -1;
}
Completition类定义run()方法,内部执行的是子类覆写的tryFire(int mode)方法。try表示尝试执行,可能执行失败。int型mode变量其实是个枚举量,只有3个值。
SYNC,同步,如果没有得到结果,本方法调用不返回。ASYNC,异步,就算没有得到结果,本方法调用也返回。NESTED,内嵌,postComplete()方法独有的。
mode不同,tryFire(int mode)方法执行流程也不同。以UniApply类的tryFire(int mode)为例。
static final class UniApply<T,V> extends UniCompletion<T,V> {Function<? super T,? extends V> fn; // 成员变量`fn`,本次任务的方法体// 构造器方法UniApply(Executor executor, CompletableFuture<V> dep,CompletableFuture<T> src,Function<? super T,? extends V> fn) {super(executor, dep, src); this.fn = fn;}final CompletableFuture<V> tryFire(int mode) {CompletableFuture<V> d; CompletableFuture<T> a; Object r; Throwable x; Function<? super T,? extends V> f;if ((a = src) == null || (r = a.result) == null|| (d = dep) == null || (f = fn) == null)// 运行至此,(a = src) == null为true,(d = dep) == null为true,(f = fn) == null)为true都表示已执行完`src = null; dep = null; fn = null;`语句,当前`UniApply`已被清空,任务已被完成// (r = a.result) == null为true表示前置任务还没完成,还不能执行当前操作return null; tryComplete: if (d.result == null) { // 当前任务结果为空if (r instanceof AltResult) { // 前置任务结果异常,则当前任务也异常if ((x = ((AltResult)r).ex) != null) {d.completeThrowable(x, r);break tryComplete;// 跳出嵌套,下一行执行`src = null; dep = null; fn = null;`}r = null; // 及时清空变量,帮助GC}try {// mode <= 0 为true表示状态不是ASYNC,即非异步执行// !claim()为true表示该任务已经被执行过了// 因此返回if (mode <= 0 && !claim())return null;else {@SuppressWarnings("unchecked") T t = (T) r;// 本线程直接方法体,t是前置任务的结果d.completeValue(f.apply(t));}} catch (Throwable ex) {d.completeThrowable(ex);}}src = null; dep = null; fn = null;// 执行至此,本次任务结束,将成员变量`src, dep, fn`都设为null,表示当前任务已完成。// 比如别的线程调用此对象的`isLive()`方法(),就返回dep对象是否为空。// final boolean isLive() { // return dep != null; // }return d.postFire(a, mode);}
}
// UniCompletion<T,V>#claim(),其中UniCompletion是UniApply的父类
final boolean claim() {Executor e = executor;if (compareAndSetForkJoinTaskTag((short)0, (short)1)) {// CAS原子性修改标志位,表示只有1个线程能够执行本`Completion`类// 运行至此,表示本`Completion`类第一次被执行if (e == null)return true; // 本线程直接执行,返回trueexecutor = null; // disable 成员变量executore.execute(this); // 交给线程池执行,跳出if语句,返回的是false}return false;
}
// CompletableFuture#postFire()
final CompletableFuture<T> postFire(CompletableFuture<?> a, int mode) {if (a != null && a.stack != null) {Object r;if ((r = a.result) == null) // if语句为true表示当前对象未完成a.cleanStack(); // 清理a对象`stack`栈中空对象,即已经完成的对象if (mode >= 0 && (r != null || a.result != null)) // if语句为true表示当前a任务已完成,状态为`SYNC, ASNYC`而不是`NESTED`// a任务已经完成,现在要处理a对象的下游(依赖)对象a.postComplete();}if (result != null && stack != null) {// mode < 0 为true表示状态是`NESTED`,返回当前任务对象,`postComplete()`方法会用到if (mode < 0)return this;elsepostComplete(); // 当前任务已经完成,现在要处理当前任务的下游(依赖)对象}return null;
}
postComplete()
进入postComplete()方法的前提是当前任务已经有结果了,即this.result != null。
final void postComplete() {/** On each step, variable f holds current dependents to pop* and run. It is extended along only one path at a time,* pushing others to avoid unbounded recursion.*/CompletableFuture<?> f = this; Completion h; // f是当前任务,h是栈顶元素while ((h = f.stack) != null ||(f != this && (h = (f = this).stack) != null)) { CompletableFuture<?> d; Completion t;if (STACK.compareAndSet(f, h, t = h.next)) {if (t != null) {if (f != this) {pushStack(h);continue;}NEXT.compareAndSet(h, t, null); // try to detach}f = (d = h.tryFire(NESTED)) == null ? this : d;}}
}
以图为例,执行base.postComplete()方法。
第一次进入while循环,h = thenComp1, f = base,执行f = (d = h.tryFire(NESTED)) == null ? this : d;后f=comp1Result(原因是tryFire(NESTED)方法不会链式执行,只执行1个任务就返回结果)。

第二次进入while循环,f=comp1Result, h = comp1Thencomp1,由于f != this,执行pushStack(h)后h离开comp1Result,进入base的stack。

continue表示会第三次进入while循环,将comp1Thencomp2也加入到base的stack。可以看到,comp1Result的栈元素是倒序之后进入base的栈。

第四次循环h != null为false,h重新赋值为当前base的stack。开始执行comp1Thencomp2任务。
