Future

Future

Future 类是异步思想的典型运用,主要用在一些需要执行耗时任务的场景,避免程序一直原地等待耗时任务执行完成,执行效率太低。具体来说是这样的:当我们执行某一耗时的任务时,可以将这个耗时任务交给一个子线程去异步执行,同时我们可以干点其他事情,不用傻傻等待耗时任务执行完成。等我们的事情干完后,我们再通过 Future 类获取到耗时任务的执行结果。这样一来,程序的执行效率就明显提高了。

这其实就是多线程中经典的 Future 模式,你可以将其看作是一种设计模式,核心思想是异步调用,主要用在多线程领域,并非 Java 语言独有。

在 Java 中,Future 类只是一个泛型接口,位于 java.util.concurrent 包下,其中定义了 5 个方法,主要包括下面这 4 个功能:

  • 取消任务;
  • 判断任务是否被取消;
  • 判断任务是否已经执行完成;
  • 获取任务执行结果。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// V 代表了Future执行的任务返回值的类型
public interface Future<V> {
// 取消任务执行
// 成功取消返回 true,否则返回 false
boolean cancel(boolean mayInterruptIfRunning);
// 判断任务是否被取消
boolean isCancelled();
// 判断任务是否已经执行完成
boolean isDone();
// 获取任务执行结果
V get() throws InterruptedException, ExecutionException;
// 指定时间内没有返回计算结果就抛出 TimeOutException 异常
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutExceptio
}

简单理解就是:我有一个任务,提交给了 Future 来处理。任务执行期间我自己可以去做任何想做的事情。并且,在这期间我还可以取消任务以及获取任务的执行状态。一段时间之后,我就可以 Future 那里直接取出任务执行结果

Callable 和 Future 有什么关系?

我们可以通过 FutureTask 来理解 CallableFuture 之间的关系。

FutureTask 提供了 Future 接口的基本实现,**常用来封装 CallableRunnable**,具有取消任务、查看任务是否执行完成以及获取任务执行结果的方法。ExecutorService.submit() 方法返回的其实就是 Future 的实现类 FutureTask

1
2
<T> Future<T> submit(Callable<T> task);
Future<?> submit(Runnable task);

FutureTask 不光实现了 Future接口,还实现了Runnable 接口,因此可以作为任务直接被线程执行。

image-20240427151617099

FutureTask 有两个构造函数,可传入 Callable 或者 Runnable 对象。实际上,传入 Runnable 对象也会在方法内部转换为Callable 对象。

1
2
3
4
5
6
7
8
9
10
11
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW;
}
public FutureTask(Runnable runnable, V result) {
// 通过适配器RunnableAdapter来将Runnable对象runnable转换成Callable对象
this.callable = Executors.callable(runnable, result);
this.state = NEW;
}

FutureTask相当于对Callable 进行了封装,管理着任务执行的情况,存储了 Callablecall 方法的任务执行结果。通过其实现了Runnale接口,所以可以作为参数传给线程执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
public class Main {
public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(3);

Callable<Integer> callable1 = new MyCallable(1);
Callable<Integer> callable2 = new MyCallable(2);
Callable<Integer> callable3 = new MyCallable(3);

FutureTask<Integer> futureTask1 = new FutureTask<>(callable1);
FutureTask<Integer> futureTask2 = new FutureTask<>(callable2);
FutureTask<Integer> futureTask3 = new FutureTask<>(callable3);

executor.submit(futureTask1);
executor.submit(futureTask2);
executor.submit(futureTask3);

try {
System.out.println(futureTask1.isDone());//false
System.out.println(futureTask2.isDone());//false
System.out.println(futureTask3.isDone());//false
System.out.println("Result of task 1: " + futureTask1.get()); // 1
System.out.println("Result of task 2: " + futureTask2.get()); // 4
System.out.println("Result of task 3: " + futureTask3.get()); // 9
System.out.println(futureTask1.isDone());//true
System.out.println(futureTask2.isDone());//true
System.out.println(futureTask3.isDone());//true
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}

executor.shutdown();
}

public static class MyCallable implements Callable<Integer> {
private final int num;
public MyCallable(int num) {
this.num = num;
}
@Override
public Integer call() throws Exception {
System.out.println("Starting task for number: " + num);
Thread.sleep(2000); // 模拟耗时操作
System.out.println("Task completed for number: " + num);
return num * num;
}
}
}

CompletableFuture

Future 在实际使用过程中存在一些局限性比如不支持异步任务的编排组合、获取计算结果的 get() 方法为阻塞调用。

Java 8 才被引入CompletableFuture 类可以解决Future 的这些缺陷。CompletableFuture 除了提供了更为好用和强大的 Future 特性之外,还提供了函数式编程、异步任务编排组合(可以将多个异步任务串联起来,组成一个完整的链式调用)等能力。

下面我们来简单看看 CompletableFuture 类的定义。

1
2
public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
}

可以看到,CompletableFuture 同时实现了 FutureCompletionStage 接口。

CompletionStage 接口描述了一个异步计算的阶段。很多计算可以分成多个阶段或步骤,此时可以通过它将所有步骤组合起来,形成异步计算的流水线。

CompletableFuture 除了提供了更为好用和强大的 Future 特性之外,还提供了函数式编程的能力。

Future接口的方法

Future 接口有 5 个方法:

  • boolean cancel(boolean mayInterruptIfRunning):尝试取消执行任务。
  • boolean isCancelled():判断任务是否被取消。
  • boolean isDone():判断任务是否已经被执行完成。
  • get():等待任务执行完成并获取运算结果。
  • get(long timeout, TimeUnit unit):多了一个超时时间。

CompletionStage 接口描述了一个异步计算的阶段。很多计算可以分成多个阶段或步骤,此时可以通过它将所有步骤组合起来,形成异步计算的流水线。

CompletionStage 接口中的方法比较多,CompletableFuture 的函数式能力就是这个接口赋予的。从这个接口的方法参数你就可以发现其大量使用了 Java8 引入的函数式编程

创建 CompletableFuture的方法

常见的创建 CompletableFuture 对象的方法如下:

  1. 通过 new 关键字。
  2. 基于 CompletableFuture 自带的静态工厂方法:runAsync()supplyAsync()

new 关键字

通过 new 关键字创建 CompletableFuture 对象这种使用方式可以看作是将 CompletableFuture 当做 Future 来使用。

我们通过创建了一个结果值类型为 RpcResponse<Object>CompletableFuture,你可以把 resultFuture 看作是异步运算结果的载体。

1
CompletableFuture<RpcResponse<Object>> resultFuture = new CompletableFuture<>();

假设在未来的某个时刻,我们得到了最终的结果。这时,我们可以调用 complete() 方法为其传入结果,这表示 resultFuture 已经被完成了。

1
2
// complete() 方法只能调用一次,后续调用将被忽略。
resultFuture.complete(rpcResponse);

你可以通过 isDone() 方法来检查是否已经完成。

1
2
3
public boolean isDone() {
return result != null;
}

获取异步计算的结果也非常简单,直接调用 get() 方法即可。调用 get() 方法的线程会阻塞直到 CompletableFuture 完成运算。

1
rpcResponse = completableFuture.get();

如果你已经知道计算的结果的话,可以使用静态方法 completedFuture() 来创建 CompletableFuture

1
2
CompletableFuture<String> future = CompletableFuture.completedFuture("hello!");
assertEquals("hello!", future.get());

completedFuture() 方法底层调用的是带参数的 new 方法,只不过,这个方法不对外暴露。

1
2
3
public static <U> CompletableFuture<U> completedFuture(U value) {
return new CompletableFuture<U>((value == null) ? NIL : value);
}

静态工厂方法runAsync和supplyAsync

这两个方法可以帮助我们封装计算逻辑。

1
2
3
4
5
6
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier);
// 使用自定义线程池(推荐)
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor);
static CompletableFuture<Void> runAsync(Runnable runnable);
// 使用自定义线程池(推荐)
static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor);

runAsync() 方法接受的参数是 Runnable ,这是一个函数式接口,不允许返回值。当你需要异步操作且不关心返回结果的时候可以使用 runAsync() 方法。

1
2
3
4
@FunctionalInterface
public interface Runnable {
public abstract void run();
}

supplyAsync() 方法接受的参数是 Supplier<U> ,这也是一个函数式接口,U 是返回结果值的类型。

1
2
3
4
5
6
7
8
9
10
@FunctionalInterface
public interface Supplier<T> {

/**
* Gets a result.
*
* @return a result
*/
T get();
}

当你需要异步操作且关心返回结果的时候,可以使用 supplyAsync() 方法。

1
2
3
4
5
6
//调用get直接运行,没有返回值
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> System.out.println("hello!"));
future.get();// 输出 "hello!"
//调用get阻塞,直到返回结果
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "hello!");
assertEquals("hello!", future2.get());

静态工厂方法completableFuture

它用于创建一个已经完成的 CompletableFuture 实例。这个方法接收一个参数,该参数作为 CompletableFuture 的结果,并且返回一个 CompletableFuture 对象,这个对象的状态已经被标记为完成(completed),其结果就是方法接收的参数。

1
2
3
4
// 创建一个已经完成的CompletableFuture,其结果为"hello!"
CompletableFuture<String> future = CompletableFuture.completedFuture("hello!");
// 可以直接获取结果,不会抛出异常或阻塞
String result = future.get(); // result 为 "hello!"

处理异步结算的结果

当我们获取到异步计算的结果之后,还可以对其进行进一步的处理,比较常用的方法有下面几个:

  • thenApply()
  • thenAccept()
  • thenRun()
  • whenComplete()

thenApply

thenApply() 方法接受一个 Function 实例,用于处理异步计算的结果,并转换为另一种结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 沿用上一个任务的线程池
public <U> CompletableFuture<U> thenApply(
Function<? super T,? extends U> fn) {
return uniApplyStage(null, fn);
}

//使用默认的 ForkJoinPool 线程池(不推荐)
public <U> CompletableFuture<U> thenApplyAsync(
Function<? super T,? extends U> fn) {
return uniApplyStage(defaultExecutor(), fn);
}
// 使用自定义线程池(推荐)
public <U> CompletableFuture<U> thenApplyAsync(
Function<? super T,? extends U> fn, Executor executor) {
return uniApplyStage(screenExecutor(executor), fn);
}

thenApply() 方法使用示例如下:

1
2
3
4
5
6
CompletableFuture<String> future = CompletableFuture.completedFuture("hello!")
.thenApply(s -> s + "world!");
System.out.println(future.get());//hello!world!
// 这次调用将被忽略
future.thenApply(s -> s + "nice!");
System.out.println(future.get()); //hello!world!

thenAccept

接收任务的处理结果(如果有),并返回一个 CompletableFuture<Void> 对象,表示该处理结果已被消费。thenAccept() 方法的参数是一个 Consumer用于处理任务的结果但不返回任何结果

thenAccept() 方法的参数是 Consumer<? super T>

1
2
3
4
5
6
7
8
9
10
11
12
public CompletableFuture<Void> thenAccept(Consumer<? super T> action) {
return uniAcceptStage(null, action);
}

public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) {
return uniAcceptStage(defaultExecutor(), action);
}

public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action,
Executor executor) {
return uniAcceptStage(screenExecutor(executor), action);
}

thenAccept() 使用示例如下:

1
2
3
4
CompletableFuture<Void> future = CompletableFuture.completedFuture("hello!")
.thenApply(s -> s + "world!").thenApply(s -> s + "nice!").thenAccept(System.out::println);//hello!world!nice!
//不返回任何结果
System.out.println(future.get()); //null

thenRun

不接收任务的处理结果,仅用于在任务完成后执行一段代码逻辑,并返回一个 CompletableFuture<Void> 对象。thenRun() 方法的参数是一个 Runnable,表示需要执行的代码块。

1
2
3
4
5
6
7
8
9
10
11
12
public CompletableFuture<Void> thenRun(Runnable action) {
return uniRunStage(null, action);
}

public CompletableFuture<Void> thenRunAsync(Runnable action) {
return uniRunStage(defaultExecutor(), action);
}

public CompletableFuture<Void> thenRunAsync(Runnable action,
Executor executor) {
return uniRunStage(screenExecutor(executor), action);
}

thenRun() 使用示例如下:

1
2
CompletableFuture.completedFuture("hello!")
.thenApply(s -> s + "world!").thenApply(s -> s + "nice!").thenRun(() -> System.out.println("你好!"));//你好!

whenComplete

在异步操作完成时执行某些操作,无论操作是正常完成还是因为异常而终止都会执行。这个方法提供了一种处理异步结果的方式,它可以用来进行清理工作、日志记录或者将异步操作的结果传递给其他处理流程。

whenComplete() 的方法的参数是 BiConsumer<? super T, ? super Throwable>

相对于 ConsumerBiConsumer 可以接收 2 个输入对象然后进行“消费”,一个参数是 CompletableFuture 的正常结果(如果操作成功完成),第二个参数是捕获到的异常(如果操作因为异常而终止)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public CompletableFuture<T> whenComplete(
BiConsumer<? super T, ? super Throwable> action) {
return uniWhenCompleteStage(null, action);
}


public CompletableFuture<T> whenCompleteAsync(
BiConsumer<? super T, ? super Throwable> action) {
return uniWhenCompleteStage(defaultExecutor(), action);
}
// 使用自定义线程池(推荐)
public CompletableFuture<T> whenCompleteAsync(
BiConsumer<? super T, ? super Throwable> action, Executor executor) {
return uniWhenCompleteStage(screenExecutor(executor), action);
}

whenComplete会直接执行,不管有没有异常

1
2
3
4
5
6
7
8
9
CompletableFuture.supplyAsync(() -> {
throw new RuntimeException("Error occurred");
}).whenComplete((result, exception) -> {
if (exception != null) {
System.out.println("Error: " + exception.getMessage());
} else {
System.out.println("Result: " + result);
}
});

如果没有异常,第二个参数输出null

1
2
3
4
5
6
7
8
9
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "hello!")
.whenComplete((result, exception) -> {
// res 代表返回的结果
// ex 的类型为 Throwable ,代表抛出的异常
System.out.println(result); //hello
// 这里没有抛出异常所以 null
System.out.println(exception); //null
});
System.out.println(future.get()); //hello!

组合2个异步任务 CompletableFuture

thenCompose

你可以使用 thenCompose() 按顺序链接两个 CompletableFuture 对象,实现异步的任务链。它的作用是将前一个任务的返回结果作为下一个任务的输入参数,从而形成一个依赖关系。在实际开发中,这个方法还是非常有用的。比如说,task1 和 task2 都是异步执行的,但 task1 必须执行完成后才能开始执行 task2(task2 依赖 task1 的执行结果)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public <U> CompletableFuture<U> thenCompose(
Function<? super T, ? extends CompletionStage<U>> fn) {
return uniComposeStage(null, fn);
}

public <U> CompletableFuture<U> thenComposeAsync(
Function<? super T, ? extends CompletionStage<U>> fn) {
return uniComposeStage(defaultExecutor(), fn);
}

public <U> CompletableFuture<U> thenComposeAsync(
Function<? super T, ? extends CompletionStage<U>> fn,
Executor executor) {
return uniComposeStage(screenExecutor(executor), fn);
}

thenCompose() 方法会使用示例如下:

1
2
3
4
CompletableFuture<String> future
= CompletableFuture.supplyAsync(() -> "hello!")
.thenCompose(s -> CompletableFuture.supplyAsync(() -> s + "world!"));
assertEquals("hello!world!", future.get());

thenCombine

thenCompose() 方法类似的还有 thenCombine() 方法, 它同样可以组合两个 CompletableFuture 对象。它用于将两个独立的异步操作(CompletableFuture 实例)的结果合并,并应用一个函数来处理这两个结果,最终返回一个新的 CompletableFuture 实例。

1
2
3
4
5
6
CompletableFuture<String> completableFuture
= CompletableFuture.supplyAsync(() -> "hello!")
.thenCombine(CompletableFuture.supplyAsync(
() -> "world!"), (s1, s2) -> s1 + s2)
.thenCompose(s -> CompletableFuture.supplyAsync(() -> s + "nice!"));
assertEquals("hello!world!nice!", completableFuture.get());

thenCompose()thenCombine() 有什么区别呢?

  • thenCompose() 可以链接两个 CompletableFuture 对象,并将前一个任务的返回结果作为下一个任务的参数,它们之间存在着先后顺序。
  • thenCombine() 会在两个任务都执行完成后,把两个任务的结果合并。两个任务是并行执行的,它们之间并没有先后依赖顺序。

acceptEither

它的作用是在两个独立的 CompletableFuture 中,无论哪个异步操作先完成,都执行一个 Consumer 对象,该 Consumer 接受最先完成的异步操作的结果。

例如,如果我们想要实现 task1 和 task2 中的任意一个任务执行完后就执行 task3 的话,可以使用 acceptEither()

