CompletableFuture 的完成处理程序在哪个线程中执行?

In which thread do CompletableFuture's completion handlers execute?

我对 CompletableFuture 方法有疑问:

public <U> CompletableFuture<U> thenApply(Function<? super T, ? extends U> fn)

JavaDoc 就是这样说的:

Returns a new CompletionStage that, when this stage completes normally, is executed with this stage's result as the argument to the supplied function. See the CompletionStage documentation for rules covering exceptional completion.

线程呢?这将在哪个线程中执行?如果未来由线程池完成呢?

当谈到线程时,API 文档是缺乏的。需要一些推论来理解线程和 Futures 是如何工作的。从一个假设开始:CompletableFuture 的非 Async 方法不会自行生成新线程。工作将在现有线程下进行。

thenApply 将在原 CompletableFuture 的线程中 运行。那是调用 complete() 的线程,或者是调用 thenApply() 的线程(如果未来已经完成)。如果你想控制线程——如果 fn 是一个缓慢的操作,这是一个好主意——那么你应该使用 thenApplyAsync.

来自Javadoc

Actions supplied for dependent completions of non-async methods may be performed by the thread that completes the current CompletableFuture, or by any other caller of a completion method.

更具体地说:

  • fn 将在调用 complete() 期间在调用 complete().

    [ 的任何线程的上下文中 运行 =30=]
  • 如果在调用 thenApply()complete() 已经完成,则 fn 在线程调用 运行 的上下文中将是 运行 =14=].

CompletableFuture 文档中指定的政策可以帮助您更好地理解:

  • Actions supplied for dependent completions of non-async methods may be performed by the thread that completes the current CompletableFuture, or by any other caller of a completion method.

  • All async methods without an explicit Executor argument are performed using the ForkJoinPool.commonPool() (unless it does not support a parallelism level of at least two, in which case, a new Thread is created to run each task). To simplify monitoring, debugging, and tracking, all generated asynchronous tasks are instances of the marker interface CompletableFuture.AsynchronousCompletionTask.

Update:我还建议阅读@Mike 的,作为进一步深入细节的有趣分析文档。

正如 指出的那样,文档会告诉您需要了解的内容。但是,相关文本出奇地含糊,此处发布的一些评论(和答案)似乎依赖于文档不支持的假设。因此,我认为把它拆开是值得的。具体来说,我们应该非常仔细地阅读这段话:

Actions supplied for dependent completions of non-async methods may be performed by the thread that completes the current CompletableFuture, or by any other caller of a completion method.

听起来很简单,但细节不多。它似乎有意避免描述 何时 可以在完成线程上调用依赖完成,而不是在调用 thenApply 等完成方法期间。正如所写,上面的段落实际上是 乞求 我们用假设来填补空白。这很危险,尤其是当主题涉及并发和异步编程时,我们作为程序员制定的许多期望都被颠覆了。让我们仔细看看文档没有说什么。

文档确实没有声称依赖完成调用complete()之前运行完成线程。此外,虽然它声明依赖完成 可能 在调用像 thenApply 这样的完成方法时被调用,但它 没有 声明完成将在注册它的线程上被调用(注意单词"any other")。

对于使用 CompletableFuture 安排和编写任务的任何人来说,这些都是潜在的重要要点。考虑以下事件序列:

  1. 线程 A 通过 f.thenApply(c1).
  2. 注册一个依赖完成
  3. 一段时间后,线程 B 调用 f.complete()
  4. 大约在同一时间,线程 C 通过 f.thenApply(c2).
  5. 注册了另一个依赖完成

从概念上讲,complete() 做了两件事:它发布未来的结果,然后它尝试调用依赖完成。现在,如果线程 C 运行s after 结果值被发布,但是 before 线程 B 开始调用 c1?根据实现,线程 C 可能会看到 f 已完成,然后它可能会调用 c1 c2。或者,线程 C 可以调用 c2 而让线程 B 调用 c1。该文档不排除任何一种可能性。考虑到这一点,以下是文档不支持的假设:

  1. f 完成 之前注册的相关完成 c 将在调用 f.complete() 期间调用;
  2. c 将在 f.complete() 时 运行 完成 returns;
  3. 依赖完成将以任何特定顺序(例如,注册顺序)调用;
  4. f 完成之前注册的相关完成将在 f 完成注册完成之前调用。

考虑另一个例子:

  1. 线程 A 调用 f.complete()
  2. 一段时间后,线程 B 通过 f.thenApply(c1);
  3. 注册完成
  4. 大约在同一时间,线程 C 通过 f.thenApply(c2) 注册了一个单独的完成。

如果已知 f 已经 运行 完成,那么人们可能会倾向于假设 c1 将在 f.thenApply(c1) 期间调用并且 c2 将在 f.thenApply(c2) 期间调用。人们可能会进一步假设 c1 将在 f.thenApply(c1) returns 时完成 运行。但是,文档 支持这些假设。调用 thenApply 的线程中的 one 可能会最终调用 both c1c2 ,而另一个线程既不调用也不调用。

仔细分析 JDK 代码可以确定上述假设场景的结果。但即使那样也是有风险的,因为您可能最终依赖于 (1) 不可移植,或 (2) 可能发生变化的实现细节。最好的办法是不要假设任何未在 javadoc 或原始 JSR 规范中详细说明的内容。

tldr: 小心你的假设,当你写文档时,尽可能清晰和深思熟虑。虽然简洁是一件好事,但要警惕人类填补空白的倾向。

我知道这个问题很老,但是我想用源代码来解释这个问题。

public CompletableFuture<Void> thenAccept(Consumer<? super T> action) {
    return uniAcceptStage(null, action);
}

private CompletableFuture<Void> uniAcceptStage(Executor e,
                                               Consumer<? super T> f) {
    if (f == null) throw new NullPointerException();
    Object r;
    if ((r = result) != null)
        return uniAcceptNow(r, e, f);
    CompletableFuture<Void> d = newIncompleteFuture();
    unipush(new UniAccept<T>(e, d, this, f));
    return d;
}

这是java16的源代码,我们可以看到,如果我们触发thenAccept,我们将向我们的函数传递一个空的执行器服务引用。 来自第二个函数 uniAcceptStage() 第二个 if 条件。如果result不为null,会触发uniAcceptNow()

if (e != null) {
     e.execute(new UniAccept<T>(null, d, this, f));
} else {
     @SuppressWarnings("unchecked") T t = (T) r;
     f.accept(t);
     d.result = NIL;
}

如果执行器服务为空,我们将使用lambda函数f.accept(t)来执行它。如果我们从主线程触发此 thenApply/thenAccept,它将使用主线程作为执行线程。

但是如果我们无法从上一个 completablefuture 中获取先前的结果,我们将使用 uniPush 函数将当前 UniAccept/Apply 压入堆栈。 UniAccept class 有 tryFire() ,它将被我们的 postComplete() 函数触发

final void postComplete() {
    /*
     * On each step, variable f holds current dependents to pop
     * and run.  It is extended along only one path at a time,
     * pushing others to avoid unbounded recursion.
     */
    CompletableFuture<?> f = this; Completion h;
    while ((h = f.stack) != null ||
           (f != this && (h = (f = this).stack) != null)) {
        CompletableFuture<?> d; Completion t;
        if (STACK.compareAndSet(f, h, t = h.next)) {
            if (t != null) {
                if (f != this) {
                    pushStack(h);
                    continue;
                }
                NEXT.compareAndSet(h, t, null); // try to detach
            }
            f = (d = h.tryFire(NESTED)) == null ? this : d;
        }
    }
}