作者使用 thenCompose 而不是 thenComposeAsync 的理由是否正确

Is the writer's reason correct for using thenCompose and not thenComposeAsync

这个问题与这个问题不同因为我想知道作者使用thenCompose而不是thenComposeAsync的原因是什么。

我正在阅读 Modern Java in action,我在第 405 页看到了这部分代码:

public static List<String> findPrices(String product) {
    ExecutorService executor = Executors.newFixedThreadPool(10);
    List<Shop> shops = Arrays.asList(new Shop(), new Shop());
    List<CompletableFuture<String>> priceFutures = shops.stream()
            .map(shop -> CompletableFuture.supplyAsync(() -> shop.getPrice(product), executor))
            .map(future -> future.thenApply(Quote::parse))
            .map(future -> future.thenCompose(quote ->
                    CompletableFuture.supplyAsync(() -> Discount.applyDiscount(quote), executor)))
            .collect(toList());
    return priceFutures.stream()
            .map(CompletableFuture::join).collect(toList());
}

一切正常,我可以理解这段代码,但作者在第 408 页上没有使用 thenComposeAsync 的原因我无法理解:

In general, a method without the Async suffix in its name executes its task in the same threads the previous task, whereas a method terminating with Async always submits the succeeding task to the thread pool, so each of the tasks can be handled by a different thread. In this case, the result of the second CompletableFuture depends on the first,so it makes no difference to the final result or to its broad-brush timing whether you compose the two CompletableFutures with one or the other variant of this method

根据我对 thenCompose(和 thenComposeAsync)签名的理解,如下所示:

public <U> CompletableFuture<U> thenCompose(
    Function<? super T, ? extends CompletionStage<U>> fn) {
    return uniComposeStage(null, fn);
}

public <U> CompletableFuture<U> thenComposeAsync(
    Function<? super T, ? extends CompletionStage<U>> fn) {
    return uniComposeStage(asyncPool, fn);
}

第二个 CompletableFuture 的结果在很多情况下都取决于前面的 CompletableFuture (或者我可以说几乎总是),我们应该使用 thenCompose 而不是 thenComposeAsync 在那些情况下?

如果我们在第二个 CompletableFuture 中有阻塞代码怎么办?

这是一个类似的例子,由回答类似问题的人给出:

public CompletableFuture<String> requestData(Quote quote) {
    Request request = blockingRequestForQuote(quote);
    return CompletableFuture.supplyAsync(() -> sendRequest(request));
}

在我看来,在这种情况下使用 thenComposeAsync 可以使我们的程序更快,因为这里的 blockingRequestForQuote 可以在不同的线程上 运行。但是根据作者的意见我们不应该使用 thenComposeAsync 因为它取决于第一个 CompletableFuture 结果(即 Quote)。

我的问题是:

作者的想法是否正确,他说:

In this case, the result of the second CompletableFuture depends on the first,so it makes no difference to the final result or to its broad-brush timing whether you compose the two CompletableFutures with one or the other variant of this method

TL;DR 在这里使用 thenCompose 而不是 thenComposeAsync 是正确的,但不是出于引用的原因。通常,代码示例不应用作您自己代码的模板。


本章是 Whosebug 上反复出现的主题,出于礼貌,我们最好将其描述为“质量不足”。

In general, a method without the Async suffix in its name executes its task in the same threads the previous task, …

规范中没有关于执行线程的此类保证。 documentation 表示:

  • Actions supplied for dependent completions of non-async methods may be performed by the thread that completes the current CompletableFuture, or by any other caller of a completion method.

所以也有可能任务是由“完成方法的任何其他调用者”执行的。一个直观的例子是

CompletableFuture<X> f = CompletableFuture.supplyAsync(() -> foo())
    .thenApply(f -> f.bar());

涉及两个线程。一个调用 supplyAsyncthenApply,另一个调用 foo()。如果第二个在第一个线程进入thenApply执行之前就完成了foo()的调用,有可能future已经完成

未来不记得哪个线程完成了它。它也没有一些神奇的能力来告诉该线程执行一个动作,尽管它可能正忙于其他事情,甚至从那时起就已经终止了。所以很明显,在一个已经完成的 future 上调用 thenApply 不能承诺使用完成它的线程。在大多数情况下,它会在调用 thenApply 的线程中立即执行操作。规范的措辞“完成方法的任何其他调用者”涵盖了这一点。

但这还没有结束。正如 所解释的那样,当涉及两个以上的线程时,该操作也可以由另一个线程同时调用未来的不相关完成方法来执行。这可能很少发生,但在参考实现中是可能的并且规范允许。

我们可以将其总结为:没有 Async 的方法对将执行操作甚至可能执行的线程提供 最少的控制它就在调用线程中,导致同步行为。

因此,当执行线程无关紧要并且您不希望后台线程执行时,它们是最好的,即简而言之,non-blocking 操作。

whereas a method terminating with Async always submits the succeeding task to the thread pool, so each of the tasks can be handled by a different thread. In this case, the result of the second CompletableFuture depends on the first, …

当你这样做时

future.thenCompose(quote ->
    CompletableFuture.supplyAsync(() -> Discount.applyDiscount(quote), executor))

涉及三个个future,所以不太清楚“second”指的是哪个future。 supplyAsync 正在提交一个动作,return 正在提交一个未来。提交包含在传递给 thenCompose 的函数中,它将 return 另一个未来。

如果你在这里使用了thenComposeAsync,你只要求supplyAsync的执行必须提交给线程池,而不是直接在完成线程或“任何其他调用者”中执行完成方法”,例如直接在线程中调用 thenCompose.