1
2
3
4
5
6
7
8
9
public CompletableFuture<Void> acceptEither(
CompletionStage<? extends T> other, Consumer<? super T> action) {
return orAcceptStage(null, other, action);
}

public CompletableFuture<Void> acceptEitherAsync(
CompletionStage<? extends T> other, Consumer<? super T> action) {
return orAcceptStage(asyncPool, other, action);
}

第一个参数是另一个异步任务的 CompletionStage 对象,acceptEither 方法的第二个参数应该是一个 Consumer,用于处理两个异步任务中任意一个任务完成后的结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
CompletableFuture<String> task = CompletableFuture.supplyAsync(() -> {
System.out.println("任务1开始执行,当前时间:" + System.currentTimeMillis());
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("任务1执行完毕,当前时间:" + System.currentTimeMillis());
return "task1";
});

CompletableFuture<String> task2 = CompletableFuture.supplyAsync(() -> {
System.out.println("任务2开始执行,当前时间:" + System.currentTimeMillis());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("任务2执行完毕,当前时间:" + System.currentTimeMillis());
return "task2";
});

task.acceptEitherAsync(task2, (res) -> {
System.out.println("任务3开始执行,当前时间:" + System.currentTimeMillis());
System.out.println("上一个任务的结果为:" + res);
});

// 增加一些延迟时间,确保异步任务有足够的时间完成
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}

1
2
3
4
5
6
任务1开始执行,当前时间:1695088058520
任务2开始执行,当前时间:1695088058521
任务1执行完毕,当前时间:1695088059023
任务3开始执行,当前时间:1695088059023
上一个任务的结果为:task1
任务2执行完毕,当前时间:1695088059523

任务组合操作acceptEitherAsync()会在异步任务 1 和异步任务 2 中的任意一个完成时触发执行任务 3,但是需要注意,这个触发时机是不确定的。如果任务 1 和任务 2 都还未完成,那么任务 3 就不能被执行。

并行多个 CompletableFuture

你可以通过 CompletableFutureallOf()这个静态方法来并行运行多个 CompletableFuture

实际项目中,我们经常需要并行运行多个互不相关的任务,这些任务之间没有依赖关系,可以互相独立地运行。

比说我们要读取处理 6 个文件,这 6 个任务都是没有执行顺序依赖的任务,但是我们需要返回给用户的时候将这几个文件的处理的结果进行统计整理。像这种情况我们就可以使用并行运行多个 CompletableFuture 来处理。

allOf

这个方法接受一个 CompletableFuture 对象的可变参数列表,并返回一个新的 CompletableFuture<Void> 实例,该实例在所有给定的 CompletableFuture 任务都完成时完成

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
CompletableFuture<Void> task1 =
CompletableFuture.supplyAsync(()->{
//自定义业务操作
});
......
CompletableFuture<Void> task6 =
CompletableFuture.supplyAsync(()->{
//自定义业务操作
});
......
CompletableFuture<Void> headerFuture = CompletableFuture.allOf(task1,.....,task6);
try {
headerFuture.join();
} catch (Exception ex) {
......
}
System.out.println("all done. ");

allOf() 方法会等到所有的 CompletableFuture 都运行完成之后再返回

调用 join() 可以让程序等future1future2 都运行完了之后再继续执行。

join方法解释

join 方法是 CompletableFuture 类中的一个阻塞方法,用于等待异步操作完成,并返回操作的结果。

  • 如果 CompletableFuture 完成时包含一个结果值,join 方法会返回这个值。
  • 如果 CompletableFuture 完成时包含一个异常,join 方法会抛出这个异常。

anyOf

anyOf() 方法不会等待所有的 CompletableFuture 都运行完成之后再返回,只要有一个执行完成即可

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
Random rand = new Random();
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000 + rand.nextInt(1000));
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("future1 done...");
}
return "abc";
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000 + rand.nextInt(1000));
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("future2 done...");
}
return "efg";
});

哪一个线程先返回,get就获取哪一个线程的结果

1
2
CompletableFuture<Object> f = CompletableFuture.anyOf(future1, future2);
System.out.println(f.get());

输出结果可能是:

1
2
future2 done...
efg

也可能是:

1
2
future1 done...
abc

Future
http://example.com/2023/05/13/Java/Java多并发/Future/
作者
PALE13
发布于
2023年5月13日
许可协议