使用 JDK8 Streams 和 CompletableFuture 调用 WebService 和 REST API
Call a WebService and a REST API using JDK8 Streams and CompletableFuture
我需要进行 SOAP 调用,然后在 REST 调用中处理来自 SOAP 调用的结果。每组调用都基于一批记录。我完全迷失了尝试使用尽可能异步的 JDK8 流将其设置为 运行。我怎样才能做到这一点?
SOAP 调用:
CompletableFuture<Stream<Product>> getProducts(final Set<String> criteria)
{
return supplyAsync(() -> {
...
return service.findProducts(request);
}, EXECUTOR_THREAD_POOL);
}
REST 调用:
final CompletableFuture<Stream<Result>> validateProducts(final Stream<Product> products)
{
return supplyAsync(() -> service
.submitProducts(products, false)
.stream(), EXECUTOR_THREAD_POOL);
}
我正在尝试调用 SOAP 调用,将结果传递给 REST 调用,并使用 JDK8 流收集结果。每个 SOAP->REST 调用都是一个 "set" 的记录(或批次),类似于分页。 (这现在完全不起作用,只是一个例子)。
@Test
public void should_execute_validations()
{
final Set<String> samples = generateSamples();
//Prepare paging...
final int total = samples.size();
final int pages = getPages(total);
log.debug("Items: {} / Pages: {}", total, pages);
final Stopwatch stopwatch = createStarted();
final Set<Result> results = range(0, pages)
.mapToObj(index -> {
final Set<String> subset = subset(index, samples);
return getProducts(subset)
.thenApply(this::validateProducts);
})
.flatMap(CompletableFuture::join)
.collect(toSet());
log.debug("Executed {} calls in {}", pages, stopwatch.stop());
assertThat(results, notNullValue());
}
我认为您的示例中有两个用法不正确:thenApply
和 join
。
要链接第一个调用 (SOAP) 和第二个调用 (REST),您需要使用 thenCompose
而不是 thenApply
。这是因为方法 "validateProducts" returns 可完成的未来,使用 "thenApply" 将在您的流映射中创建 CompletableFuture<CompletableFuture<Stream<Result>>>
。但是你需要的可能是CompletableFuture<Stream<Result>>
。使用 thenCompose
可以解决这个问题,因为它类似于 "Optional.flatMap" 或 "Stream.flatMap":
.mapToObj(index -> {
final Set<String> subset = subset(index, samples);
return getProducts(subset)
.thenCompose(this::validateProducts);
})
第二个错误的用法是join。使用 join
会阻塞当前线程等待 CompletableFuture 的结果。在您的情况下,有 N 个可完成的期货,其中 N 是页数。与其一个一个地等待它们,更好的解决方案是等待所有它们使用 CompletableFuture.allOf(...)
。此方法 return 是一个新的 CompletableFuture,它在所有给定的 CompletableFuture 完成时完成。所以我建议你修改你的流使用和 return 一个期货列表。然后,等待完成。最后,检索结果:
List<CompletableFuture<Stream<Result>>> futures = range(0, pages)
.mapToObj(index -> {
final Set<String> subset = subset(index, samples);
return getProducts(subset).thenCompose(this::validateProducts);
})
.collect(Collectors.toList());
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
for (CompletableFuture<Stream<Result>> cf : futures) {
// TODO Handle the results and exceptions here
}
你可以在GitHub上看到完整的程序。
我需要进行 SOAP 调用,然后在 REST 调用中处理来自 SOAP 调用的结果。每组调用都基于一批记录。我完全迷失了尝试使用尽可能异步的 JDK8 流将其设置为 运行。我怎样才能做到这一点?
SOAP 调用:
CompletableFuture<Stream<Product>> getProducts(final Set<String> criteria)
{
return supplyAsync(() -> {
...
return service.findProducts(request);
}, EXECUTOR_THREAD_POOL);
}
REST 调用:
final CompletableFuture<Stream<Result>> validateProducts(final Stream<Product> products)
{
return supplyAsync(() -> service
.submitProducts(products, false)
.stream(), EXECUTOR_THREAD_POOL);
}
我正在尝试调用 SOAP 调用,将结果传递给 REST 调用,并使用 JDK8 流收集结果。每个 SOAP->REST 调用都是一个 "set" 的记录(或批次),类似于分页。 (这现在完全不起作用,只是一个例子)。
@Test
public void should_execute_validations()
{
final Set<String> samples = generateSamples();
//Prepare paging...
final int total = samples.size();
final int pages = getPages(total);
log.debug("Items: {} / Pages: {}", total, pages);
final Stopwatch stopwatch = createStarted();
final Set<Result> results = range(0, pages)
.mapToObj(index -> {
final Set<String> subset = subset(index, samples);
return getProducts(subset)
.thenApply(this::validateProducts);
})
.flatMap(CompletableFuture::join)
.collect(toSet());
log.debug("Executed {} calls in {}", pages, stopwatch.stop());
assertThat(results, notNullValue());
}
我认为您的示例中有两个用法不正确:thenApply
和 join
。
要链接第一个调用 (SOAP) 和第二个调用 (REST),您需要使用 thenCompose
而不是 thenApply
。这是因为方法 "validateProducts" returns 可完成的未来,使用 "thenApply" 将在您的流映射中创建 CompletableFuture<CompletableFuture<Stream<Result>>>
。但是你需要的可能是CompletableFuture<Stream<Result>>
。使用 thenCompose
可以解决这个问题,因为它类似于 "Optional.flatMap" 或 "Stream.flatMap":
.mapToObj(index -> {
final Set<String> subset = subset(index, samples);
return getProducts(subset)
.thenCompose(this::validateProducts);
})
第二个错误的用法是join。使用 join
会阻塞当前线程等待 CompletableFuture 的结果。在您的情况下,有 N 个可完成的期货,其中 N 是页数。与其一个一个地等待它们,更好的解决方案是等待所有它们使用 CompletableFuture.allOf(...)
。此方法 return 是一个新的 CompletableFuture,它在所有给定的 CompletableFuture 完成时完成。所以我建议你修改你的流使用和 return 一个期货列表。然后,等待完成。最后,检索结果:
List<CompletableFuture<Stream<Result>>> futures = range(0, pages)
.mapToObj(index -> {
final Set<String> subset = subset(index, samples);
return getProducts(subset).thenCompose(this::validateProducts);
})
.collect(Collectors.toList());
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
for (CompletableFuture<Stream<Result>> cf : futures) {
// TODO Handle the results and exceptions here
}
你可以在GitHub上看到完整的程序。