关于依赖关系的推理在这里没有意义。 “then”总是暗示着依赖关系。如果你在这里使用thenComposeAsync,你就强制将动作提交到线程池,但是在future完成之前,这个提交仍然不会发生。如果 future 异常完成,则根本不会提交。

那么,在这里使用 thenCompose 合理吗?是的,但不是因为给出的原因是报价。如前所述,使用 non-async 方法意味着放弃对执行线程的控制,并且只应在线程无关紧要时使用,最值得注意的是 non-blocking 动作。调用 supplyAsync 是一个廉价的操作,它会自行将实际操作提交给线程池,因此可以在任何空闲的线程中执行它。

然而,这是一个不必要的并发症。您可以使用

实现相同的效果
future.thenApplyAsync(quote -> Discount.applyDiscount(quote), executor)

将做完全相同的事情,在 future 完成后将 applyDiscount 提交给 executor,并生成代表结果的新未来。此处不需要 thenComposesupplyAsync 的组合。

请注意,这个例子已经在 中讨论过,它还解决了未来操作在多个 Stream 操作上不必要的分离以及错误的序列图。

Holger 的回答多么礼貌!他能提供如此出色的解释,同时保持不说作者明显错误的界限,给我留下了深刻的印象。我也想在这里提供我的 0.02 美元,一点点,在阅读同一本书并且不得不挠头两次之后。

首先,没有“记住”哪个线程执行了哪个阶段,规范也没有做出这样的声明(上面已经回答过)。有趣的部分甚至在上面引用的文档中:

Actions supplied for dependent completions of non-async methods may be performed by the thread that completes the current CompletableFuture, or by any other caller of a completion method.

即使 ...完成当前的 CompletableFuture 部分也很棘手。如果有两个线程尝试在 CompletableFuture 上调用 complete,哪个线程将 运行 所有相关操作? 居然完成了?或者其他的?我写了一个 jcstress 测试,在查看结果时非常不直观:

@JCStressTest
@State
@Outcome(id = "1, 0", expect = Expect.ACCEPTABLE, desc = "executed in completion thread")
@Outcome(id = "0, 1", expect = Expect.ACCEPTABLE, desc = "executed in the other thread")
@Outcome(id = "0, 0", expect = Expect.FORBIDDEN)
@Outcome(id = "1, 1", expect = Expect.FORBIDDEN)
public class CompletableFutureWhichThread1 {

    private final CompletableFuture<String> future = new CompletableFuture<>();

    public CompletableFutureWhichThread1() {
        future.thenApply(x -> action(Thread.currentThread().getName()));
    }

    volatile int x = -1; // different default to not mess with the expected result
    volatile int y = -1; // different default to not mess with the expected result
    volatile int actor1 = 0;
    volatile int actor2 = 0;

    private String action(String threadName) {
        System.out.println(Thread.currentThread().getName());
        // same thread that completed future, executed action
        if ("actor1".equals(threadName) && actor1 == 1) {
            x = 1;
            return "action";
        }

        // same thread that completed future, executed action
        if ("actor2".equals(threadName) && actor2 == 1) {
            x = 1;
            return "action";
        }

        y = 1;
        return "action";

    }

    @Actor
    public void actor1() {
        Thread.currentThread().setName("actor1");
        boolean completed = future.complete("done-actor1");
        if (completed) {
            actor1 = 1;
        } else {
            actor2 = 1;
        }
    }

    @Actor
    public void actor2() {
        Thread.currentThread().setName("actor2");
        boolean completed = future.complete("done-actor2");
        if (completed) {
            actor2 = 1;
        }
    }

    @Arbiter
    public void arbiter(II_Result result) {
        if (x == 1) {
            result.r1 = 1;
        }

        if (y == 1) {
            result.r2 = 1;
        }

    }

}

在 运行 之后,0, 11, 0 都可以看到。您不需要对测试本身了解太多,但它证明了一个相当有趣的观点。

您有一个 CompletableFuture future 附加了一个 future.thenApply(x -> action(...));。有两个线程(actor1actor2),它们同时相互竞争完成它(规范说只有一个会成功)。结果表明,如果actor1调用了complete,但实际上没有完成CompletableFutureactor2完成),它仍然可以在 action 中完成实际工作。换句话说,完成 CompletableFuture 的线程不一定是执行相关操作的线程(例如那些 thenApply)。这对我来说很有趣,虽然它是有道理的。


你对速度的推理有点不对。当您将工作分派到不同的线程时,您通常会为此付出代价。 thenComposethenComposeAsync 是关于能够预测 确切 的工作地点。正如您在上面看到的那样,您不能这样做,除非 您使用采用线程池的...Async 方法。您的自然问题应该是:“我为什么要关心它在哪里执行?”。

jdk's HttpClient 中有一个名为 SelectorManager 的内部 class。它(从高层次上)有一个相当简单的任务:它从套接字读取并将“响应”返回给等待 http 结果的线程。从本质上讲,这是一个唤醒所有等待某些 http 数据包的相关方的线程。现在假设这个特定线程在内部执行 thenCompose。现在也假设您的调用链如下所示:

 httpClient.sendAsync(() -> ...)
           .thenApply(x -> foo())

其中 foo 是一种永远不会完成(或需要很长时间才能完成)的方法。由于您不知道实际执行将在哪个线程中发生,因此它很可能发生在 SelectorManager 线程中。这将是一场灾难。每个人 other http 调用都会过时,因为这个线程现在很忙。因此 thenComposeAsync:如果需要,让配置的池执行 work/waiting,而 SelectorManager 线程可以自由执行其工作。

所以作者给出的理由完全错误。