在具有两个 Future 的循环中使用 CompletableFuture 以在每个循环迭代中合并

Using CompletableFuture in a Loop with Two Futures to Merge per Loop Iteration

我有如下代码:

testMethod(List<String> ids)  {
    List<CompletableFuture<ResultThree>> resultThreeList = new ArrayList<>();
    
    for(String id : ids) {
        CompletableFuture<ResultOne> resultOne = AynchOne(id);
        CompletableFuture<ResultTwo> resultTwo = AynchTwo(id);
        
    CompletableFuture<ResultThree> resultThree =  resultOne.thenCombine(resultTwo, (ResultOne a, ResultTwo b) -> computeCombinedResultThree(a, b)); 
    
    resultThreeList.add(resultThree);
    }
    // PROCESS RESULTS HERE
}

class ResultOne {
    boolean goodResult;
    String id;

    ResultOne(String promId) {
        this.goodResult = true;
        this.id = promId;
    }
}

class ResultTwo {
    boolean goodResult;
    String id;

    ResultTwo(String promId) {
        this.goodResult = true;
        this.id = promId;
    }

class ResultThree() {
        boolean goodResult;
        String = id;
    }

private ResultThree computeCombinedResultThree(ResultOne r1,  ResultTwo r2) { 
   ResultThree resultThree = new ResultThree();
    resultThree.id = r1.id;
    resultThree.goodResult = r1.goodResult && r2.goodResult;

    return resultThree;
}

,我需要能够将结果 resultOne 和 resultTwo 进行 AND 运算,这样对于每次迭代,在整个同步执行完成后,我有一个(我猜)数组或映射可以随后处理,其中数组中的一个对象具有相应的 id 和该 id 的 true 或 false(表示来自单独对象的两个布尔值的 AND-ing。

根据读者的反馈,我已经将代码完成到可以合并两个原始 future 的地步,并将每次迭代的所有结果组合起来以获得整个 futures 循环。此时我只需要处理结果即可。

我想也许我需要另一个 CompletableFuture?这个可能是这样的(放在上面我有“// PROCESS RESULTS HERE”的地方):

CompletableFuture<Void> future = resultThreeList
  .thenRun(() -> forwardSuccesses(resultThreeList));

future.get();

forwardSuccesses() 将遍历 resultThreeList 将成功的 ID 转发给另一个进程,但不会起诉这是如何做到的。 感谢任何想法。谢谢

在 for 循环中,您会立即获得 return 中的 CompletableFutures。在后台发生了一些神奇的事情,你想等到两者都完成。

因此,在两个 CompletableFutures 都被 returned 之后,通过使用 long 值和时间单位调用 CompletableFuture.get 来引起阻塞等待。如果你只调用不带任何参数的 get,你将永远等待。

明智地选择超时和JDK。可能会发生,JDK 8 不提供超时获取。 JDK 8 也不再受支持。 JDK 11 现在是长期支持,最近的编译器不再提供 JDK 8 作为目标。

我强烈建议您阅读有关 CompletableFuture 的详细信息以及它与 Future 的不同之处,尤其是。关于取消之类的线程控制。不知道 CompletableFuture 的底层提供者,我还假设查询一个 ID 是一种资源浪费,而且吞吐量非常有限。但这是一个单独的问题。

这就是你到目前为止的进展:

List<CompletableFuture<ResultThree>> resultThreeList = new ArrayList<>(ids.size());
for (String id : ids) {
    CompletableFuture<ResultOne> resultOne = aynchOne(id);
    CompletableFuture<ResultTwo> resultTwo = aynchTwo(id);

    CompletableFuture<ResultThree> resultThree = resultOne.thenCombine(resultTwo, this::computeCombinedResultThree);
    resultThreeList.add(resultThree);
}

现在您需要做的就是所有结果计算完成后即可完成。

CompletableFuture<List<ResultThree>> combinedCompletables =
        CompletableFuture.allOf(resultThreeList.toArray(new CompletableFuture<?>[0]))
                .thenApply(v -> resultThreeList.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList())
                );

或类似

CompletableFuture<List<ResultThree>> combinedCompletables =
        CompletableFuture.supplyAsync(() -> resultThreeList.stream().map(this::safeGet).collect(Collectors.toList()));

其中 safeGet 是一种只调用 future.get() 并捕获可能发生的异常的方法 - 由于这些异常,您不能只在 lambda 中调用 get()

现在您可以使用 thenAccept() 处理此列表:

try {
    combinedCompletables.thenAccept(this::forwardSuccesses).get(30, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
    e.printStackTrace();
}

同样,被捕获的异常是由于调用 get()

旁注,我真的不明白为什么会有三个结果 类 因为你所需要的 - 至少对于这部分代码 - 是 id 和结果状态。我会为此引入一个接口 (Result?),并且只针对它工作。

在我看来你不需要 3 个不同的 ResultOne ResultTwo ResultThree classes 因为它们定义了相同的类型,所以我将替换它们Result.

假设您只想转发成功,我在 Result class 中添加了一个简短的 isGoodResult() 方法,用作流的谓词:

class Result {
    public boolean goodResult;
    public String id;
// ...
    public boolean isGoodResult() {
        return this.goodResult;
    }
}

我还建议摆脱循环,将其替换为流以使您的代码更流畅。

应该 forwardSuccessstrict,接受 List<Result>,这就是我实现 testMethod:

的方式
void testMethod(List<String> ids)  {
    final List<Result> results = ids.stream()
        .parallel()
        .map(id -> asynchOne(id).thenCombine(
            asynchTwo(id), 
            (r1, r2) -> computeCombinedResult(r1, r2)))
        .map(CompletableFuture::join)
        .filter(Result::isGoodResult)
        .collect(Collectors.toList());

    // PROCESS RESULTS HERE
    forwardSuccesses(results);
}

应该forwardSuccess懒惰,接受CompletableFuture<List<Result>>

void testMethod(List<String> ids)  {
    final List<CompletableFuture<Result>> futures = ids.stream()
        .parallel()
        .map(id -> asynchOne(id).thenCombine(
            asynchTwo(id), 
            (r1, r2) -> computeCombinedResult(r1, r2)))
        .collect(Collectors.toList());

    final CompletableFuture<List<Result>> asyncResults =
    CompletableFuture.allOf(futures.stream().toArray(CompletableFuture[]::new))
            .thenApply(__ -> futures 
                    .stream()
                    .map(CompletableFuture::join)
                    .filter(Result::isGoodResult)
                    .collect(Collectors.toList()));

    // PROCESS RESULTS HERE
    forwardSuccessesAsync(asyncResults);
}