作者使用 thenCompose 而不是 thenComposeAsync 的理由是否正确

Is the writer's reason correct for using thenCompose and not thenComposeAsync


我正在阅读 Modern Java in action,我在第 405 页看到了这部分代码:

public static List<String> findPrices(String product) {
    ExecutorService executor = Executors.newFixedThreadPool(10);
    List<Shop> shops = Arrays.asList(new Shop(), new Shop());
    List<CompletableFuture<String>> priceFutures = shops.stream()
            .map(shop -> CompletableFuture.supplyAsync(() -> shop.getPrice(product), executor))
            .map(future -> future.thenApply(Quote::parse))
            .map(future -> future.thenCompose(quote ->
                    CompletableFuture.supplyAsync(() -> Discount.applyDiscount(quote), executor)))
    return priceFutures.stream()

一切正常,我可以理解这段代码,但作者在第 408 页上没有使用 thenComposeAsync 的原因我无法理解:

In general, a method without the Async suffix in its name executes its task in the same threads the previous task, whereas a method terminating with Async always submits the succeeding task to the thread pool, so each of the tasks can be handled by a different thread. In this case, the result of the second CompletableFuture depends on the first,so it makes no difference to the final result or to its broad-brush timing whether you compose the two CompletableFutures with one or the other variant of this method

根据我对 thenCompose(和 thenComposeAsync)签名的理解,如下所示:

public <U> CompletableFuture<U> thenCompose(
    Function<? super T, ? extends CompletionStage<U>> fn) {
    return uniComposeStage(null, fn);

public <U> CompletableFuture<U> thenComposeAsync(
    Function<? super T, ? extends CompletionStage<U>> fn) {
    return uniComposeStage(asyncPool, fn);

第二个 CompletableFuture 的结果在很多情况下都取决于前面的 CompletableFuture (或者我可以说几乎总是),我们应该使用 thenCompose 而不是 thenComposeAsync 在那些情况下?

如果我们在第二个 CompletableFuture 中有阻塞代码怎么办?


public CompletableFuture<String> requestData(Quote quote) {
    Request request = blockingRequestForQuote(quote);
    return CompletableFuture.supplyAsync(() -> sendRequest(request));

在我看来,在这种情况下使用 thenComposeAsync 可以使我们的程序更快,因为这里的 blockingRequestForQuote 可以在不同的线程上 运行。但是根据作者的意见我们不应该使用 thenComposeAsync 因为它取决于第一个 CompletableFuture 结果(即 Quote)。



In this case, the result of the second CompletableFuture depends on the first,so it makes no difference to the final result or to its broad-brush timing whether you compose the two CompletableFutures with one or the other variant of this method

TL;DR 在这里使用 thenCompose 而不是 thenComposeAsync 是正确的,但不是出于引用的原因。通常,代码示例不应用作您自己代码的模板。

本章是 Whosebug 上反复出现的主题,出于礼貌,我们最好将其描述为“质量不足”。

In general, a method without the Async suffix in its name executes its task in the same threads the previous task, …

规范中没有关于执行线程的此类保证。 documentation 表示:

  • Actions supplied for dependent completions of non-async methods may be performed by the thread that completes the current CompletableFuture, or by any other caller of a completion method.


CompletableFuture<X> f = CompletableFuture.supplyAsync(() -> foo())
    .thenApply(f -> f.bar());

涉及两个线程。一个调用 supplyAsyncthenApply,另一个调用 foo()。如果第二个在第一个线程进入thenApply执行之前就完成了foo()的调用,有可能future已经完成

未来不记得哪个线程完成了它。它也没有一些神奇的能力来告诉该线程执行一个动作,尽管它可能正忙于其他事情,甚至从那时起就已经终止了。所以很明显,在一个已经完成的 future 上调用 thenApply 不能承诺使用完成它的线程。在大多数情况下,它会在调用 thenApply 的线程中立即执行操作。规范的措辞“完成方法的任何其他调用者”涵盖了这一点。

但这还没有结束。正如 所解释的那样,当涉及两个以上的线程时,该操作也可以由另一个线程同时调用未来的不相关完成方法来执行。这可能很少发生,但在参考实现中是可能的并且规范允许。

我们可以将其总结为:没有 Async 的方法对将执行操作甚至可能执行的线程提供 最少的控制它就在调用线程中,导致同步行为。

因此,当执行线程无关紧要并且您不希望后台线程执行时,它们是最好的,即简而言之,non-blocking 操作。

whereas a method terminating with Async always submits the succeeding task to the thread pool, so each of the tasks can be handled by a different thread. In this case, the result of the second CompletableFuture depends on the first, …


future.thenCompose(quote ->
    CompletableFuture.supplyAsync(() -> Discount.applyDiscount(quote), executor))

涉及三个个future,所以不太清楚“second”指的是哪个future。 supplyAsync 正在提交一个动作,return 正在提交一个未来。提交包含在传递给 thenCompose 的函数中,它将 return 另一个未来。

如果你在这里使用了thenComposeAsync,你只要求supplyAsync的执行必须提交给线程池,而不是直接在完成线程或“任何其他调用者”中执行完成方法”,例如直接在线程中调用 thenCompose.

关于依赖关系的推理在这里没有意义。 “then”总是暗示着依赖关系。如果你在这里使用thenComposeAsync,你就强制将动作提交到线程池,但是在future完成之前,这个提交仍然不会发生。如果 future 异常完成,则根本不会提交。

那么,在这里使用 thenCompose 合理吗?是的,但不是因为给出的原因是报价。如前所述,使用 non-async 方法意味着放弃对执行线程的控制,并且只应在线程无关紧要时使用,最值得注意的是 non-blocking 动作。调用 supplyAsync 是一个廉价的操作,它会自行将实际操作提交给线程池,因此可以在任何空闲的线程中执行它。


future.thenApplyAsync(quote -> Discount.applyDiscount(quote), executor)

将做完全相同的事情,在 future 完成后将 applyDiscount 提交给 executor,并生成代表结果的新未来。此处不需要 thenComposesupplyAsync 的组合。

请注意,这个例子已经在 中讨论过,它还解决了未来操作在多个 Stream 操作上不必要的分离以及错误的序列图。

Holger 的回答多么礼貌!他能提供如此出色的解释,同时保持不说作者明显错误的界限,给我留下了深刻的印象。我也想在这里提供我的 0.02 美元,一点点,在阅读同一本书并且不得不挠头两次之后。


Actions supplied for dependent completions of non-async methods may be performed by the thread that completes the current CompletableFuture, or by any other caller of a completion method.

即使 ...完成当前的 CompletableFuture 部分也很棘手。如果有两个线程尝试在 CompletableFuture 上调用 complete,哪个线程将 运行 所有相关操作? 居然完成了?或者其他的?我写了一个 jcstress 测试,在查看结果时非常不直观:

@Outcome(id = "1, 0", expect = Expect.ACCEPTABLE, desc = "executed in completion thread")
@Outcome(id = "0, 1", expect = Expect.ACCEPTABLE, desc = "executed in the other thread")
@Outcome(id = "0, 0", expect = Expect.FORBIDDEN)
@Outcome(id = "1, 1", expect = Expect.FORBIDDEN)
public class CompletableFutureWhichThread1 {

    private final CompletableFuture<String> future = new CompletableFuture<>();

    public CompletableFutureWhichThread1() {
        future.thenApply(x -> action(Thread.currentThread().getName()));

    volatile int x = -1; // different default to not mess with the expected result
    volatile int y = -1; // different default to not mess with the expected result
    volatile int actor1 = 0;
    volatile int actor2 = 0;

    private String action(String threadName) {
        // same thread that completed future, executed action
        if ("actor1".equals(threadName) && actor1 == 1) {
            x = 1;
            return "action";

        // same thread that completed future, executed action
        if ("actor2".equals(threadName) && actor2 == 1) {
            x = 1;
            return "action";

        y = 1;
        return "action";


    public void actor1() {
        boolean completed = future.complete("done-actor1");
        if (completed) {
            actor1 = 1;
        } else {
            actor2 = 1;

    public void actor2() {
        boolean completed = future.complete("done-actor2");
        if (completed) {
            actor2 = 1;

    public void arbiter(II_Result result) {
        if (x == 1) {
            result.r1 = 1;

        if (y == 1) {
            result.r2 = 1;



在 运行 之后,0, 11, 0 都可以看到。您不需要对测试本身了解太多,但它证明了一个相当有趣的观点。

您有一个 CompletableFuture future 附加了一个 future.thenApply(x -> action(...));。有两个线程(actor1actor2),它们同时相互竞争完成它(规范说只有一个会成功)。结果表明,如果actor1调用了complete,但实际上没有完成CompletableFutureactor2完成),它仍然可以在 action 中完成实际工作。换句话说,完成 CompletableFuture 的线程不一定是执行相关操作的线程(例如那些 thenApply)。这对我来说很有趣,虽然它是有道理的。

你对速度的推理有点不对。当您将工作分派到不同的线程时,您通常会为此付出代价。 thenComposethenComposeAsync 是关于能够预测 确切 的工作地点。正如您在上面看到的那样,您不能这样做,除非 您使用采用线程池的...Async 方法。您的自然问题应该是:“我为什么要关心它在哪里执行?”。

jdk's HttpClient 中有一个名为 SelectorManager 的内部 class。它(从高层次上)有一个相当简单的任务:它从套接字读取并将“响应”返回给等待 http 结果的线程。从本质上讲,这是一个唤醒所有等待某些 http 数据包的相关方的线程。现在假设这个特定线程在内部执行 thenCompose。现在也假设您的调用链如下所示:

 httpClient.sendAsync(() -> ...)
           .thenApply(x -> foo())

其中 foo 是一种永远不会完成(或需要很长时间才能完成)的方法。由于您不知道实际执行将在哪个线程中发生,因此它很可能发生在 SelectorManager 线程中。这将是一场灾难。每个人 other http 调用都会过时,因为这个线程现在很忙。因此 thenComposeAsync:如果需要,让配置的池执行 work/waiting,而 SelectorManager 线程可以自由执行其工作。
