在 java 中,我如何处理 CompletableFutures 并获得第一个完成的理想结果?

In java, how do I process CompletableFutures and get the first desireable result that completes?

通常,对于 CompletableFuture,我会在结果可用时调用 thenApply 或其他一些方法来做某事。但是,我现在有一种情况,我想处理结果,直到收到肯定结果,然后忽略所有进一步的结果。

如果我只想获取第一个可用结果,我可以使用 CompletableFuture.anyOf(尽管我讨厌为了调用 anyOf 而必须将列表转换为数组)。但这不是我想要的。我想获取第一个结果,如果它没有理想的结果,那么我想处理第二个可用的结果,依此类推,直到我得到理想的结果。

这是一个遍历所有结果的简单示例,returns 它找到的第一个大于 9 的值。(请注意,这不是我的真正任务。这只是一个简单的示例。)

public Integer findFirstGt9(List<CompletableFuture<Integer>> results) {
    for(CompletableFuture<Integer> result : results) {
        Integer v = result.get();
        if(v > 9)
            return v;
    }
    return null;
}

当然,该示例从头开始检查结果,而不是在结果完成时查看结果。所以这里有一个可以完成我想要的,但是代码要复杂得多。

public Integer findFirstGt9(List<CompletableFuture<Integer>> results) {
    AtomicInteger finalResult = new AtomicInteger();
    CountDownLatch latch = new CountDownLatch(results.size());
    for(CompletableFuture<Integer> result : results) {
        result.whenComplete((v,e) -> {
            if(e!=null) {
                Logger.getLogger(getClass()).error("",e);
            } else if(v > 9) {
                finalResult.set(v);
                while(latch.getCount() > 0)
                    latch.countDown();
                return;
            }
            latch.countDown();
        });
    }
    latch.await();

    if(finalResult.get() > 9)
        return finalResult.get();
    return null;
}    

有没有 api 我可以这样做的地方?

public Integer findFirstGt9(List<CompletableFuture<Integer>> results) {
    Iterator<Integer> resultIt = getResultsAsAvailable(results);
    for(; resultIt.hasNext();) {
        Integer v = resultIt.next();
        if(v > 9)
            return v;
    }
    return null;
}

甚至更好:

public Integer findFirstGt9(List<CompletableFuture<Integer>> results) {
    return getFirstMatch(results, r -> {return r > 9;});
}

我不知道 JDK 或其他地方有这样的 API。您可以自己滚动。

如果 future 已经完成,您可以利用 CompletableFuture#complete(和 completeExceptionally)什么都不做的事实。

If not already completed, sets the value returned by get() and related methods to the given value.

创建一个新的最终结果 CompletableFuture。如果您的条件适用,请为您的每个尝试 complete 这个 最终结果 的期货添加一个延续。那个未来将随着第一次成功而结束。但是,如果 none 成功,您显然需要 null 作为结果。您可以使用 allOf 创建一个 CompletableFuture 来尝试 complete 使用 null 的最终结果。

类似

public static <T> CompletableFuture<T> firstOrNull(List<CompletableFuture<T>> futures, Predicate<T> condition) {
    CompletableFuture<T> finalResult = new CompletableFuture<>();
    // attempt to complete on success
    futures.stream().forEach(future -> future.thenAccept(successResult -> {
        if (condition.test(successResult))
            finalResult.complete(successResult);
    }));
    CompletableFuture<?> all = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
    all.thenRun(() -> {
        finalResult.complete(null);
    });
    return finalResult;
}

您支付空操作调用的开销。

您可以根据需要将 null 更改为某个默认值或以不同方式处理异常(completeExceptionally 一旦发生错误)。您必须使用 whenCompletehandle 而不是上面的 thenAccept 才能访问 Exception.

您可以使用以下解决方案:

public static <T> CompletableFuture<T> anyMatch(
    List<? extends CompletionStage<? extends T>> l, Predicate<? super T> criteria) {

    CompletableFuture<T> result=new CompletableFuture<>();
    Consumer<T> whenMatching=v -> { if(criteria.test(v)) result.complete(v); };
    CompletableFuture.allOf(l.stream()
        .map(f -> f.thenAccept(whenMatching)).toArray(CompletableFuture<?>[]::new))
    .whenComplete((ignored, t) ->
        result.completeExceptionally(t!=null? t: new NoSuchElementException()));
    return result;
}

基本原理与相同,但有一些不同:

  • 通用签名更灵活。
  • CompletableFuture.allOf 所必需的数组的创建与对源期货的后续操作的注册相结合。作为副作用,allOf 操作的处理程序依赖于完成结果的所有尝试的完成,而不是仅依赖于原始期货。这使得实际所需的依赖关系明确。这样,当我们将所有 thenAccept 替换为 thenAcceptAsync 时,它甚至会起作用。
  • 此解决方案以 NoSuchElementException 结束,而不是在没有结果符合条件的情况下返回 null。如果至少有一个 future 异常完成并且没有成功完成匹配结果,则中继发生的异常之一。

你可以试试

List<CompletableFuture<Integer>> list=Arrays.asList(
    CompletableFuture.supplyAsync(()->5),
    CompletableFuture.supplyAsync(()->{throw new RuntimeException(); }),
    CompletableFuture.supplyAsync(()->42),
    CompletableFuture.completedFuture(0)
);
anyMatch(list, i -> i>9)
    .thenAccept(i->System.out.println("got "+i))
    // optionally chain with:
    .whenComplete((x,t)->{ if(t!=null) t.printStackTrace(); });