SupplyAsync 等待所有 CompletableFutures 完成

SupplyAsync wait for all CompletableFutures to finish

我在下面 运行 一些异步任务,需要等到它们全部完成。我不确定为什么,但是 join() 不会强制等待所有任务,并且代码会继续执行而无需等待。连接流未按预期工作是否有原因?

CompletableFutures 列表只是一个映射 supplyAsync 的流

List<Integer> items = Arrays.asList(1, 2, 3);

List<CompletableFuture<Integer>> futures = items
                .stream()
                .map(item -> CompletableFuture.supplyAsync(() ->  {

                    System.out.println("processing");
                    // do some processing here
                    return item;

                }))
                .collect(Collectors.toList());

我等着未来的样子。

CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]))
                .thenApply(ignored -> futures.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()));

我可以使用 futures.forEach(CompletableFuture::join); 等待,但我想知道为什么我的流方法不起作用。

此代码:

CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]))
                .thenApply(ignored -> futures.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()));

是否等待futures中的所有期货完成。它所做的是创建一个新的未来,它将等待 futures 中的所有异步执行在它自己完成之前完成(但在所有这些未来完成之前不会阻塞)。当这个 allOf 未来完成时,然后你的 thenApply 代码 运行s。但是 allOf() 会立即 return 而不会阻塞。

这意味着 futures.stream().map(CompletableFuture::join).collect(Collectors.toList()) 在您的代码中仅 运行s 所有异步执行完成后,这违背了您的目的。 join() 个调用将立即全部 return。但这不是更大的问题。您的挑战是 allOf().thenApply() 不会等待异步执行完成。它只会创造另一个不会阻塞的未来。

最简单的解决方案是使用第一个管道并映射到一个整数列表:

List<Integer> results = items.stream()
    .map(item -> CompletableFuture.supplyAsync(() -> {

        System.out.println("processing " + item);
        // do some processing here
        return item;

    }))
    .collect(Collectors.toList()) //force-submit all
    .stream()
    .map(CompletableFuture::join) //wait for each
    .collect(Collectors.toList());

如果您想使用与原始代码类似的内容,则必须将您的第二个代码段更改为:

List<Integer> reuslts = futures.stream()
    .map(CompletableFuture::join)
    .collect(Collectors.toList());

那是因为 CompletableFuture.allOf 不等待,它只是将所有 futures 组合成一个新的,当所有 futures 完成时完成:

Returns a new CompletableFuture that is completed when all of the given CompletableFutures complete.

或者,您仍然可以使用 allOf()join(),然后 运行 您当前的 thenApply() 代码:

//wrapper future completes when all futures have completed
CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]))
        .join(); 

//join() calls below return immediately
List<Integer> result = futures.stream()
        .map(CompletableFuture::join) 
        .collect(Collectors.toList());

join() 立即调用最后一条语句 return,因为 join() 对包装器 (allOf()) 的调用 future 将等待传递给的所有 futures它来完成。这就是为什么当您可以使用第一种方法时我看不到这样做的原因。