使用 WebFlux 对特定映射的并行 GET 请求

Parallel GET Request to specific mapping with WebFlux

我想和WebClient同时调用独立请求。我之前使用 RestTemplate 的方法在等待响应时阻塞了我的线程。所以我发现,WebClientParallelFlux 可以更有效地使用一个线程,因为它应该用一个线程调度多个请求。

我的端点请求 idlocation 的元组。

fooFlux 方法将在不同参数的循环中被调用几千次。返回的地图将根据存储的参考值进行断言。

此代码的先前尝试导致重复 API 调用。 但仍有一个缺陷。 mapping 的键集大小通常小于 Set<String> location 的大小。事实上,生成的地图的大小正在发生变化。此外,它时不时地是正确的。因此,在方法返回地图后,下标完成可能存在问题。

public Map<String, ServiceDescription> fooFlux(String id, Set<String> locations) {
    Map<String, ServiceDescription> mapping = new HashMap<>();
    Flux.fromIterable(locations).parallel().runOn(Schedulers.boundedElastic()).flatMap(location -> {
        Mono<ServiceDescription> sdMono = getServiceDescription(id, location);
        Mono<Mono<ServiceDescription>> sdMonoMono = sdMono.flatMap(item -> {
            mapping.put(location, item);
            return Mono.just(sdMono);
        });
        return sdMonoMono;
    }).then().block();
    LOGGER.debug("Input Location size: {}", locations.size());
    LOGGER.debug("Output Location in map: {}", mapping.keySet().size());
    return mapping;
}

处理获取请求

private Mono<ServiceDescription> getServiceDescription(String id, String location) {
    String uri = URL_BASE.concat(location).concat("/detail?q=").concat(id);
    Mono<ServiceDescription> serviceDescription =
                    webClient.get().uri(uri).retrieve().onStatus(HttpStatus::isError, clientResponse -> {
                        LOGGER.error("Error while calling endpoint {} with status code {}", uri,
                                        clientResponse.statusCode());
                        throw new RuntimeException("Error while calling Endpoint");
                    }).bodyToMono(ServiceDescription.class).retryBackoff(5, Duration.ofSeconds(15));
    return serviceDescription;
}

响应式代码在您订阅生产者时执行。 Block 确实订阅了,因为你调用了 block 两次(一次在 Mono 上,但是 return 再次调用 Mono,然后在 ParallelFlux 上调用 block),Mono 被执行了两次。

    List<String> resultList = listMono.block();
    mapping.put(location, resultList);
    return listMono;

试试下面的方法(未经测试):

    listMono.map(resultList -> {
       mapping.put(location, resultList);
       return Mono.just(listMono);
    });

就是说,响应式编程模型非常复杂,因此考虑使用 @AsyncFuture/AsyncResult,如果这只是关于调用并行,正如其他人所建议的那样。 您仍然可以使用 WebClientRestTemplate 似乎即将被弃用),但只需在 bodyToMono.

之后立即调用 block
public Map<String, ServiceDescription> fooFlux(String id, Set<String> locations) {
    return Flux.fromIterable(locations)
               .flatMap(location -> getServiceDescription(id, location).map(sd -> Tuples.of(location, sd)))
               .collectMap(Tuple2::getT1, Tuple2::getT2)
               .block();
}

注意:flatMap 运算符与 WebClient 调用相结合可实现并发执行,因此无需使用 ParallelFlux 或任何 Scheduler.