引言
在后端开发中,我们经常会使用线程池来异步执行任务,但是如果要进行异步编排任务可犯了难。线程池可不能按照一定顺序来执行相应任务,所以今天我们来讲讲Java8引入的,位于java.util.concurrent
包下的CompletableFuture。
如图是依靠CompletableFuture来实现的异步并行任务,聚合结果的示意图:

Future😯
要想了解CompletableFuture,我们就得先来了解一下Future。Future
是 Java 中用于表示异步计算结果的接口,它允许你启动一个长时间运行的任务,并在之后获取该任务的结果,而无需阻塞主线程。以下是该接口的主要方法。
1 2 3 4 5 6 7 8 9 10 11 12 13
| public interface Future<V> { boolean cancel(boolean mayInterruptIfRunning); boolean isCancelled(); boolean isDone(); V get() throws InterruptedException, ExecutionException; V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutExceptio
}
|
需要注意的是,虽然 Future
提供了一种处理异步操作结果的方式,但它缺乏一些更高级的功能,比如组合多个异步操作的结果、异常处理等。
CompletableFuture😎
因Future的局限性,Java8引入CompletableFuture来解决Future的不足,而且还带来了新特性,让我们一起来看看。
创建CompletableFuture😪
1.new(即手动创建CompletableFuture):
1
| CompletableFuture<String> future = new CompletableFuture<>();
|
- 可以在后续通过
.complete()
或 .completeExceptionally()
手动设置结果或异常。
- 适合:需要跨线程通信、事件驱动等自定义控制流程的场景
2.工厂方法 supplyAsync()
1 2
| public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
|
两者的区别在于是否传入自定义线程池,前者使用默认线程池ForkJoinPool.commonPool(),后者可传入自定义的线程池。这个方法创建的CompletableFuture可执行一个带返回值的异步任务。
3.工厂方法 runAsync()
1 2
| public static CompletableFuture<Void> runAsync(Runnable runnable) public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
|
这两个方法的区别和supplyAsync()的两者区别一样。这个方法创建的CompletableFuture可执行一个没有返回值的异步任务。
4. completedFuture()
创建已完成状态的 Future
1
| public static <U> CompletableFuture<U> completedFuture(U value)
|
直接创建一个已经完成并带有结果的 CompletableFuture
,可以用于测试、快速返回已知结果、跳过某些分支逻辑
其实这个方法底层用的是new,即手动创建CompletableFuture,封装起来罢了😗
组合编排CompletableFuture🫨
上面说了CompletableFuture可以进行异步组合编排任务,那么到底是怎么样进行组合编排的呢?
链式调用.thenCompose()
这个方法主要用于异步任务的串行编排。简单来说,thenCompose()
用于将一个异步任务的结果作为输入,继续发起一个新的异步任务,形成一条异步流水线。
在下面这个例子中,通过工厂方法创建了一个CompletableFuture之后,接着将结果输入给新CompletableFuture作为参数,然后输出结果。
- 第一个任务返回
"Hello"
;
- 第二个任务接收
"Hello"
,并异步返回 "Hello World"
;
thenCompose()
把两个 Future 合并成一个连续的 Future,而不是嵌套的 CompletableFuture<CompletableFuture<String>>
。1 2 3 4
| CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello") .thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " World"));
System.out.println(future.join());
|
太简单了?来点难度的,模拟多级异步调用。
假设你有如下三个服务接口:
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| CompletableFuture<String> login(String username) { return CompletableFuture.supplyAsync(() -> "token-" + username); }
CompletableFuture<User> getUserInfo(String token) { return CompletableFuture.supplyAsync(() -> new User(token)); }
CompletableFuture<List<Order>> getOrders(User user) { return CompletableFuture.supplyAsync(() -> Arrays.asList(new Order("1001"), new Order("1002"))); }
|
使用 thenCompose()
编排整个流程:
1 2 3 4 5 6
| CompletableFuture<List<Order>> ordersFuture = login("alice") .thenCompose(token -> getUserInfo(token)) .thenCompose(user -> getOrders(user));
List<Order> orders = ordersFuture.join(); orders.forEach(order -> System.out.println("订单ID:" + order.id));
|
这样我们就通过thenCompose()实现了多个异步服务的链式调用,下面是一个链式调用的简单示意图

