作者使用 thenCompose 而不是 thenComposeAsync 的理由是否正确
Is the writer's reason correct for using thenCompose and not thenComposeAsync
这个问题与这个问题不同因为我想知道作者使用thenCompose
而不是thenComposeAsync
的原因是什么。
我正在阅读 Modern Java in action,我在第 405 页看到了这部分代码:
public static List<String> findPrices(String product) {
ExecutorService executor = Executors.newFixedThreadPool(10);
List<Shop> shops = Arrays.asList(new Shop(), new Shop());
List<CompletableFuture<String>> priceFutures = shops.stream()
.map(shop -> CompletableFuture.supplyAsync(() -> shop.getPrice(product), executor))
.map(future -> future.thenApply(Quote::parse))
.map(future -> future.thenCompose(quote ->
CompletableFuture.supplyAsync(() -> Discount.applyDiscount(quote), executor)))
.collect(toList());
return priceFutures.stream()
.map(CompletableFuture::join).collect(toList());
}
一切正常,我可以理解这段代码,但作者在第 408 页上没有使用 thenComposeAsync
的原因我无法理解:
In general, a method without the Async suffix in its name executes
its task in the same threads the previous task, whereas a method
terminating with Async always submits the succeeding task to the
thread pool, so each of the tasks can be handled by a
different thread. In this case, the result of the second
CompletableFuture depends on the first,so it makes no difference to
the final result or to its broad-brush timing whether you compose the
two CompletableFutures with one or the other variant of this method
根据我对 thenCompose
(和 thenComposeAsync
)签名的理解,如下所示:
public <U> CompletableFuture<U> thenCompose(
Function<? super T, ? extends CompletionStage<U>> fn) {
return uniComposeStage(null, fn);
}
public <U> CompletableFuture<U> thenComposeAsync(
Function<? super T, ? extends CompletionStage<U>> fn) {
return uniComposeStage(asyncPool, fn);
}
第二个 CompletableFuture
的结果在很多情况下都取决于前面的 CompletableFuture
(或者我可以说几乎总是),我们应该使用 thenCompose
而不是 thenComposeAsync
在那些情况下?
如果我们在第二个 CompletableFuture
中有阻塞代码怎么办?
这是一个类似的例子,由回答类似问题的人给出:
public CompletableFuture<String> requestData(Quote quote) {
Request request = blockingRequestForQuote(quote);
return CompletableFuture.supplyAsync(() -> sendRequest(request));
}
在我看来,在这种情况下使用 thenComposeAsync
可以使我们的程序更快,因为这里的 blockingRequestForQuote
可以在不同的线程上 运行。但是根据作者的意见我们不应该使用 thenComposeAsync
因为它取决于第一个 CompletableFuture
结果(即 Quote)。
我的问题是:
作者的想法是否正确,他说:
In this case, the result of the second
CompletableFuture depends on the first,so it makes no difference to
the final result or to its broad-brush timing whether you compose the
two CompletableFutures with one or the other variant of this method
TL;DR 在这里使用 thenCompose
而不是 thenComposeAsync
是正确的,但不是出于引用的原因。通常,代码示例不应用作您自己代码的模板。
本章是 Whosebug 上反复出现的主题,出于礼貌,我们最好将其描述为“质量不足”。
In general, a method without the Async suffix in its name executes its task in the same threads the previous task, …
规范中没有关于执行线程的此类保证。 documentation 表示:
- Actions supplied for dependent completions of non-async methods may be performed by the thread that completes the current CompletableFuture, or by any other caller of a completion method.
所以也有可能任务是由“完成方法的任何其他调用者”执行的。一个直观的例子是
CompletableFuture<X> f = CompletableFuture.supplyAsync(() -> foo())
.thenApply(f -> f.bar());
涉及两个线程。一个调用 supplyAsync
和 thenApply
,另一个调用 foo()
。如果第二个在第一个线程进入thenApply
执行之前就完成了foo()
的调用,有可能future已经完成
未来不记得哪个线程完成了它。它也没有一些神奇的能力来告诉该线程执行一个动作,尽管它可能正忙于其他事情,甚至从那时起就已经终止了。所以很明显,在一个已经完成的 future 上调用 thenApply
不能承诺使用完成它的线程。在大多数情况下,它会在调用 thenApply
的线程中立即执行操作。规范的措辞“完成方法的任何其他调用者”涵盖了这一点。
但这还没有结束。正如 所解释的那样,当涉及两个以上的线程时,该操作也可以由另一个线程同时调用未来的不相关完成方法来执行。这可能很少发生,但在参考实现中是可能的并且规范允许。
我们可以将其总结为:没有 Async 的方法对将执行操作甚至可能执行的线程提供 最少的控制它就在调用线程中,导致同步行为。
因此,当执行线程无关紧要并且您不希望后台线程执行时,它们是最好的,即简而言之,non-blocking 操作。
whereas a method terminating with Async always submits the succeeding task to the thread pool, so each of the tasks can be handled by a different thread. In this case, the result of the second CompletableFuture depends on the first, …
当你这样做时
future.thenCompose(quote ->
CompletableFuture.supplyAsync(() -> Discount.applyDiscount(quote), executor))
涉及三个个future,所以不太清楚“second”指的是哪个future。 supplyAsync
正在提交一个动作,return 正在提交一个未来。提交包含在传递给 thenCompose
的函数中,它将 return 另一个未来。
如果你在这里使用了thenComposeAsync
,你只要求supplyAsync
的执行必须提交给线程池,而不是直接在完成线程或“任何其他调用者”中执行完成方法”,例如直接在线程中调用 thenCompose
.
关于依赖关系的推理在这里没有意义。 “then”总是暗示着依赖关系。如果你在这里使用thenComposeAsync
,你就强制将动作提交到线程池,但是在future
完成之前,这个提交仍然不会发生。如果 future
异常完成,则根本不会提交。
那么,在这里使用 thenCompose
合理吗?是的,但不是因为给出的原因是报价。如前所述,使用 non-async 方法意味着放弃对执行线程的控制,并且只应在线程无关紧要时使用,最值得注意的是 non-blocking 动作。调用 supplyAsync
是一个廉价的操作,它会自行将实际操作提交给线程池,因此可以在任何空闲的线程中执行它。
然而,这是一个不必要的并发症。您可以使用
实现相同的效果
future.thenApplyAsync(quote -> Discount.applyDiscount(quote), executor)
将做完全相同的事情,在 future
完成后将 applyDiscount
提交给 executor
,并生成代表结果的新未来。此处不需要 thenCompose
和 supplyAsync
的组合。
请注意,这个例子已经在 中讨论过,它还解决了未来操作在多个 Stream
操作上不必要的分离以及错误的序列图。
Holger 的回答多么礼貌!他能提供如此出色的解释,同时保持不说作者明显错误的界限,给我留下了深刻的印象。我也想在这里提供我的 0.02 美元,一点点,在阅读同一本书并且不得不挠头两次之后。
首先,没有“记住”哪个线程执行了哪个阶段,规范也没有做出这样的声明(上面已经回答过)。有趣的部分甚至在上面引用的文档中:
Actions supplied for dependent completions of non-async methods may be performed by the thread that completes the current CompletableFuture, or by any other caller of a completion method.
即使 ...完成当前的 CompletableFuture 部分也很棘手。如果有两个线程尝试在 CompletableFuture
上调用 complete
,哪个线程将 运行 所有相关操作? 居然完成了?或者其他的?我写了一个 jcstress 测试,在查看结果时非常不直观:
@JCStressTest
@State
@Outcome(id = "1, 0", expect = Expect.ACCEPTABLE, desc = "executed in completion thread")
@Outcome(id = "0, 1", expect = Expect.ACCEPTABLE, desc = "executed in the other thread")
@Outcome(id = "0, 0", expect = Expect.FORBIDDEN)
@Outcome(id = "1, 1", expect = Expect.FORBIDDEN)
public class CompletableFutureWhichThread1 {
private final CompletableFuture<String> future = new CompletableFuture<>();
public CompletableFutureWhichThread1() {
future.thenApply(x -> action(Thread.currentThread().getName()));
}
volatile int x = -1; // different default to not mess with the expected result
volatile int y = -1; // different default to not mess with the expected result
volatile int actor1 = 0;
volatile int actor2 = 0;
private String action(String threadName) {
System.out.println(Thread.currentThread().getName());
// same thread that completed future, executed action
if ("actor1".equals(threadName) && actor1 == 1) {
x = 1;
return "action";
}
// same thread that completed future, executed action
if ("actor2".equals(threadName) && actor2 == 1) {
x = 1;
return "action";
}
y = 1;
return "action";
}
@Actor
public void actor1() {
Thread.currentThread().setName("actor1");
boolean completed = future.complete("done-actor1");
if (completed) {
actor1 = 1;
} else {
actor2 = 1;
}
}
@Actor
public void actor2() {
Thread.currentThread().setName("actor2");
boolean completed = future.complete("done-actor2");
if (completed) {
actor2 = 1;
}
}
@Arbiter
public void arbiter(II_Result result) {
if (x == 1) {
result.r1 = 1;
}
if (y == 1) {
result.r2 = 1;
}
}
}
在 运行 之后,0, 1
和 1, 0
都可以看到。您不需要对测试本身了解太多,但它证明了一个相当有趣的观点。
您有一个 CompletableFuture future
附加了一个 future.thenApply(x -> action(...));
。有两个线程(actor1
和 actor2
),它们同时相互竞争完成它(规范说只有一个会成功)。结果表明,如果actor1
调用了complete
,但实际上没有完成CompletableFuture
(actor2
完成),它仍然可以在 action
中完成实际工作。换句话说,完成 CompletableFuture
的线程不一定是执行相关操作的线程(例如那些 thenApply
)。这对我来说很有趣,虽然它是有道理的。
你对速度的推理有点不对。当您将工作分派到不同的线程时,您通常会为此付出代价。 thenCompose
与 thenComposeAsync
是关于能够预测 确切 的工作地点。正如您在上面看到的那样,您不能这样做,除非 您使用采用线程池的...Async
方法。您的自然问题应该是:“我为什么要关心它在哪里执行?”。
在 jdk's
HttpClient
中有一个名为 SelectorManager
的内部 class。它(从高层次上)有一个相当简单的任务:它从套接字读取并将“响应”返回给等待 http 结果的线程。从本质上讲,这是一个唤醒所有等待某些 http 数据包的相关方的线程。现在假设这个特定线程在内部执行 thenCompose
。现在也假设您的调用链如下所示:
httpClient.sendAsync(() -> ...)
.thenApply(x -> foo())
其中 foo
是一种永远不会完成(或需要很长时间才能完成)的方法。由于您不知道实际执行将在哪个线程中发生,因此它很可能发生在 SelectorManager
线程中。这将是一场灾难。每个人 other http 调用都会过时,因为这个线程现在很忙。因此 thenComposeAsync
:如果需要,让配置的池执行 work/waiting,而 SelectorManager
线程可以自由执行其工作。
所以作者给出的理由完全错误。
这个问题与这个问题不同thenCompose
而不是thenComposeAsync
的原因是什么。
我正在阅读 Modern Java in action,我在第 405 页看到了这部分代码:
public static List<String> findPrices(String product) {
ExecutorService executor = Executors.newFixedThreadPool(10);
List<Shop> shops = Arrays.asList(new Shop(), new Shop());
List<CompletableFuture<String>> priceFutures = shops.stream()
.map(shop -> CompletableFuture.supplyAsync(() -> shop.getPrice(product), executor))
.map(future -> future.thenApply(Quote::parse))
.map(future -> future.thenCompose(quote ->
CompletableFuture.supplyAsync(() -> Discount.applyDiscount(quote), executor)))
.collect(toList());
return priceFutures.stream()
.map(CompletableFuture::join).collect(toList());
}
一切正常,我可以理解这段代码,但作者在第 408 页上没有使用 thenComposeAsync
的原因我无法理解:
In general, a method without the Async suffix in its name executes its task in the same threads the previous task, whereas a method terminating with Async always submits the succeeding task to the thread pool, so each of the tasks can be handled by a different thread. In this case, the result of the second CompletableFuture depends on the first,so it makes no difference to the final result or to its broad-brush timing whether you compose the two CompletableFutures with one or the other variant of this method
根据我对 thenCompose
(和 thenComposeAsync
)签名的理解,如下所示:
public <U> CompletableFuture<U> thenCompose(
Function<? super T, ? extends CompletionStage<U>> fn) {
return uniComposeStage(null, fn);
}
public <U> CompletableFuture<U> thenComposeAsync(
Function<? super T, ? extends CompletionStage<U>> fn) {
return uniComposeStage(asyncPool, fn);
}
第二个 CompletableFuture
的结果在很多情况下都取决于前面的 CompletableFuture
(或者我可以说几乎总是),我们应该使用 thenCompose
而不是 thenComposeAsync
在那些情况下?
如果我们在第二个 CompletableFuture
中有阻塞代码怎么办?
这是一个类似的例子,由回答类似问题的人给出:
public CompletableFuture<String> requestData(Quote quote) {
Request request = blockingRequestForQuote(quote);
return CompletableFuture.supplyAsync(() -> sendRequest(request));
}
在我看来,在这种情况下使用 thenComposeAsync
可以使我们的程序更快,因为这里的 blockingRequestForQuote
可以在不同的线程上 运行。但是根据作者的意见我们不应该使用 thenComposeAsync
因为它取决于第一个 CompletableFuture
结果(即 Quote)。
我的问题是:
作者的想法是否正确,他说:
In this case, the result of the second CompletableFuture depends on the first,so it makes no difference to the final result or to its broad-brush timing whether you compose the two CompletableFutures with one or the other variant of this method
TL;DR 在这里使用 thenCompose
而不是 thenComposeAsync
是正确的,但不是出于引用的原因。通常,代码示例不应用作您自己代码的模板。
本章是 Whosebug 上反复出现的主题,出于礼貌,我们最好将其描述为“质量不足”。
In general, a method without the Async suffix in its name executes its task in the same threads the previous task, …
规范中没有关于执行线程的此类保证。 documentation 表示:
- Actions supplied for dependent completions of non-async methods may be performed by the thread that completes the current CompletableFuture, or by any other caller of a completion method.
所以也有可能任务是由“完成方法的任何其他调用者”执行的。一个直观的例子是
CompletableFuture<X> f = CompletableFuture.supplyAsync(() -> foo())
.thenApply(f -> f.bar());
涉及两个线程。一个调用 supplyAsync
和 thenApply
,另一个调用 foo()
。如果第二个在第一个线程进入thenApply
执行之前就完成了foo()
的调用,有可能future已经完成
未来不记得哪个线程完成了它。它也没有一些神奇的能力来告诉该线程执行一个动作,尽管它可能正忙于其他事情,甚至从那时起就已经终止了。所以很明显,在一个已经完成的 future 上调用 thenApply
不能承诺使用完成它的线程。在大多数情况下,它会在调用 thenApply
的线程中立即执行操作。规范的措辞“完成方法的任何其他调用者”涵盖了这一点。
但这还没有结束。正如
我们可以将其总结为:没有 Async 的方法对将执行操作甚至可能执行的线程提供 最少的控制它就在调用线程中,导致同步行为。
因此,当执行线程无关紧要并且您不希望后台线程执行时,它们是最好的,即简而言之,non-blocking 操作。
whereas a method terminating with Async always submits the succeeding task to the thread pool, so each of the tasks can be handled by a different thread. In this case, the result of the second CompletableFuture depends on the first, …
当你这样做时
future.thenCompose(quote ->
CompletableFuture.supplyAsync(() -> Discount.applyDiscount(quote), executor))
涉及三个个future,所以不太清楚“second”指的是哪个future。 supplyAsync
正在提交一个动作,return 正在提交一个未来。提交包含在传递给 thenCompose
的函数中,它将 return 另一个未来。
如果你在这里使用了thenComposeAsync
,你只要求supplyAsync
的执行必须提交给线程池,而不是直接在完成线程或“任何其他调用者”中执行完成方法”,例如直接在线程中调用 thenCompose
.
关于依赖关系的推理在这里没有意义。 “then”总是暗示着依赖关系。如果你在这里使用thenComposeAsync
,你就强制将动作提交到线程池,但是在future
完成之前,这个提交仍然不会发生。如果 future
异常完成,则根本不会提交。
那么,在这里使用 thenCompose
合理吗?是的,但不是因为给出的原因是报价。如前所述,使用 non-async 方法意味着放弃对执行线程的控制,并且只应在线程无关紧要时使用,最值得注意的是 non-blocking 动作。调用 supplyAsync
是一个廉价的操作,它会自行将实际操作提交给线程池,因此可以在任何空闲的线程中执行它。
然而,这是一个不必要的并发症。您可以使用
实现相同的效果future.thenApplyAsync(quote -> Discount.applyDiscount(quote), executor)
将做完全相同的事情,在 future
完成后将 applyDiscount
提交给 executor
,并生成代表结果的新未来。此处不需要 thenCompose
和 supplyAsync
的组合。
请注意,这个例子已经在 Stream
操作上不必要的分离以及错误的序列图。
Holger 的回答多么礼貌!他能提供如此出色的解释,同时保持不说作者明显错误的界限,给我留下了深刻的印象。我也想在这里提供我的 0.02 美元,一点点,在阅读同一本书并且不得不挠头两次之后。
首先,没有“记住”哪个线程执行了哪个阶段,规范也没有做出这样的声明(上面已经回答过)。有趣的部分甚至在上面引用的文档中:
Actions supplied for dependent completions of non-async methods may be performed by the thread that completes the current CompletableFuture, or by any other caller of a completion method.
即使 ...完成当前的 CompletableFuture 部分也很棘手。如果有两个线程尝试在 CompletableFuture
上调用 complete
,哪个线程将 运行 所有相关操作? 居然完成了?或者其他的?我写了一个 jcstress 测试,在查看结果时非常不直观:
@JCStressTest
@State
@Outcome(id = "1, 0", expect = Expect.ACCEPTABLE, desc = "executed in completion thread")
@Outcome(id = "0, 1", expect = Expect.ACCEPTABLE, desc = "executed in the other thread")
@Outcome(id = "0, 0", expect = Expect.FORBIDDEN)
@Outcome(id = "1, 1", expect = Expect.FORBIDDEN)
public class CompletableFutureWhichThread1 {
private final CompletableFuture<String> future = new CompletableFuture<>();
public CompletableFutureWhichThread1() {
future.thenApply(x -> action(Thread.currentThread().getName()));
}
volatile int x = -1; // different default to not mess with the expected result
volatile int y = -1; // different default to not mess with the expected result
volatile int actor1 = 0;
volatile int actor2 = 0;
private String action(String threadName) {
System.out.println(Thread.currentThread().getName());
// same thread that completed future, executed action
if ("actor1".equals(threadName) && actor1 == 1) {
x = 1;
return "action";
}
// same thread that completed future, executed action
if ("actor2".equals(threadName) && actor2 == 1) {
x = 1;
return "action";
}
y = 1;
return "action";
}
@Actor
public void actor1() {
Thread.currentThread().setName("actor1");
boolean completed = future.complete("done-actor1");
if (completed) {
actor1 = 1;
} else {
actor2 = 1;
}
}
@Actor
public void actor2() {
Thread.currentThread().setName("actor2");
boolean completed = future.complete("done-actor2");
if (completed) {
actor2 = 1;
}
}
@Arbiter
public void arbiter(II_Result result) {
if (x == 1) {
result.r1 = 1;
}
if (y == 1) {
result.r2 = 1;
}
}
}
在 运行 之后,0, 1
和 1, 0
都可以看到。您不需要对测试本身了解太多,但它证明了一个相当有趣的观点。
您有一个 CompletableFuture future
附加了一个 future.thenApply(x -> action(...));
。有两个线程(actor1
和 actor2
),它们同时相互竞争完成它(规范说只有一个会成功)。结果表明,如果actor1
调用了complete
,但实际上没有完成CompletableFuture
(actor2
完成),它仍然可以在 action
中完成实际工作。换句话说,完成 CompletableFuture
的线程不一定是执行相关操作的线程(例如那些 thenApply
)。这对我来说很有趣,虽然它是有道理的。
你对速度的推理有点不对。当您将工作分派到不同的线程时,您通常会为此付出代价。 thenCompose
与 thenComposeAsync
是关于能够预测 确切 的工作地点。正如您在上面看到的那样,您不能这样做,除非 您使用采用线程池的...Async
方法。您的自然问题应该是:“我为什么要关心它在哪里执行?”。
在 jdk's
HttpClient
中有一个名为 SelectorManager
的内部 class。它(从高层次上)有一个相当简单的任务:它从套接字读取并将“响应”返回给等待 http 结果的线程。从本质上讲,这是一个唤醒所有等待某些 http 数据包的相关方的线程。现在假设这个特定线程在内部执行 thenCompose
。现在也假设您的调用链如下所示:
httpClient.sendAsync(() -> ...)
.thenApply(x -> foo())
其中 foo
是一种永远不会完成(或需要很长时间才能完成)的方法。由于您不知道实际执行将在哪个线程中发生,因此它很可能发生在 SelectorManager
线程中。这将是一场灾难。每个人 other http 调用都会过时,因为这个线程现在很忙。因此 thenComposeAsync
:如果需要,让配置的池执行 work/waiting,而 SelectorManager
线程可以自由执行其工作。
所以作者给出的理由完全错误。