从CompletableFuture到异步编程设计
从CompletableFuture到异步编程设计,笔者就分为2部分来分享CompletableFuture异步编程设计,前半部分总结下CompletableFuture使用实践,后半部分分享下CompletableFuture实现原理和异步编程设计机制。
(ps:本文内容较多,请耐心阅读。如果读者了解CompletableFuture使用的话,可以直接看后半部分内容;如果熟悉CompletableFuture及异步编程设计的话,可以直接翻到文档末尾点个“推荐”就好了,因为你已经掌握了Java异步设计精髓了 🙂 ,若有不正确地方,感谢评论区指正交流~ )
Java8新增了CompletableFuture类,该类实现了CompletionStage和Future接口,简化了Java异步编程能力,该类方法较多,其实套路只有一个,那就是任务执行完成之后执行“回调”。
CompletableFuture使用实践
Java8新增的CompletableFuture 提供对异步计算的支持,可以通过回调的方式处理计算结果。CompletableFuture 类实现了CompletionStage和Future接口,所以还可以像之前使用Future那样使用CompletableFuture ,尽管已不再推荐这样用了。
CompletableFuture的创建
// 创建一个带result的CompletableFuture CompletableFuture<String> future = CompletableFuture.completedFuture("result"); future.get(); // 默认创建的CompletableFuture是没有result的,这时调用future.get()会一直阻塞下去知道有result或者出现异常 future = new CompletableFuture<>(); try { future.get(1, TimeUnit.SECONDS); } catch (Exception e) { // no care } // 给future填充一个result future.complete("result"); assert "result".equals(future.get()); // 给future填充一个异常 future = new CompletableFuture<>(); future.completeExceptionally(new RuntimeException("exception")); try { future.get(); } catch (Exception e) { assert "exception".equals(e.getCause().getMessage()); }
上面的示例是自己设置future的result,一般情况下我们都是让其他线程或者线程池来执行future这些异步任务。除了直接创建CompletableFuture 对象外(不推荐这样使用),还可以使用如下4个方法创建CompletableFuture 对象:
// runAsync是Runnable任务,不带返回值的,如果入参有executor,则使用executor来执行异步任务 public static CompletableFuture<Void> runAsync(Runnable runnable) public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor) // supplyAsync是待返回结果的异步任务 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor) // 使用示例 CompletableFuture.runAsync(() -> { System.out.println("hello world"); }, executor); CompletableFuture.supplyAsync(() -> { System.out.println("hello world"); return "result"; });
如果入参不带executor,则默认使用ForkJoinPool.commonPool()作为执行异步任务的线程池;否则使用executor执行任务。
CompletableFuture的完成动作
public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action) public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action) public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor) public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn) // 使用示例 CompletableFuture.supplyAsync(() -> { System.out.println("hello world"); return "result"; }).whenCompleteAsync((result, e) -> { System.out.println(result + " " + e); }).exceptionally((e) -> { System.out.println("exception " + e); return "exception"; });
action是Action类型,从上面可以看出它既可以处理正常返回值也可以处理异常,whenComplete会在任务执行完成后直接在当前线程内执行action动作,后缀带Async的方法是交给其他线程执行action(如果是线程池,执行action的可能和之前执行异步任务的是同一个线程),入参带executor的交给executor线程池来执行action动作,当发生异常时,会在当前线程内执行exceptionally方法。
除了用上面的whenComplete来执行完成动作之外,还可以使用handle方法,该方法可以返回一个新的CompletableFuture的返回类型。
public <U> CompletableFuture<U> handle(BiFunction<? super T,Throwable,? extends U> fn) public <U> CompletableFuture<U> handleAsync(BiFunction<? super T,Throwable,? extends U> fn) public <U> CompletableFuture<U> handleAsync(BiFunction<? super T,Throwable,? extends U> fn, Executor executor) // handle方法示例: CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> { System.out.println("hello world"); return "result"; }); CompletableFuture<Integer> f2 = f1.handle((r, e) -> { System.out.println("handle"); return 1; });
除了使用handle方法来执行CompletableFuture返回类型转换之外,还可以使用thenApply方法,二者不同的是前者会处理正常返回值和异常,因此可以屏蔽异常,避免继续抛出;而后者只能处理正常返回值,一旦有异常就会抛出。
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn) public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn) public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor) // thenApply方法示例: CompletableFuture.supplyAsync(() -> { System.out.println("hello world"); return "result"; }).thenApply((r) -> { System.out.println(r); return "aaa"; }).thenApply((r) -> { System.out.println(r); return 1; });
注意,上面的handle、thenApply都是返回新的CompletableFuture类型,如果只是为了在CompletableFuture完成之后执行某些消费动作,而不返回新的CompletableFuture类型,则可以使用thenAccept方法。
public CompletableFuture<Void> thenAccept(Consumer<? super T> action) public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor) // thenAccept方法示例: CompletableFuture.supplyAsync(() -> { System.out.println("hello world"); return "result"; }).thenAccept(r -> { System.out.println(r); }).thenAccept(r -> { // 这里的r为Void(null)了 System.out.println(r); });
上面的handle、thenApply和thenAppept都是对上一个CompletableFuture执行完的结果进行某些操作。那么可不可以同时对2个CompletableFuture执行结果执行某些操作呢?其实也是可以的,使用thenAppeptBoth方法即可。注意,thenAppeptBoth和handle/thenApply/thenAppep的流程是一样的,只不过thenAppeptBoth中包含了另一个CompletableFuture对象(注意,这里另一个CompletableFuture对象的执行可并不是上一个CompletableFuture执行结束才开始执行的)。
public <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action) public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action) public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action, Executor executor) public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other, Runnable action) // thenAcceptBoth方法示例: CompletableFuture.supplyAsync(() -> { System.out.println("hello world"); return "result"; }).thenAcceptBoth(CompletableFuture.completedFuture("result2"), (r1, r2) -> { System.out.println(r1 + "-" + r2); });
注意,thenAcceptBoth方法是没有返回值的(CompletableFuture<Void>),如果想用thenAcceptBoth这样的功能并且还带有返回值的CompletableFuture,那么thenCombine方法就该上场了。
public <U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn) public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn) public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor) // thenCombine方法示例 CompletableFuture.supplyAsync(() -> { System.out.println("hello world"); return "result"; }).thenCombine(CompletableFuture.completedFuture("result2"), (r1, r2) -> { System.out.println(r1 + "-" + r2); return r1 + "-" + r2; });
thenAcceptBoth和runAfterBoth是当两个CompletableFuture都计算完成,而下面的方法是当任意一个CompletableFuture计算完成的时候就会执行。
public CompletableFuture<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action) public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action) public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action, Executor executor) public <U> CompletableFuture<U> applyToEither(CompletionStage<? extends T> other, Function<? super T,U> fn) public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T,U> fn) public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T,U> fn, Executor executor)
如果当想在多个CompletableFuture都计算完成或者多个CompletableFuture中的一个计算完成后执行某个动作,可使用方法 allOf 和 anyOf。
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)
如果当任务完成时并不想用CompletableFuture的结果,可以使用thenRun方法来执行一个Runnable。
public CompletableFuture<Void> thenRun(Runnable action) public CompletableFuture<Void> thenRunAsync(Runnable action) public CompletableFuture<Void> thenRunAsync(Runnable action, Executor executor)
以上方法都是在方法中返回一个值(或者不返回值),其实还可以返回一个CompletableFuture,是不是很像类的组合一样。
public <U> CompletableFuture<U> thenCompose(Function<? super T,? extends CompletionStage<U>> fn) public <U> CompletableFuture<U> thenComposeAsync(Function<? super T,? extends CompletionStage<U>> fn) public <U> CompletableFuture<U> thenComposeAsync(Function<? super T,? extends CompletionStage<U>> fn, Executor executor) // thenCompose方法示例: CompletableFuture.supplyAsync(() -> { System.out.println("hello world"); return "result"; }).thenCompose(r -> { System.out.println(r); return CompletableFuture.supplyAsync(() -> { System.out.println(r + " result2"); return r + " result2"; }); }); // 上面的代码和下面的代码效果是一样的 CompletableFuture.supplyAsync(() -> { System.out.println("hello world"); return "result"; }).thenApply(r -> { System.out.println(r); return r; }).thenApplyAsync(r -> { System.out.println(r + " result2"); return r + " result2"; });
CompletableFuture实现机制
先抛开 CompletableFuture 不谈,如果程序中使用了线程池,如何才能在某个任务执行完成之后执行某些动作呢?其实Java线程池本身已经提供了任务执行前后的hook方法(beforeExecute和afterExecute),如下:
public class ThreadPoolExecutor extends AbstractExecutorService { // ... protected void beforeExecute(Thread t, Runnable r) { } protected void afterExecute(Runnable r, Throwable t) { } // ... }
我们只需要自定义线程池继承ThreadPoolExecutor ,然后重写beforeExecute和afterExecute方法即可,在afterExecute里可以执行一些动作。关于重写ThreadPoolExecutor 的一个示例可点击ListenableThreadPoolExecutor查看。
那么CompletableFuture 的实现机制是怎样的呢?其实,和上面的所说的“afterExecute机制”是类似的(本质是一样的,回调机制),也是在任务执行完成后执行某些动作,如下代码:
CompletableFuture.supplyAsync(() -> { // callable任务 System.out.println("hello world"); return "result"; }).thenApply(r -> { // 任务完成之后的动作(回调方法),类似于ThreadPoolExecutor.afterExecute方法 System.out.println(r); return r; });
上面的示例代码其实主要完成了3个步骤,这3个步骤其实也是CompletableFuture的实现流程:
- 执行任务
- 添加任务完成之后的动作(回调方法)
- 执行回调
下面笔者就以上面的示例代码,按照这3个步骤依次进行分析,此时建议读者打开idea,写个demo进行debug,这里篇幅有限,笔者就只讲解主要流程代码,其他代码自行阅读即可 🙂
1、执行任务
执行任务的主要逻辑就是 AsyncSupply.run 方法:
public void run() { CompletableFuture<T> d; Supplier<T> f; // dep是当前CompletableFuture,fn是任务执行逻辑 if ((d = dep) != null && (f = fn) != null) { dep = null; fn = null; if (d.result == null) { try { // 1 任务执行 & result cas设置 d.completeValue(f.get()); } catch (Throwable ex) { // 1.1 result cas异常设置 d.completeThrowable(ex); } } // 2 任务完成,可能涉及到回调的执行 d.postComplete(); } }
2、添加回调
添加回调方法的流程是从 thenApply 开始的:
public <U> CompletableFuture<U> thenApply( Function<? super T,? extends U> fn) { return uniApplyStage(null, fn); } private <V> CompletableFuture<V> uniApplyStage( Executor e, Function<? super T,? extends V> f) { if (f == null) throw new NullPointerException(); CompletableFuture<V> d = new CompletableFuture<V>(); if (e != null || !d.uniApply(this, f, null)) { // 当上一个CompletableFuture未完成时,将该CompletableFuture添加 // 到上一个CompletableFuture的statck中 UniApply<T,V> c = new UniApply<T,V>(e, d, this, f); push(c); c.tryFire(SYNC); } return d; }
CompletableFuture.statck 是 UniCompletion 类型的,该类型如下:
UniCompletion<T,V> { volatile Completion next; // Treiber stack link Executor executor; // executor to use (null if none) CompletableFuture<V> dep; // the dependent to complete CompletableFuture<T> src; // source for action }
3、执行回调
执行回调是从CompletableFuture.postComplete 开始的:
final void postComplete() { /* * On each step, variable f holds current dependents to pop * and run. It is extended along only one path at a time, * pushing others to avoid unbounded recursion. */ CompletableFuture<?> f = this; Completion h; while ((h = f.stack) != null || (f != this && (h = (f = this).stack) != null)) { CompletableFuture<?> d; Completion t; // cas设置h.next到当前CompletableFuture.statck if (f.casStack(h, t = h.next)) { if (t != null) { if (f != this) { pushStack(h); continue; } h.next = null; // detach } f = (d = h.tryFire(NESTED)) == null ? this : d; } } } // UniAccept final CompletableFuture<Void> tryFire(int mode) { CompletableFuture<Void> d; CompletableFuture<T> a; if ((d = dep) == null || !d.uniAccept(a = src, fn, mode > 0 ? null : this)) // 执行回调 return null; dep = null; src = null; fn = null; // 返回当前CompletableFuture 或者 递归调用postComplete return d.postFire(a, mode); }
看完上面3个步骤,是不是还不太清楚多个CompletableFuture之间的执行流程呢,说实话笔者第一次看的时候也是这样的 :(,下面我们换个例子并给出图示来看:
CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> { System.out.println("hello world f1"); sleep(1); // TimeUnit.SECONDS.sleep(1) return "result f1"; }); CompletableFuture<String> f2 = f1.thenApply(r -> { System.out.println(r); sleep(1); return "f2"; }); CompletableFuture<String> f3 = f2.thenApply(r -> { System.out.println(r); sleep(1); return "f2"; }); CompletableFuture<String> f4 = f1.thenApply(r -> { System.out.println(r); sleep(1); return "f2"; }); CompletableFuture<String> f5 = f4.thenApply(r -> { System.out.println(r); sleep(1); return "f2"; }); CompletableFuture<String> f6 = f5.thenApply(r -> { System.out.println(r); sleep(1); return "f2"; });
上面代码对应的CompletableFuture及其Completion关系如下图:
结合上图和postComplete流程,可以看出执行回调的顺序是:f1 -> f4 -> f5 -> f6 -> f2 -> f3。(如果这里没看懂,可以回过头再看下postComplete方法的源码~)
异步编程设计
分析完了CompletableFuture,相信大家都已经对CompletableFuture的设计与实现有了进一步的理解。那么对于异步编程有哪些实际应用场景,其本质到底是什么呢?
异步处理的本质其实就是回调(系统层借助于指针来实现,准确来说是函数指针),用户提供一个回调方法,回调函数不是由该函数的实现方直接调用,而是在特定的事件或条件发生时由另外的一方调用的,用于对该事件或条件进行响应。从“宏观”来看,CompletableFuture的实现其实很简单,就是回调,即在任务执行完成之后进行回调,回调中可能涉及到其他操作,比如下一个回调或者执行下一个任务。
异步编程在应用场景较多,很多语言,比如Node.js,采用回调的方式实现异步编程。Java的一些框架,比如Netty,自己扩展了Java的 Future
接口,提供了addListener
等多个扩展方法:
ServerBootstrap boot = new ServerBootstrap(); boot.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .localAddress(8080) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new EchoHandler()); } });
dubbo中consumer对于RPC response的处理是基于回调机制的,Google guava也提供了通用的扩展Future:ListenableFuture、SettableFuture 以及辅助类Futures等,方便异步编程。
final String name = ...; inFlight.add(name); ListenableFuture<Result> future = service.query(name); future.addListener(new Runnable() { public void run() { processedCount.incrementAndGet(); inFlight.remove(name); lastProcessed.set(name); logger.info("Done with {0}", name); } }, executor);
参考资料:
2、https://www.cnblogs.com/aniao/p/aniao_cf.html
3、https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletableFuture.html