如何保证 CompletableFuture 在执行一段代码之前完全结束?

How to ensure that CompletableFuture is completely finished before execute a piece of code?

我有这段代码,我有两个问题。

  1. 我不确定为什么会看到 TimeoutException,因为任何地方都没有阻塞。
  2. 我想用 Collector 实现的是我有一个 class 将进入收集一堆指标,然后 CompletableFuture 完全完成我会执行 Collector 来发布指标。 finally 是否保证它会最后执行,因为我认为 .get() 应该被阻止直到它完成?
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.function.Supplier;

public class FutureWithCollector
{
    public static void main(String[] args) throws ExecutionException, InterruptedException
    {
        Collector collector = new Collector();
        finalize(() -> query(collector), collector);
    }

    private static void finalize(Supplier<CompletableFuture<String>> submission, Collector collector) throws ExecutionException, InterruptedException
    {
        ExecutorService executorService = Executors.newFixedThreadPool(1);
        CompletableFuture<String> s = CompletableFuture.supplyAsync(submission, executorService).thenCompose(Function.identity());
        try {
            String result = s.get(1000, TimeUnit.MILLISECONDS);
            System.out.println(result);
        } catch (TimeoutException e) {
            e.printStackTrace();
        } finally {
            System.out.println(collector.getI());
        }
    }

    private static CompletableFuture<String> query(Collector collector)
    {
        CompletableFuture<String> future = new CompletableFuture<>();
        return future.thenApply(r -> {
            collector.collectStuff();
            return "Hello";
        });
    }

}

class Collector
{
    private volatile int i;

    public void collectStuff()
    {
        i++;
    }

    public int getI()
    {
        return i;
    }
}

输出

java.util.concurrent.TimeoutException
    at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
    at FutureWithCollector.finalize(FutureWithCollector.java:23)
    at FutureWithCollector.main(FutureWithCollector.java:15)
0

I'm not sure why I see TimeoutException as there's no blocking anywhere.

正如 Louis Wasserman 指出的那样,它来自 s.get(1000, TimeUnit.MILLISECONDS),这是一个阻塞调用。 (当然,它最多只阻塞1秒。但它仍然阻塞。)

Is finally guaranteed that it will be executed last.

是的...但这不是突出的问题,因为...

as I think .get() is supposed to be blocked until it's finished?

您没有使用 get()。您正在使用 get 超时!

在与未来相对应的任务完成之前,不能保证对 get 的超时调用会阻塞。它实际上一直等到任务完成或 1 秒超时到期。 (先到先得。)

如果您想 确定 任务已经完成,请不要在 CompletableFuture 上超时调用 get


你不能两全其美1。您要么等到任务完成(这可能需要无限长的时间),要么等待超时(任务可能尚未完成)。

1 - ...除非你有一个完全运行的时间机器,它可以让你进入未来,找出任务的结果,然后回到现在传递那个价值。

当你写 s.get(1000, TimeUnit.MILLISECONDS) 时,编译器警告你处理 TimeoutException。这也许就是您添加基本 catch 块的原因。

catch (TimeoutException e) {
   e.printStackTrace();
}

在这种情况下,查看 javadoc 为该方法说明的内容总是有帮助的,

/**
 * Waits if necessary for at most the given time for the computation
 * to complete, and then retrieves its result, if available.
 *
 * @param timeout the maximum time to wait
 * @param unit the time unit of the timeout argument
 * @return the computed result
 * @throws CancellationException if the computation was cancelled
 * @throws ExecutionException if the computation threw an
 * exception
 * @throws InterruptedException if the current thread was interrupted
 * while waiting
 * @throws TimeoutException if the wait timed out
 */
V get(long timeout, TimeUnit unit)
    throws InterruptedException, ExecutionException, TimeoutException;

 

现在您还有 2 个异常需要处理,您的 IDE 或编译器是否也抱怨您要处理它们?

此外,finally 将在 get() 之后执行 returns 来自 Future 的期望值或抛出异常(直到它从 get() 中出来,因为它是一个阻塞打电话)。

这很简单,只需更改其中一种方法即可:

 private static CompletableFuture<String> query(Collector collector) {

    return CompletableFuture.supplyAsync(() -> {
        collector.collectStuff(); 
        return "hello";
    });

}

您正在做:

CompletableFuture<String> future = new CompletableFuture<>();

记录为:

Creates a new incomplete CompletableFuture.

基本上,没有人完成这个CompletableFuture,所以你总是会超时,不管它有多大。


您也可以稍微更改一下代码。如果您想 run 某事,请明确说明:

private static CompletableFuture<Void> query(Collector collector) {
     return CompletableFuture.runAsync(collector::collectStuff);
}

那么请注意collectStuff递增了一个volatile,但是这些递增不是原子的。

并且您始终可以使用 join 而不是 get 并且不处理已检查的异常(假设没有 join 需要超时)。