使用 JDK8 Streams 和 CompletableFuture 调用 WebService 和 REST API

Call a WebService and a REST API using JDK8 Streams and CompletableFuture

我需要进行 SOAP 调用,然后在 REST 调用中处理来自 SOAP 调用的结果。每组调用都基于一批记录。我完全迷失了尝试使用尽可能异步的 JDK8 流将其设置为 运行。我怎样才能做到这一点?

SOAP 调用:

CompletableFuture<Stream<Product>> getProducts(final Set<String> criteria)
{
    return supplyAsync(() -> {
        ...
        return service.findProducts(request);
    }, EXECUTOR_THREAD_POOL);
}

REST 调用:

final CompletableFuture<Stream<Result>> validateProducts(final Stream<Product> products)
{
    return supplyAsync(() -> service
        .submitProducts(products, false)
        .stream(), EXECUTOR_THREAD_POOL);
}

我正在尝试调用 SOAP 调用,将结果传递给 REST 调用,并使用 JDK8 流收集结果。每个 SOAP->REST 调用都是一个 "set" 的记录(或批次),类似于分页。 (这现在完全不起作用,只是一个例子)。

@Test
public void should_execute_validations()
{
    final Set<String> samples = generateSamples();

    //Prepare paging...
    final int total = samples.size();
    final int pages = getPages(total);
    log.debug("Items: {} / Pages: {}", total, pages);

    final Stopwatch stopwatch = createStarted();
    final Set<Result> results = range(0, pages)
        .mapToObj(index -> {
            final Set<String> subset = subset(index, samples);
            return getProducts(subset)
                .thenApply(this::validateProducts);
        })
        .flatMap(CompletableFuture::join)
        .collect(toSet());
    log.debug("Executed {} calls in {}", pages, stopwatch.stop());
    assertThat(results, notNullValue());
}

我认为您的示例中有两个用法不正确:thenApplyjoin

要链接第一个调用 (SOAP) 和第二个调用 (REST),您需要使用 thenCompose 而不是 thenApply。这是因为方法 "validateProducts" returns 可完成的未来,使用 "thenApply" 将在您的流映射中创建 CompletableFuture<CompletableFuture<Stream<Result>>>。但是你需要的可能是CompletableFuture<Stream<Result>>。使用 thenCompose 可以解决这个问题,因为它类似于 "Optional.flatMap" 或 "Stream.flatMap":

.mapToObj(index -> {
    final Set<String> subset = subset(index, samples);
    return getProducts(subset)
        .thenCompose(this::validateProducts);
})

第二个错误的用法是join。使用 join 会阻塞当前线程等待 CompletableFuture 的结果。在您的情况下,有 N 个可完成的期货,其中 N 是页数。与其一个一个地等待它们,更好的解决方案是等待所有它们使用 CompletableFuture.allOf(...)。此方法 return 是一个新的 CompletableFuture,它在所有给定的 CompletableFuture 完成时完成。所以我建议你修改你的流使用和 return 一个期货列表。然后,等待完成。最后,检索结果:

List<CompletableFuture<Stream<Result>>> futures = range(0, pages)
    .mapToObj(index -> {
        final Set<String> subset = subset(index, samples);
        return getProducts(subset).thenCompose(this::validateProducts);
    })
    .collect(Collectors.toList());

CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();

for (CompletableFuture<Stream<Result>> cf : futures) {
  // TODO Handle the results and exceptions here
}

你可以在GitHub上看到完整的程序。