组合多个 CompletableFutures
Combining multiple CompletableFutures
我有以下组件:
/**
 * Builds a {@link JobInfo} from the job overview plus, for every job id found in
 * that overview, the job's details, exceptions and configuration.
 *
 * <p>Every rest-client call hands back its result directly, so the per-job
 * lookups run one after another in the order details, exceptions, config.
 *
 * @return the overview together with one {@link JobDetails} entry per job id
 */
private JobInfo aggregateJobInfo() {
    final JobsResult overview = restClient().getJobs();
    // Constructor arguments are evaluated left to right, which keeps the three
    // lookups for a job in the same order as separate statements would.
    final List<JobDetails> perJobDetails = extractJobIds(overview).stream()
        .map(id -> new JobDetails(
            restClient().getJobDetails(id),
            restClient().getJobExceptions(id),
            restClient().getJobConfig(id)))
        .collect(Collectors.toList());
    return new JobInfo(overview, perJobDetails);
}
/**
 * Flattens the ids held by a {@link JobsResult} into a single list, ordered as
 * running, finished, canceled, failed.
 *
 * @param jobsResult the overview whose id collections are concatenated
 * @return a new mutable list containing all ids in the order given above
 */
private static List<String> extractJobIds(final JobsResult jobsResult) {
    // Seed the list with the running ids, then append the remaining groups.
    final List<String> allIds = new ArrayList<>(jobsResult.getRunning());
    allIds.addAll(jobsResult.getFinished());
    allIds.addAll(jobsResult.getCanceled());
    allIds.addAll(jobsResult.getFailed());
    return allIds;
}
它只是调用几个端点并把返回的数据聚合起来。现在我想用 CompletableFuture 把它改成非阻塞的,但我以前没怎么用过 CompletableFuture。
/**
 * Work-in-progress non-blocking variant of aggregateJobInfo().
 *
 * <p>It builds a future holding one JobDetails future per job id, but does not
 * yet combine those with the job overview into a JobInfo: the method still
 * returns {@code null}, and its closing brace is missing from the snippet.
 */
