在具有两个 Future 的循环中使用 CompletableFuture 以在每个循环迭代中合并
Using CompletableFuture in a Loop with Two Futures to Merge per Loop Iteration
我有如下代码:
testMethod(List<String> ids) {
List<CompletableFuture<ResultThree>> resultThreeList = new ArrayList<>();
for(String id : ids) {
CompletableFuture<ResultOne> resultOne = AynchOne(id);
CompletableFuture<ResultTwo> resultTwo = AynchTwo(id);
CompletableFuture<ResultThree> resultThree = resultOne.thenCombine(resultTwo, (ResultOne a, ResultTwo b) -> computeCombinedResultThree(a, b));
resultThreeList.add(resultThree);
}
// PROCESS RESULTS HERE
}
class ResultOne {
boolean goodResult;
String id;
ResultOne(String promId) {
this.goodResult = true;
this.id = promId;
}
}
class ResultTwo {
boolean goodResult;
String id;
ResultTwo(String promId) {
this.goodResult = true;
this.id = promId;
}
class ResultThree() {
boolean goodResult;
String = id;
}
private ResultThree computeCombinedResultThree(ResultOne r1, ResultTwo r2) {
ResultThree resultThree = new ResultThree();
resultThree.id = r1.id;
resultThree.goodResult = r1.goodResult && r2.goodResult;
return resultThree;
}
,我需要能够将结果 resultOne 和 resultTwo 进行 AND 运算,这样对于每次迭代,在整个同步执行完成后,我有一个(我猜)数组或映射可以随后处理,其中数组中的一个对象具有相应的 id 和该 id 的 true 或 false(表示来自单独对象的两个布尔值的 AND-ing。
根据读者的反馈,我已经将代码完成到可以合并两个原始 future 的地步,并将每次迭代的所有结果组合起来以获得整个 futures 循环。此时我只需要处理结果即可。
我想也许我需要另一个 CompletableFuture?这个可能是这样的(放在上面我有“// PROCESS RESULTS HERE”的地方):
CompletableFuture<Void> future = resultThreeList
.thenRun(() -> forwardSuccesses(resultThreeList));
future.get();
forwardSuccesses() 将遍历 resultThreeList 将成功的 ID 转发给另一个进程,但不会起诉这是如何做到的。
感谢任何想法。谢谢
在 for 循环中,您会立即获得 return 中的 CompletableFutures。在后台发生了一些神奇的事情,你想等到两者都完成。
因此,在两个 CompletableFutures 都被 returned 之后,通过使用 long 值和时间单位调用 CompletableFuture.get 来引起阻塞等待。如果你只调用不带任何参数的 get,你将永远等待。
明智地选择超时和JDK。可能会发生,JDK 8 不提供超时获取。 JDK 8 也不再受支持。 JDK 11 现在是长期支持,最近的编译器不再提供 JDK 8 作为目标。
我强烈建议您阅读有关 CompletableFuture 的详细信息以及它与 Future 的不同之处,尤其是。关于取消之类的线程控制。不知道 CompletableFuture 的底层提供者,我还假设查询一个 ID 是一种资源浪费,而且吞吐量非常有限。但这是一个单独的问题。
这就是你到目前为止的进展:
List<CompletableFuture<ResultThree>> resultThreeList = new ArrayList<>(ids.size());
for (String id : ids) {
CompletableFuture<ResultOne> resultOne = aynchOne(id);
CompletableFuture<ResultTwo> resultTwo = aynchTwo(id);
CompletableFuture<ResultThree> resultThree = resultOne.thenCombine(resultTwo, this::computeCombinedResultThree);
resultThreeList.add(resultThree);
}
现在您需要做的就是所有结果计算完成后即可完成。
CompletableFuture<List<ResultThree>> combinedCompletables =
CompletableFuture.allOf(resultThreeList.toArray(new CompletableFuture<?>[0]))
.thenApply(v -> resultThreeList.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList())
);
或类似
CompletableFuture<List<ResultThree>> combinedCompletables =
CompletableFuture.supplyAsync(() -> resultThreeList.stream().map(this::safeGet).collect(Collectors.toList()));
其中 safeGet
是一种只调用 future.get()
并捕获可能发生的异常的方法 - 由于这些异常,您不能只在 lambda 中调用 get()
。
现在您可以使用 thenAccept()
处理此列表:
try {
combinedCompletables.thenAccept(this::forwardSuccesses).get(30, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
e.printStackTrace();
}
同样,被捕获的异常是由于调用 get()
。
旁注,我真的不明白为什么会有三个结果 类 因为你所需要的 - 至少对于这部分代码 - 是 id 和结果状态。我会为此引入一个接口 (Result
?),并且只针对它工作。
在我看来你不需要 3 个不同的 ResultOne
ResultTwo
ResultThree
classes 因为它们定义了相同的类型,所以我将替换它们Result
.
假设您只想转发成功,我在 Result
class 中添加了一个简短的 isGoodResult()
方法,用作流的谓词:
class Result {
public boolean goodResult;
public String id;
// ...
public boolean isGoodResult() {
return this.goodResult;
}
}
我还建议摆脱循环,将其替换为流以使您的代码更流畅。
应该 forwardSuccess
是 strict,接受 List<Result>
,这就是我实现 testMethod
:
的方式
void testMethod(List<String> ids) {
final List<Result> results = ids.stream()
.parallel()
.map(id -> asynchOne(id).thenCombine(
asynchTwo(id),
(r1, r2) -> computeCombinedResult(r1, r2)))
.map(CompletableFuture::join)
.filter(Result::isGoodResult)
.collect(Collectors.toList());
// PROCESS RESULTS HERE
forwardSuccesses(results);
}
应该forwardSuccess
懒惰,接受CompletableFuture<List<Result>>
:
void testMethod(List<String> ids) {
final List<CompletableFuture<Result>> futures = ids.stream()
.parallel()
.map(id -> asynchOne(id).thenCombine(
asynchTwo(id),
(r1, r2) -> computeCombinedResult(r1, r2)))
.collect(Collectors.toList());
final CompletableFuture<List<Result>> asyncResults =
CompletableFuture.allOf(futures.stream().toArray(CompletableFuture[]::new))
.thenApply(__ -> futures
.stream()
.map(CompletableFuture::join)
.filter(Result::isGoodResult)
.collect(Collectors.toList()));
// PROCESS RESULTS HERE
forwardSuccessesAsync(asyncResults);
}
我有如下代码:
testMethod(List<String> ids) {
List<CompletableFuture<ResultThree>> resultThreeList = new ArrayList<>();
for(String id : ids) {
CompletableFuture<ResultOne> resultOne = AynchOne(id);
CompletableFuture<ResultTwo> resultTwo = AynchTwo(id);
CompletableFuture<ResultThree> resultThree = resultOne.thenCombine(resultTwo, (ResultOne a, ResultTwo b) -> computeCombinedResultThree(a, b));
resultThreeList.add(resultThree);
}
// PROCESS RESULTS HERE
}
class ResultOne {
boolean goodResult;
String id;
ResultOne(String promId) {
this.goodResult = true;
this.id = promId;
}
}
class ResultTwo {
boolean goodResult;
String id;
ResultTwo(String promId) {
this.goodResult = true;
this.id = promId;
}
class ResultThree() {
boolean goodResult;
String = id;
}
private ResultThree computeCombinedResultThree(ResultOne r1, ResultTwo r2) {
ResultThree resultThree = new ResultThree();
resultThree.id = r1.id;
resultThree.goodResult = r1.goodResult && r2.goodResult;
return resultThree;
}
,我需要能够将结果 resultOne 和 resultTwo 进行 AND 运算,这样对于每次迭代,在整个同步执行完成后,我有一个(我猜)数组或映射可以随后处理,其中数组中的一个对象具有相应的 id 和该 id 的 true 或 false(表示来自单独对象的两个布尔值的 AND-ing。
根据读者的反馈,我已经将代码完成到可以合并两个原始 future 的地步,并将每次迭代的所有结果组合起来以获得整个 futures 循环。此时我只需要处理结果即可。
我想也许我需要另一个 CompletableFuture?这个可能是这样的(放在上面我有“// PROCESS RESULTS HERE”的地方):
CompletableFuture<Void> future = resultThreeList
.thenRun(() -> forwardSuccesses(resultThreeList));
future.get();
forwardSuccesses() 将遍历 resultThreeList 将成功的 ID 转发给另一个进程,但不会起诉这是如何做到的。 感谢任何想法。谢谢
在 for 循环中,您会立即获得 return 中的 CompletableFutures。在后台发生了一些神奇的事情,你想等到两者都完成。
因此,在两个 CompletableFutures 都被 returned 之后,通过使用 long 值和时间单位调用 CompletableFuture.get 来引起阻塞等待。如果你只调用不带任何参数的 get,你将永远等待。
明智地选择超时和JDK。可能会发生,JDK 8 不提供超时获取。 JDK 8 也不再受支持。 JDK 11 现在是长期支持,最近的编译器不再提供 JDK 8 作为目标。
我强烈建议您阅读有关 CompletableFuture 的详细信息以及它与 Future 的不同之处,尤其是。关于取消之类的线程控制。不知道 CompletableFuture 的底层提供者,我还假设查询一个 ID 是一种资源浪费,而且吞吐量非常有限。但这是一个单独的问题。
这就是你到目前为止的进展:
List<CompletableFuture<ResultThree>> resultThreeList = new ArrayList<>(ids.size());
for (String id : ids) {
CompletableFuture<ResultOne> resultOne = aynchOne(id);
CompletableFuture<ResultTwo> resultTwo = aynchTwo(id);
CompletableFuture<ResultThree> resultThree = resultOne.thenCombine(resultTwo, this::computeCombinedResultThree);
resultThreeList.add(resultThree);
}
现在您需要做的就是
CompletableFuture<List<ResultThree>> combinedCompletables =
CompletableFuture.allOf(resultThreeList.toArray(new CompletableFuture<?>[0]))
.thenApply(v -> resultThreeList.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList())
);
或类似
CompletableFuture<List<ResultThree>> combinedCompletables =
CompletableFuture.supplyAsync(() -> resultThreeList.stream().map(this::safeGet).collect(Collectors.toList()));
其中 safeGet
是一种只调用 future.get()
并捕获可能发生的异常的方法 - 由于这些异常,您不能只在 lambda 中调用 get()
。
现在您可以使用 thenAccept()
处理此列表:
try {
combinedCompletables.thenAccept(this::forwardSuccesses).get(30, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
e.printStackTrace();
}
同样,被捕获的异常是由于调用 get()
。
旁注,我真的不明白为什么会有三个结果 类 因为你所需要的 - 至少对于这部分代码 - 是 id 和结果状态。我会为此引入一个接口 (Result
?),并且只针对它工作。
在我看来你不需要 3 个不同的 ResultOne
ResultTwo
ResultThree
classes 因为它们定义了相同的类型,所以我将替换它们Result
.
假设您只想转发成功,我在 Result
class 中添加了一个简短的 isGoodResult()
方法,用作流的谓词:
class Result {
public boolean goodResult;
public String id;
// ...
public boolean isGoodResult() {
return this.goodResult;
}
}
我还建议摆脱循环,将其替换为流以使您的代码更流畅。
应该 forwardSuccess
是 strict,接受 List<Result>
,这就是我实现 testMethod
:
void testMethod(List<String> ids) {
final List<Result> results = ids.stream()
.parallel()
.map(id -> asynchOne(id).thenCombine(
asynchTwo(id),
(r1, r2) -> computeCombinedResult(r1, r2)))
.map(CompletableFuture::join)
.filter(Result::isGoodResult)
.collect(Collectors.toList());
// PROCESS RESULTS HERE
forwardSuccesses(results);
}
应该forwardSuccess
懒惰,接受CompletableFuture<List<Result>>
:
void testMethod(List<String> ids) {
final List<CompletableFuture<Result>> futures = ids.stream()
.parallel()
.map(id -> asynchOne(id).thenCombine(
asynchTwo(id),
(r1, r2) -> computeCombinedResult(r1, r2)))
.collect(Collectors.toList());
final CompletableFuture<List<Result>> asyncResults =
CompletableFuture.allOf(futures.stream().toArray(CompletableFuture[]::new))
.thenApply(__ -> futures
.stream()
.map(CompletableFuture::join)
.filter(Result::isGoodResult)
.collect(Collectors.toList()));
// PROCESS RESULTS HERE
forwardSuccessesAsync(asyncResults);
}