合并处理.thenCombine()
上面讲解了如何链式调用任务,属于是微信接龙形态的(hh,开个玩笑),现在我们来讲并行执行两个异步任务,并将它们的结果合并处理。
简单来说,**thenCombine()
用于将两个独立的 CompletableFuture
的结果进行合并处理,返回一个新的结果**,就能实现这样的效果。
适合用于:
- 同时发起多个异步请求(如查询用户信息、查询订单信息)
- 然后在两者都完成后,把结果合并输出
废话不多说,例子走起:
1 2 3 4 5 6 7 8
| CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Hello"); CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "World");
CompletableFuture<String> combinedFuture = future1.thenCombine(future2, (result1, result2) -> { return result1 + " " + result2; });
System.out.println(combinedFuture.join());
|
future1
和 future2
是并行执行的。
- 当两者都完成时,使用
thenCombine()
将它们的结果拼接起来。
不够接近真实业务?上点难度:
假设你有两个服务接口:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| CompletableFuture<String> fetchUserName(int userId) { return CompletableFuture.supplyAsync(() -> { Thread.sleep(500); return "Alice"; }); }
CompletableFuture<Integer> fetchOrderCount(String userName) { return CompletableFuture.supplyAsync(() -> { Thread.sleep(800); return 5; }); }
|
我们可以这样使用 thenCombine()
:
1 2 3 4 5 6 7 8 9 10 11
| int userId = 123;
CompletableFuture<String> userFuture = fetchUserName(userId); CompletableFuture<Integer> orderFuture = fetchOrderCount("Alice");
CompletableFuture<String> resultFuture = userFuture.thenCombine(orderFuture, (name, count) -> { return "用户:" + name + ",订单数:" + count; });
System.out.println(resultFuture.join());
|
thenCombine()
不会等待两个 Future 的异常状态,如果其中一个失败,整个 Future 也会失败。
- 如果你想处理异常或提供默认值,可以结合
.exceptionally()
或 .handle()
使用。
进阶:链式调用多个 thenCombine()
1 2 3 4 5 6 7 8
| CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(() -> 10); CompletableFuture<Integer> f2 = CompletableFuture.supplyAsync(() -> 20); CompletableFuture<Integer> f3 = CompletableFuture.supplyAsync(() -> 30);
CompletableFuture<Integer> result = f1.thenCombine(f2, Integer::sum) .thenCombine(f3, Integer::sum);
System.out.println(result.join());
|
下面是一个简单的thenCombine()的示意图:

等待全部allOf()
上面两个方法所要求的异步任务之间都是有联系的,但是实际上我们业务也有很多没关联的异步任务,比如文件上传,下载等,需要同时开启多个任务来执行,等待全部完成之后再统一返回结果或者成功标识。
allOf()
就可以完成这样的效果,它可以用于并行执行多个异步任务、并在所有任务完成后再继续后续操作。但是它不会不会自动聚合各个 Future 的结果(不关心是否返回结果,完成就行),你需要手动获取每个 Future 的值。
基本用法:
1 2 3 4 5 6 7 8 9 10 11 12
| CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Result1"); CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "Result2"); CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> "Result3");
CompletableFuture<Void> allFutures = CompletableFuture.allOf(future1, future2, future3);
allFutures.thenRun(() -> { System.out.println("所有任务已完成!"); System.out.println("future1: " + future1.join()); System.out.println("future2: " + future2.join()); System.out.println("future3: " + future3.join()); });
|
如果任意一个 Future 抛出异常,整个 allOf
也会失败,可以使用.exceptionally()
或 .handle()
来捕获和处理异常。
进阶:结合 thenApply()
聚合结果
如果你想在所有任务完成后合并结果,可以这样做:
1 2 3 4 5 6 7 8 9
| CompletableFuture<List<String>> combinedFuture = CompletableFuture.allOf(future1, future2, future3) .thenApply(v -> List.of( future1.join(), future2.join(), future3.join() ));
List<String> results = combinedFuture.join(); System.out.println(results);
|
以下是allOf()的一个简单示意图:

就等一个anyOf()
anyOf()
用于并行执行多个异步任务、并在第一个任务完成(无论成功或失败)后立即返回结果的场景
适合用于:
- 多个服务提供者并发调用,取最快响应
- 实现“超时降级”逻辑
- 异常容忍:只要有一个 Future 成功即可继续流程
基本用法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } return "Result from future1"; });
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } return "Result from future2"; });
CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(800); } catch (InterruptedException e) { e.printStackTrace(); } return "Result from future3"; });
CompletableFuture<Object> resultFuture = CompletableFuture.anyOf(future1, future2, future3);
System.out.println("最快完成的是:" + resultFuture.join());
|
进阶:结合 filter()
和 handle()
做降级处理
你可以使用 .handle()
来统一处理成功/失败情况:
1 2 3 4 5 6 7 8 9 10
| CompletableFuture<String> fastFuture = CompletableFuture.anyOf(future1, future2, future3) .handle((result, ex) -> { if (ex != null) { System.err.println("发生异常:" + ex.getCause()); return "默认值"; } return result.toString(); });
System.out.println(fastFuture.join());
|
下面是anyOf()的简单示意图

处理CompletableFuture异步结果😵
上面讲了这么多异步编排的方法,那么可不可以对单个异步任务进行处理的方法呢?有的,兄弟,有的,这样常用的方法有四个🤣
thenApply()
thenApply()
是 CompletableFuture
提供的一个方法,用于在异步任务完成之后,对结果进行转换处理。它返回一个新的 CompletableFuture
,表示经过转换后的新结果。
特点:
- 串行执行:前一个任务完成后,才执行
thenApply()
中的任务。
- 有返回值:
thenApply()
会返回一个新的结果(即转换后的结果)。
- 类似于函数式编程中的
map
操作。
基本用法:
1 2 3 4 5
| CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello");
future.thenApply(result -> result + " World") .thenAccept(finalResult -> System.out.println(finalResult));
|
thenAccept()
和thenRun()
现在着重讲一下这两个方法的区别。
方法名 |
作用 |
thenAccept() |
消费结果,接收上一个任务的结果,但不返回新的值(无返回) |
thenRun() |
执行动作,不接收上一个任务的结果,只在任务完成后执行一段逻辑 |
它们都是用于在 CompletableFuture
完成后做一些后续处理,常用于链式调用中的“副作用”操作。
与 thenApply()
的区别?
方法名 |
是否接收前一个 Future 的结果 |
是否有返回值 |
是否支持链式调用 |
是否异步 |
thenApply() |
✅ 是 |
✅ 是(新值) |
✅ 支持 |
✅ 异步 |
thenAccept() |
✅ 是 |
❌ 否(void) |
❌ 不传递值 |
✅ 异步 |
thenRun() |
❌ 否 |
❌ 否(void) |
❌ 不传递值 |
✅ 异步 |
thenAccept()
的基本用法:
1 2 3 4 5
| CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello");
future.thenAccept(result -> { System.out.println("接收到结果: " + result); });
|
- 接收上一个 Future 的结果作为输入
- 执行一个消费型操作(如打印、写日志、入库等)
- 没有返回值
thenRun()
的基本用法:
1 2 3 4 5 6 7
| CompletableFuture<Void> future = CompletableFuture.runAsync(() -> { System.out.println("正在执行主任务..."); });
future.thenRun(() -> { System.out.println("主任务完成,执行后续清理操作"); });
|
- 不接收上一个 Future 的结果
- 只在 Future 完成后执行一个 Runnable 任务
- 适合用于执行一些不需要依赖结果的后续操作,比如发送通知、清理资源等
注意:这两个方法都不会影响链式调用的返回值!一个负责消费结果,另一个负责后续动作。
whenComplete()
whenComplete()
是一个回调方法,用于在 Future 完成(无论是正常完成还是异常完成)后执行一段逻辑。
常用于:
基本用法:正常完成
1 2 3 4 5 6 7 8 9 10
| CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello World") .whenComplete((result, throwable) -> { if (throwable == null) { System.out.println("任务完成,结果为: " + result); } else { System.err.println("任务出错: " + throwable.getMessage()); } });
System.out.println(future.join());
|
基本用法:异常处理
1 2 3 4 5 6 7 8 9 10 11 12 13
| CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> { if (true) throw new RuntimeException("出错了!"); return 42; }).whenComplete((result, throwable) -> { if (throwable != null) { System.err.println("捕获到异常:" + throwable.getMessage()); } else { System.out.println("任务成功完成,结果是:" + result); } });
future.thenAccept(System.out::println);
|
CompletableFuture与线程池😈
CompletableFuture与线程池并不是相互矛盾的,而是相辅相成的。他们都有各自注重的领域,一个更注重异步任务编排,一个更加注重线程资源的管理。而且我们非常推荐使用自定义的线程池来使用CompletableFuture!
ps:如果都是用同一个默认线程池如 ForkJoinPool.commonPool()
,可能会发生资源争抢等问题!
1 2 3 4 5
| CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { System.out.println("当前线程:" + Thread.currentThread().getName()); return "Hello"; }, customExecutor);
|
join() 和 get()😫
在使用 CompletableFuture
时,选择 join()
还是 get()
主要取决于你对异常处理的需求以及是否需要更细粒度的控制超时。两者的主要区别在于它们如何处理异常和超时:
join()
- 抛出未经检查的异常(Unchecked Exceptions) :如果 Future 完成时带有异常,
join()
会抛出一个未检查的 CompletionException
,这使得代码看起来更加简洁,尤其是在 Lambda 表达式中。
- 不支持超时参数:如果你不需要设置超时时间来等待结果,
join()
是一个不错的选择。
1 2 3 4 5
| try { String result = future.join(); } catch (CompletionException e) { }
|
get()
- 抛出经检查的异常(Checked Exceptions) :包括
InterruptedException
和 ExecutionException
。这意味着你需要在方法签名中声明这些异常或在调用处捕获它们。
- 支持超时参数:允许你指定等待结果的最大时间,这对于避免无限期等待某些操作完成特别有用。
1 2 3 4 5 6 7 8 9
| try { String result = future.get(1, TimeUnit.SECONDS); } catch (InterruptedException e) { } catch (ExecutionException e) { } catch (TimeoutException e) { }
|
使用建议
推荐使用 join()
的场景:
- 当你希望保持代码简洁,并且不想处理复杂的异常结构时。
- 在大多数情况下,尤其是当你已经在更高的层次上实现了异常处理逻辑时,
join()
提供了一种更直接的方式来获取结果。
推荐使用 get()
的场景:
- 当你需要对超时进行精确控制时。
- 如果你需要明确区分不同类型的异常(如中断、执行错误等),以便采取不同的恢复措施。
总的来说,在没有特殊需求的情况下(例如不需要超时控制),更推荐使用 join()
,因为它简化了异常处理流程,使代码更加清晰易读。不过,具体选择应基于你的实际需求和上下文环境。
总结❤️
如果你看了这篇文章有收获可以点赞+关注+收藏🤩,这是对笔者更新的最大鼓励!如果你有更多方案或者文章中有错漏之处,请在评论区提出帮助笔者勘误,祝你拿到更好的offer!