private CompletableFuture<JobInfo> aggregateJobInfo() {
// Job overview; every later stage in this method is derived from this future.
final CompletableFuture<JobsResult> jobsResultFuture = restClient().getJobs();
// Job ids become available once the overview future has completed.
final CompletableFuture<List<String>> jobIdsFuture = jobsResultFuture.thenApply(JobInfoCollector::extractJobIds);
//fetch details, exceptions and config for each job
final CompletableFuture<List<CompletableFuture<JobDetails>>> jobDetailsFuture = jobIdsFuture.thenApply(jobIds -> {
return jobIds.stream().map(jobId -> {
// All three futures for a job are obtained up front, before any of them is chained.
final CompletableFuture<JobDetailResult> jobDetailsResultFuture = restClient().getJobDetails(jobId);
final CompletableFuture<JobExceptionsResult> jobExceptionsFuture = restClient().getJobExceptions(jobId);
final CompletableFuture<JobConfigResult> jobConfigFuture = restClient().getJobConfig(jobId);
// Chain on the detail result, then pair exceptions and config to build one JobDetails.
return jobDetailsResultFuture.thenCompose(jobDetailResult -> {
return jobExceptionsFuture.thenCombine(jobConfigFuture, (jobExceptionsResult, jobConfigResult) -> {
return new JobDetails(jobDetailResult, jobExceptionsResult, jobConfigResult);
});
});
}).collect(Collectors.toList());
});
// TODO: jobDetailsFuture is never used; a CompletableFuture<JobInfo> still has to be built and returned here.
return null;
我的问题是:JobInfo 需要通过 `new JobInfo(jobsResult, jobDetails)` 来构造,那么这里该如何得到一个 CompletableFuture<JobInfo>?
正如我所说,我是新手,也许我的思路本身就不好,还有更好的解决方案?
欢迎任何想法,谢谢!
第一个工作版本:
/**
 * Asynchronously aggregates the job overview and, for every job id in it, the
 * job's details, exceptions and configuration into a single {@link JobInfo}.
 *
 * <p>The result is assembled purely by chaining stages; this method itself never
 * waits for a future. (The earlier version wrapped the final step in a nested
 * future and unwrapped it with {@code join()}, which blocked the calling thread
 * until all per-job futures had completed.)
 *
 * @return a future completing with the aggregated {@link JobInfo}; it completes
 *         exceptionally if any future it depends on completes exceptionally
 */
private CompletableFuture<JobInfo> aggregateJobInfo() {
    final CompletableFuture<JobsResult> jobsResultFuture = restClient().getJobs();
    final CompletableFuture<List<String>> jobIdsFuture =
        jobsResultFuture.thenApply(JobInfoCollector::extractJobIds);

    // Once the ids are known, obtain the three futures per job and merge them
    // into one JobDetails future per job.
    final CompletableFuture<List<CompletableFuture<JobDetails>>> jobDetailsFutureListFuture =
        jobIdsFuture.thenApply(jobIds -> jobIds.stream().map(jobId -> {
            final CompletableFuture<JobDetailResult> jobDetailsResultFuture = restClient().getJobDetails(jobId);
            final CompletableFuture<JobExceptionsResult> jobExceptionsFuture = restClient().getJobExceptions(jobId);
            final CompletableFuture<JobConfigResult> jobConfigFuture = restClient().getJobConfig(jobId);
            return jobDetailsResultFuture.thenCompose(jobDetailResult ->
                jobExceptionsFuture.thenCombine(jobConfigFuture, (jobExceptionsResult, jobConfigResult) ->
                    new JobDetails(jobDetailResult, jobExceptionsResult, jobConfigResult)));
        }).collect(Collectors.toList()));

    return jobDetailsFutureListFuture
        .thenCompose(jobDetailsFutures ->
            CompletableFuture.allOf(jobDetailsFutures.toArray(new CompletableFuture<?>[0]))
                // allOf has completed here, so join() returns immediately for every element.
                .thenApply(ignored -> jobDetailsFutures.stream()
                    .map(CompletableFuture::join)
                    .collect(Collectors.toList())))
        // Combine with the overview without blocking and without nesting futures.
        .thenCombine(jobsResultFuture, (jobDetails, jobsResult) -> new JobInfo(jobsResult, jobDetails));
}
您有:
CompletableFuture<JobsResult> jobsResultFuture
CompletableFuture<List<CompletableFuture<JobDetails>>> jobDetailsFuture
JobInfo(JobsResult a, List<JobDetails> b)
你想要
CompletableFuture<JobInfo>
补充观察:jobDetailsFuture 只有在 jobsResultFuture 完成之后才可能完成。
因此您可以执行以下操作:
1. List<CompletableFuture<JobDetails>> -> Void:在 thenCompose 中通过 allOf 实现
2. Void + List<CompletableFuture<JobDetails>>(作为捕获的变量)-> List<JobDetails>:通过 thenApply 实现
3. List<JobDetails> + CompletableFuture<JobsResult>(作为捕获的变量)-> JobInfo:通过 thenApply 实现
在这些映射函数中,您可以直接用 get()(或不抛受检异常的 join())取出 future 的结果:
由于依赖关系,当映射函数被调用时,它所依赖的上游 future 必然已经完成,因此调用不会阻塞。
也可以采用 thenCombine 配合 Stream.reduce 等其他方式,
但那样代码更冗长,还会创建更多中间 future。
我有以下组件:
/**
 * Builds a {@link JobInfo} from the job overview plus, for every job id found in
 * that overview, the job's details, exceptions and configuration.
 *
 * <p>Every rest-client call hands back its result directly, so the per-job
 * lookups run one after another in the order details, exceptions, config.
 *
 * @return the overview together with one {@link JobDetails} entry per job id
 */
private JobInfo aggregateJobInfo() {
    final JobsResult overview = restClient().getJobs();
    // Constructor arguments are evaluated left to right, which keeps the three
    // lookups for a job in the same order as separate statements would.
    final List<JobDetails> perJobDetails = extractJobIds(overview).stream()
        .map(id -> new JobDetails(
            restClient().getJobDetails(id),
            restClient().getJobExceptions(id),
            restClient().getJobConfig(id)))
        .collect(Collectors.toList());
    return new JobInfo(overview, perJobDetails);
}
/**
 * Flattens the ids held by a {@link JobsResult} into a single list, ordered as
 * running, finished, canceled, failed.
 *
 * @param jobsResult the overview whose id collections are concatenated
 * @return a new mutable list containing all ids in the order given above
 */
private static List<String> extractJobIds(final JobsResult jobsResult) {
    // Seed the list with the running ids, then append the remaining groups.
    final List<String> allIds = new ArrayList<>(jobsResult.getRunning());
    allIds.addAll(jobsResult.getFinished());
    allIds.addAll(jobsResult.getCanceled());
    allIds.addAll(jobsResult.getFailed());
    return allIds;
}
它只是调用几个端点并把返回的数据聚合起来。现在我想用 CompletableFuture 把它改成非阻塞的,但我以前没怎么用过 CompletableFuture。
/**
 * Work-in-progress non-blocking variant of aggregateJobInfo().
 *
 * <p>It builds a future holding one JobDetails future per job id, but does not
 * yet combine those with the job overview into a JobInfo: the method still
 * returns {@code null}, and its closing brace is missing from the snippet.
 */
private CompletableFuture<JobInfo> aggregateJobInfo() {
// Job overview; every later stage in this method is derived from this future.
final CompletableFuture<JobsResult> jobsResultFuture = restClient().getJobs();
// Job ids become available once the overview future has completed.
final CompletableFuture<List<String>> jobIdsFuture = jobsResultFuture.thenApply(JobInfoCollector::extractJobIds);
//fetch details, exceptions and config for each job
final CompletableFuture<List<CompletableFuture<JobDetails>>> jobDetailsFuture = jobIdsFuture.thenApply(jobIds -> {
return jobIds.stream().map(jobId -> {
// All three futures for a job are obtained up front, before any of them is chained.
final CompletableFuture<JobDetailResult> jobDetailsResultFuture = restClient().getJobDetails(jobId);
final CompletableFuture<JobExceptionsResult> jobExceptionsFuture = restClient().getJobExceptions(jobId);
final CompletableFuture<JobConfigResult> jobConfigFuture = restClient().getJobConfig(jobId);
// Chain on the detail result, then pair exceptions and config to build one JobDetails.
return jobDetailsResultFuture.thenCompose(jobDetailResult -> {
return jobExceptionsFuture.thenCombine(jobConfigFuture, (jobExceptionsResult, jobConfigResult) -> {
return new JobDetails(jobDetailResult, jobExceptionsResult, jobConfigResult);
});
});
}).collect(Collectors.toList());
});
// TODO: jobDetailsFuture is never used; a CompletableFuture<JobInfo> still has to be built and returned here.
return null;
我的问题是:JobInfo 需要通过 `new JobInfo(jobsResult, jobDetails)` 来构造,那么这里该如何得到一个 CompletableFuture<JobInfo>?
正如我所说,我是新手,也许我的思路本身就不好,还有更好的解决方案?
欢迎任何想法,谢谢!
第一个工作版本:
/**
 * Asynchronously aggregates the job overview and, for every job id in it, the
 * job's details, exceptions and configuration into a single {@link JobInfo}.
 *
 * <p>The result is assembled purely by chaining stages; this method itself never
 * waits for a future. (The earlier version wrapped the final step in a nested
 * future and unwrapped it with {@code join()}, which blocked the calling thread
 * until all per-job futures had completed.)
 *
 * @return a future completing with the aggregated {@link JobInfo}; it completes
 *         exceptionally if any future it depends on completes exceptionally
 */
private CompletableFuture<JobInfo> aggregateJobInfo() {
    final CompletableFuture<JobsResult> jobsResultFuture = restClient().getJobs();
    final CompletableFuture<List<String>> jobIdsFuture =
        jobsResultFuture.thenApply(JobInfoCollector::extractJobIds);

    // Once the ids are known, obtain the three futures per job and merge them
    // into one JobDetails future per job.
    final CompletableFuture<List<CompletableFuture<JobDetails>>> jobDetailsFutureListFuture =
        jobIdsFuture.thenApply(jobIds -> jobIds.stream().map(jobId -> {
            final CompletableFuture<JobDetailResult> jobDetailsResultFuture = restClient().getJobDetails(jobId);
            final CompletableFuture<JobExceptionsResult> jobExceptionsFuture = restClient().getJobExceptions(jobId);
            final CompletableFuture<JobConfigResult> jobConfigFuture = restClient().getJobConfig(jobId);
            return jobDetailsResultFuture.thenCompose(jobDetailResult ->
                jobExceptionsFuture.thenCombine(jobConfigFuture, (jobExceptionsResult, jobConfigResult) ->
                    new JobDetails(jobDetailResult, jobExceptionsResult, jobConfigResult)));
        }).collect(Collectors.toList()));

    return jobDetailsFutureListFuture
        .thenCompose(jobDetailsFutures ->
            CompletableFuture.allOf(jobDetailsFutures.toArray(new CompletableFuture<?>[0]))
                // allOf has completed here, so join() returns immediately for every element.
                .thenApply(ignored -> jobDetailsFutures.stream()
                    .map(CompletableFuture::join)
                    .collect(Collectors.toList())))
        // Combine with the overview without blocking and without nesting futures.
        .thenCombine(jobsResultFuture, (jobDetails, jobsResult) -> new JobInfo(jobsResult, jobDetails));
}
您有:
CompletableFuture<JobsResult> jobsResultFuture
CompletableFuture<List<CompletableFuture<JobDetails>>> jobDetailsFuture
JobInfo(JobsResult a, List<JobDetails> b)
你想要
CompletableFuture<JobInfo>
补充观察:jobDetailsFuture 只有在 jobsResultFuture 完成之后才可能完成。
因此您可以执行以下操作:
1. List<CompletableFuture<JobDetails>> -> Void:在 thenCompose 中通过 allOf 实现
2. Void + List<CompletableFuture<JobDetails>>(作为捕获的变量)-> List<JobDetails>:通过 thenApply 实现
3. List<JobDetails> + CompletableFuture<JobsResult>(作为捕获的变量)-> JobInfo:通过 thenApply 实现
在这些映射函数中,您可以直接用 get()(或不抛受检异常的 join())取出 future 的结果:
由于依赖关系,当映射函数被调用时,它所依赖的上游 future 必然已经完成,因此调用不会阻塞。
也可以采用 thenCombine 配合 Stream.reduce 等其他方式,
但那样代码更冗长,还会创建更多中间 future。