Future
Future
类是异步思想的典型运用,主要用在一些需要执行耗时任务的场景,避免程序一直原地等待耗时任务执行完成,执行效率太低。具体来说是这样的:当我们执行某一耗时的任务时,可以将这个耗时任务交给一个子线程去异步执行,同时我们可以干点其他事情,不用傻傻等待耗时任务执行完成。等我们的事情干完后,我们再通过 Future
类获取到耗时任务的执行结果。这样一来,程序的执行效率就明显提高了。
这其实就是多线程中经典的 Future 模式,你可以将其看作是一种设计模式,核心思想是异步调用,主要用在多线程领域,并非 Java 语言独有。
在 Java 中,Future
类只是一个泛型接口,位于 java.util.concurrent
包下,其中定义了 5 个方法,主要包括下面这 4 个功能:
- 取消任务;
- 判断任务是否被取消;
- 判断任务是否已经执行完成;
- 获取任务执行结果。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| public interface Future<V> { boolean cancel(boolean mayInterruptIfRunning); boolean isCancelled(); boolean isDone(); V get() throws InterruptedException, ExecutionException; V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutExceptio }
|
简单理解就是:我有一个任务,提交给了 Future
来处理。任务执行期间我自己可以去做任何想做的事情。并且,在这期间我还可以取消任务以及获取任务的执行状态。一段时间之后,我就可以 Future
那里直接取出任务执行结果
Callable 和 Future 有什么关系?
我们可以通过 FutureTask
来理解 Callable
和 Future
之间的关系。
FutureTask
提供了 Future
接口的基本实现,**常用来封装 Callable
和 Runnable
**,具有取消任务、查看任务是否执行完成以及获取任务执行结果的方法。ExecutorService.submit()
方法返回的其实就是 Future
的实现类 FutureTask
。
1 2
| <T> Future<T> submit(Callable<T> task); Future<?> submit(Runnable task);
|
FutureTask
不光实现了 Future
接口,还实现了Runnable
接口,因此可以作为任务直接被线程执行。
FutureTask
有两个构造函数,可传入 Callable
或者 Runnable
对象。实际上,传入 Runnable
对象也会在方法内部转换为Callable
对象。
1 2 3 4 5 6 7 8 9 10 11
| public FutureTask(Callable<V> callable) { if (callable == null) throw new NullPointerException(); this.callable = callable; this.state = NEW; } public FutureTask(Runnable runnable, V result) { this.callable = Executors.callable(runnable, result); this.state = NEW; }
|
FutureTask
相当于对Callable
进行了封装,管理着任务执行的情况,存储了 Callable
的 call
方法的任务执行结果。通过其实现了Runnale接口,所以可以作为参数传给线程执行
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
| public class Main { public static void main(String[] args) { ExecutorService executor = Executors.newFixedThreadPool(3);
Callable<Integer> callable1 = new MyCallable(1); Callable<Integer> callable2 = new MyCallable(2); Callable<Integer> callable3 = new MyCallable(3);
FutureTask<Integer> futureTask1 = new FutureTask<>(callable1); FutureTask<Integer> futureTask2 = new FutureTask<>(callable2); FutureTask<Integer> futureTask3 = new FutureTask<>(callable3);
executor.submit(futureTask1); executor.submit(futureTask2); executor.submit(futureTask3);
try { System.out.println(futureTask1.isDone()); System.out.println(futureTask2.isDone()); System.out.println(futureTask3.isDone()); System.out.println("Result of task 1: " + futureTask1.get()); System.out.println("Result of task 2: " + futureTask2.get()); System.out.println("Result of task 3: " + futureTask3.get()); System.out.println(futureTask1.isDone()); System.out.println(futureTask2.isDone()); System.out.println(futureTask3.isDone()); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); }
executor.shutdown(); }
public static class MyCallable implements Callable<Integer> { private final int num; public MyCallable(int num) { this.num = num; } @Override public Integer call() throws Exception { System.out.println("Starting task for number: " + num); Thread.sleep(2000); System.out.println("Task completed for number: " + num); return num * num; } } }
|
CompletableFuture
Future
在实际使用过程中存在一些局限性比如不支持异步任务的编排组合、获取计算结果的 get()
方法为阻塞调用。
Java 8 才被引入CompletableFuture
类可以解决Future
的这些缺陷。CompletableFuture
除了提供了更为好用和强大的 Future
特性之外,还提供了函数式编程、异步任务编排组合(可以将多个异步任务串联起来,组成一个完整的链式调用)等能力。
下面我们来简单看看 CompletableFuture
类的定义。
1 2
| public class CompletableFuture<T> implements Future<T>, CompletionStage<T> { }
|
可以看到,CompletableFuture
同时实现了 Future
和 CompletionStage
接口。
CompletionStage
接口描述了一个异步计算的阶段。很多计算可以分成多个阶段或步骤,此时可以通过它将所有步骤组合起来,形成异步计算的流水线。
CompletableFuture
除了提供了更为好用和强大的 Future
特性之外,还提供了函数式编程的能力。
Future接口的方法
Future
接口有 5 个方法:
boolean cancel(boolean mayInterruptIfRunning)
:尝试取消执行任务。
boolean isCancelled()
:判断任务是否被取消。
boolean isDone()
:判断任务是否已经被执行完成。
get()
:等待任务执行完成并获取运算结果。
get(long timeout, TimeUnit unit)
:多了一个超时时间。
CompletionStage
接口描述了一个异步计算的阶段。很多计算可以分成多个阶段或步骤,此时可以通过它将所有步骤组合起来,形成异步计算的流水线。
CompletionStage
接口中的方法比较多,CompletableFuture
的函数式能力就是这个接口赋予的。从这个接口的方法参数你就可以发现其大量使用了 Java8 引入的函数式编程
创建 CompletableFuture的方法
常见的创建 CompletableFuture
对象的方法如下:
- 通过 new 关键字。
- 基于
CompletableFuture
自带的静态工厂方法:runAsync()
、supplyAsync()
。
new 关键字
通过 new 关键字创建 CompletableFuture
对象这种使用方式可以看作是将 CompletableFuture
当做 Future
来使用。
我们通过创建了一个结果值类型为 RpcResponse<Object>
的 CompletableFuture
,你可以把 resultFuture
看作是异步运算结果的载体。
1
| CompletableFuture<RpcResponse<Object>> resultFuture = new CompletableFuture<>();
|
假设在未来的某个时刻,我们得到了最终的结果。这时,我们可以调用 complete()
方法为其传入结果,这表示 resultFuture
已经被完成了。
1 2
| resultFuture.complete(rpcResponse);
|
你可以通过 isDone()
方法来检查是否已经完成。
1 2 3
| public boolean isDone() { return result != null; }
|
获取异步计算的结果也非常简单,直接调用 get()
方法即可。调用 get()
方法的线程会阻塞直到 CompletableFuture
完成运算。
1
| rpcResponse = completableFuture.get();
|
如果你已经知道计算的结果的话,可以使用静态方法 completedFuture()
来创建 CompletableFuture
。
1 2
| CompletableFuture<String> future = CompletableFuture.completedFuture("hello!"); assertEquals("hello!", future.get());
|
completedFuture()
方法底层调用的是带参数的 new 方法,只不过,这个方法不对外暴露。
1 2 3
| public static <U> CompletableFuture<U> completedFuture(U value) { return new CompletableFuture<U>((value == null) ? NIL : value); }
|
静态工厂方法runAsync和supplyAsync
这两个方法可以帮助我们封装计算逻辑。
1 2 3 4 5 6
| static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier);
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor); static CompletableFuture<Void> runAsync(Runnable runnable);
static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor);
|
runAsync()
方法接受的参数是 Runnable
,这是一个函数式接口,不允许返回值。当你需要异步操作且不关心返回结果的时候可以使用 runAsync()
方法。
1 2 3 4
| @FunctionalInterface public interface Runnable { public abstract void run(); }
|
supplyAsync()
方法接受的参数是 Supplier<U>
,这也是一个函数式接口,U
是返回结果值的类型。
1 2 3 4 5 6 7 8 9 10
| @FunctionalInterface public interface Supplier<T> {
T get(); }
|
当你需要异步操作且关心返回结果的时候,可以使用 supplyAsync()
方法。
1 2 3 4 5 6
| CompletableFuture<Void> future = CompletableFuture.runAsync(() -> System.out.println("hello!")); future.get();
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "hello!"); assertEquals("hello!", future2.get());
|
静态工厂方法completableFuture
它用于创建一个已经完成的 CompletableFuture
实例。这个方法接收一个参数,该参数作为 CompletableFuture
的结果,并且返回一个 CompletableFuture
对象,这个对象的状态已经被标记为完成(completed),其结果就是方法接收的参数。
1 2 3 4
| CompletableFuture<String> future = CompletableFuture.completedFuture("hello!");
String result = future.get();
|
处理异步结算的结果
当我们获取到异步计算的结果之后,还可以对其进行进一步的处理,比较常用的方法有下面几个:
thenApply()
thenAccept()
thenRun()
whenComplete()
thenApply
thenApply()
方法接受一个 Function
实例,用于处理异步计算的结果,并转换为另一种结果。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| public <U> CompletableFuture<U> thenApply( Function<? super T,? extends U> fn) { return uniApplyStage(null, fn); }
public <U> CompletableFuture<U> thenApplyAsync( Function<? super T,? extends U> fn) { return uniApplyStage(defaultExecutor(), fn); }
public <U> CompletableFuture<U> thenApplyAsync( Function<? super T,? extends U> fn, Executor executor) { return uniApplyStage(screenExecutor(executor), fn); }
|
thenApply()
方法使用示例如下:
1 2 3 4 5 6
| CompletableFuture<String> future = CompletableFuture.completedFuture("hello!") .thenApply(s -> s + "world!"); System.out.println(future.get());
future.thenApply(s -> s + "nice!"); System.out.println(future.get());
|
thenAccept
接收任务的处理结果(如果有),并返回一个 CompletableFuture<Void>
对象,表示该处理结果已被消费。thenAccept()
方法的参数是一个 Consumer
,用于处理任务的结果但不返回任何结果。
thenAccept()
方法的参数是 Consumer<? super T>
。
1 2 3 4 5 6 7 8 9 10 11 12
| public CompletableFuture<Void> thenAccept(Consumer<? super T> action) { return uniAcceptStage(null, action); }
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) { return uniAcceptStage(defaultExecutor(), action); }
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor) { return uniAcceptStage(screenExecutor(executor), action); }
|
thenAccept()
使用示例如下:
1 2 3 4
| CompletableFuture<Void> future = CompletableFuture.completedFuture("hello!") .thenApply(s -> s + "world!").thenApply(s -> s + "nice!").thenAccept(System.out::println); System.out.println(future.get());
|
thenRun
不接收任务的处理结果,仅用于在任务完成后执行一段代码逻辑,并返回一个 CompletableFuture<Void>
对象。thenRun()
方法的参数是一个 Runnable
,表示需要执行的代码块。
1 2 3 4 5 6 7 8 9 10 11 12
| public CompletableFuture<Void> thenRun(Runnable action) { return uniRunStage(null, action); }
public CompletableFuture<Void> thenRunAsync(Runnable action) { return uniRunStage(defaultExecutor(), action); }
public CompletableFuture<Void> thenRunAsync(Runnable action, Executor executor) { return uniRunStage(screenExecutor(executor), action); }
|
thenRun()
使用示例如下:
1 2
| CompletableFuture.completedFuture("hello!") .thenApply(s -> s + "world!").thenApply(s -> s + "nice!").thenRun(() -> System.out.println("你好!"));
|
whenComplete
在异步操作完成时执行某些操作,无论操作是正常完成还是因为异常而终止都会执行。这个方法提供了一种处理异步结果的方式,它可以用来进行清理工作、日志记录或者将异步操作的结果传递给其他处理流程。
whenComplete()
的方法的参数是 BiConsumer<? super T, ? super Throwable>
。
相对于 Consumer
, BiConsumer
可以接收 2 个输入对象然后进行“消费”,一个参数是 CompletableFuture
的正常结果(如果操作成功完成),第二个参数是捕获到的异常(如果操作因为异常而终止)。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| public CompletableFuture<T> whenComplete( BiConsumer<? super T, ? super Throwable> action) { return uniWhenCompleteStage(null, action); }
public CompletableFuture<T> whenCompleteAsync( BiConsumer<? super T, ? super Throwable> action) { return uniWhenCompleteStage(defaultExecutor(), action); }
public CompletableFuture<T> whenCompleteAsync( BiConsumer<? super T, ? super Throwable> action, Executor executor) { return uniWhenCompleteStage(screenExecutor(executor), action); }
|
whenComplete会直接执行,不管有没有异常
1 2 3 4 5 6 7 8 9
| CompletableFuture.supplyAsync(() -> { throw new RuntimeException("Error occurred"); }).whenComplete((result, exception) -> { if (exception != null) { System.out.println("Error: " + exception.getMessage()); } else { System.out.println("Result: " + result); } });
|
如果没有异常,第二个参数输出null
1 2 3 4 5 6 7 8 9
| CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "hello!") .whenComplete((result, exception) -> { System.out.println(result); System.out.println(exception); }); System.out.println(future.get());
|
组合2个异步任务 CompletableFuture
thenCompose
你可以使用 thenCompose()
按顺序链接两个 CompletableFuture
对象,实现异步的任务链。它的作用是将前一个任务的返回结果作为下一个任务的输入参数,从而形成一个依赖关系。在实际开发中,这个方法还是非常有用的。比如说,task1 和 task2 都是异步执行的,但 task1 必须执行完成后才能开始执行 task2(task2 依赖 task1 的执行结果)。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| public <U> CompletableFuture<U> thenCompose( Function<? super T, ? extends CompletionStage<U>> fn) { return uniComposeStage(null, fn); }
public <U> CompletableFuture<U> thenComposeAsync( Function<? super T, ? extends CompletionStage<U>> fn) { return uniComposeStage(defaultExecutor(), fn); }
public <U> CompletableFuture<U> thenComposeAsync( Function<? super T, ? extends CompletionStage<U>> fn, Executor executor) { return uniComposeStage(screenExecutor(executor), fn); }
|
thenCompose()
方法会使用示例如下:
1 2 3 4
| CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "hello!") .thenCompose(s -> CompletableFuture.supplyAsync(() -> s + "world!")); assertEquals("hello!world!", future.get());
|
thenCombine
和 thenCompose()
方法类似的还有 thenCombine()
方法, 它同样可以组合两个 CompletableFuture
对象。它用于将两个独立的异步操作(CompletableFuture
实例)的结果合并,并应用一个函数来处理这两个结果,最终返回一个新的 CompletableFuture
实例。
1 2 3 4 5 6
| CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> "hello!") .thenCombine(CompletableFuture.supplyAsync( () -> "world!"), (s1, s2) -> s1 + s2) .thenCompose(s -> CompletableFuture.supplyAsync(() -> s + "nice!")); assertEquals("hello!world!nice!", completableFuture.get());
|
那 thenCompose()
和 thenCombine()
有什么区别呢?
thenCompose()
可以链接两个 CompletableFuture
对象,并将前一个任务的返回结果作为下一个任务的参数,它们之间存在着先后顺序。
thenCombine()
会在两个任务都执行完成后,把两个任务的结果合并。两个任务是并行执行的,它们之间并没有先后依赖顺序。
acceptEither
它的作用是在两个独立的 CompletableFuture
中,无论哪个异步操作先完成,都执行一个 Consumer
对象,该 Consumer
接受最先完成的异步操作的结果。
例如,如果我们想要实现 task1 和 task2 中的任意一个任务执行完后就执行 task3 的话,可以使用 acceptEither()
。
1 2 3 4 5 6 7 8 9
| public CompletableFuture<Void> acceptEither( CompletionStage<? extends T> other, Consumer<? super T> action) { return orAcceptStage(null, other, action); }
public CompletableFuture<Void> acceptEitherAsync( CompletionStage<? extends T> other, Consumer<? super T> action) { return orAcceptStage(asyncPool, other, action); }
|
第一个参数是另一个异步任务的 CompletionStage
对象,acceptEither
方法的第二个参数应该是一个 Consumer
,用于处理两个异步任务中任意一个任务完成后的结果
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| CompletableFuture<String> task = CompletableFuture.supplyAsync(() -> { System.out.println("任务1开始执行,当前时间:" + System.currentTimeMillis()); try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("任务1执行完毕,当前时间:" + System.currentTimeMillis()); return "task1"; });
CompletableFuture<String> task2 = CompletableFuture.supplyAsync(() -> { System.out.println("任务2开始执行,当前时间:" + System.currentTimeMillis()); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("任务2执行完毕,当前时间:" + System.currentTimeMillis()); return "task2"; });
task.acceptEitherAsync(task2, (res) -> { System.out.println("任务3开始执行,当前时间:" + System.currentTimeMillis()); System.out.println("上一个任务的结果为:" + res); });
try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); }
|
1 2 3 4 5 6
| 任务1开始执行,当前时间:1695088058520 任务2开始执行,当前时间:1695088058521 任务1执行完毕,当前时间:1695088059023 任务3开始执行,当前时间:1695088059023 上一个任务的结果为:task1 任务2执行完毕,当前时间:1695088059523
|
任务组合操作acceptEitherAsync()
会在异步任务 1 和异步任务 2 中的任意一个完成时触发执行任务 3,但是需要注意,这个触发时机是不确定的。如果任务 1 和任务 2 都还未完成,那么任务 3 就不能被执行。
并行多个 CompletableFuture
你可以通过 CompletableFuture
的 allOf()
这个静态方法来并行运行多个 CompletableFuture
。
实际项目中,我们经常需要并行运行多个互不相关的任务,这些任务之间没有依赖关系,可以互相独立地运行。
比说我们要读取处理 6 个文件,这 6 个任务都是没有执行顺序依赖的任务,但是我们需要返回给用户的时候将这几个文件的处理的结果进行统计整理。像这种情况我们就可以使用并行运行多个 CompletableFuture
来处理。
allOf
这个方法接受一个 CompletableFuture
对象的可变参数列表,并返回一个新的 CompletableFuture<Void>
实例,该实例在所有给定的 CompletableFuture
任务都完成时完成
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| CompletableFuture<Void> task1 = CompletableFuture.supplyAsync(()->{ }); ...... CompletableFuture<Void> task6 = CompletableFuture.supplyAsync(()->{ }); ...... CompletableFuture<Void> headerFuture = CompletableFuture.allOf(task1,.....,task6); try { headerFuture.join(); } catch (Exception ex) { ...... } System.out.println("all done. ");
|
allOf()
方法会等到所有的 CompletableFuture
都运行完成之后再返回
调用 join()
可以让程序等future1
和 future2
都运行完了之后再继续执行。
join
方法解释
join
方法是 CompletableFuture
类中的一个阻塞方法,用于等待异步操作完成,并返回操作的结果。
- 如果
CompletableFuture
完成时包含一个结果值,join
方法会返回这个值。
- 如果
CompletableFuture
完成时包含一个异常,join
方法会抛出这个异常。
anyOf
anyOf()
方法不会等待所有的 CompletableFuture
都运行完成之后再返回,只要有一个执行完成即可
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| Random rand = new Random(); CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(1000 + rand.nextInt(1000)); } catch (InterruptedException e) { e.printStackTrace(); } finally { System.out.println("future1 done..."); } return "abc"; }); CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(1000 + rand.nextInt(1000)); } catch (InterruptedException e) { e.printStackTrace(); } finally { System.out.println("future2 done..."); } return "efg"; });
|
哪一个线程先返回,get就获取哪一个线程的结果
1 2
| CompletableFuture<Object> f = CompletableFuture.anyOf(future1, future2); System.out.println(f.get());
|
输出结果可能是:
也可能是: