将单个 CompletableFuture<OlderCat> 包装并转换为结果为 CompletableFuture<Map<Cat.name, OlderCat>> 的批量操作

Wrapping and turning a single CompleteableFuture<OlderCat> to a bulk operation with result of CompleteableFuture<Map<Cat.name, OlderCat>>

我们有一个异步方法:

public CompletableFuture<OlderCat> asyncGetOlderCat(String catName)

给定猫的列表:

List<Cat> cats;

我们想创建一个批量操作,该操作将导致猫名称与其异步结果之间的映射:

public CompletableFuture<Map<String, OlderCat>>

我们还喜欢如果 asyncGetOlderCat 抛出异常,猫将不会添加到地图中。

我们一直在关注 this post and also 并且想出了这个代码:

List<Cat> cats = ...

Map<String, CompletableFuture<OlderCat>> completableFutures = cats
            .stream()
            .collect(Collectors.toMap(Cat::getName,
                    c -> asynceGetOlderCat(c.getName())
                         .exceptionally( ex -> /* null?? */  ))
            ));


CompletableFuture<Void> allFutures = CompletableFuture
            .allOf(completableFutures.values().toArray(new CompletableFuture[completableFutures.size()]));

return allFutures.thenApply(future -> completableFutures.keySet().stream()
            .map(CompletableFuture::join) ???
            .collect(Collectors.toMap(????)));

但尚不清楚 allFutures 我们如何获取猫名以及如何在 OlderCat 和猫名之间进行匹配。

能实现吗?

据我了解,您需要的是 CompletableFuture 所有结果,下面的代码正是您所需要的

public CompletableFuture<Map<String, OlderCat>> getOlderCats(List<Cat> cats) {
    return CompletableFuture.supplyAsync(
            () -> {
                Map<String, CompletableFuture<OlderCat>> completableFutures = cats
                        .stream()
                        .collect(Collectors.toMap(Cat::getName,
                                c -> asyncGetOlderCat(c.getName())
                                        .exceptionally(ex -> {
                                            ex.printStackTrace();
                                            // if exception happens - return null
                                            // if you don't want null - save failed ones to separate list and process them separately
                                            return null;
                                        }))
                        );

                return completableFutures
                        .entrySet()
                        .stream()
                        .collect(Collectors.toMap(
                                Map.Entry::getKey,
                                e -> e.getValue().join()
                        ));
            }
    );
}

它在这里做什么 - returns future,它在内部创建更多可完成的 future 并在最后等待。

你快到了。您不需要在初始期货上放置 exceptionally(),但您应该在 allOf() 之后使用 handle() 而不是 thenApply(),因为如果任何期货失败,allOf() 也会失败。

在处理futures的时候,你可以从结果中过滤掉失败的,然后重建预期的地图:

Map<String, CompletableFuture<OlderCat>> completableFutures = cats
        .stream()
        .collect(toMap(Cat::getName, c -> asyncGetOlderCat(c.getName())));

CompletableFuture<Void> allFutures = CompletableFuture
        .allOf(completableFutures.values().toArray(new CompletableFuture[0]));

return allFutures.handle((dummy, ex) ->
        completableFutures.entrySet().stream()
                .filter(entry -> !entry.getValue().isCompletedExceptionally())
                .collect(toMap(Map.Entry::getKey, e -> e.getValue().join())));

请注意,对 join() 的调用保证是 non-blocking,因为 thenApply() 只会在所有期货完成后执行。