使用 WebFlux 对特定映射的并行 GET 请求
Parallel GET Request to specific mapping with WebFlux
我想和WebClient
同时调用独立请求。我之前使用 RestTemplate
的方法在等待响应时阻塞了我的线程。所以我发现,WebClient
和 ParallelFlux
可以更有效地使用一个线程,因为它应该用一个线程调度多个请求。
我的端点请求 id
和 location
的元组。
fooFlux
方法将在不同参数的循环中被调用几千次。返回的地图将根据存储的参考值进行断言。
此代码的先前尝试导致重复 API 调用。
但仍有一个缺陷。 mapping
的键集大小通常小于 Set<String> location
的大小。事实上,生成的地图的大小正在发生变化。此外,它时不时地是正确的。因此,在方法返回地图后,下标完成可能存在问题。
public Map<String, ServiceDescription> fooFlux(String id, Set<String> locations) {
Map<String, ServiceDescription> mapping = new HashMap<>();
Flux.fromIterable(locations).parallel().runOn(Schedulers.boundedElastic()).flatMap(location -> {
Mono<ServiceDescription> sdMono = getServiceDescription(id, location);
Mono<Mono<ServiceDescription>> sdMonoMono = sdMono.flatMap(item -> {
mapping.put(location, item);
return Mono.just(sdMono);
});
return sdMonoMono;
}).then().block();
LOGGER.debug("Input Location size: {}", locations.size());
LOGGER.debug("Output Location in map: {}", mapping.keySet().size());
return mapping;
}
处理获取请求
private Mono<ServiceDescription> getServiceDescription(String id, String location) {
String uri = URL_BASE.concat(location).concat("/detail?q=").concat(id);
Mono<ServiceDescription> serviceDescription =
webClient.get().uri(uri).retrieve().onStatus(HttpStatus::isError, clientResponse -> {
LOGGER.error("Error while calling endpoint {} with status code {}", uri,
clientResponse.statusCode());
throw new RuntimeException("Error while calling Endpoint");
}).bodyToMono(ServiceDescription.class).retryBackoff(5, Duration.ofSeconds(15));
return serviceDescription;
}
响应式代码在您订阅生产者时执行。
Block 确实订阅了,因为你调用了 block 两次(一次在 Mono 上,但是 return 再次调用 Mono,然后在 ParallelFlux 上调用 block),Mono 被执行了两次。
List<String> resultList = listMono.block();
mapping.put(location, resultList);
return listMono;
试试下面的方法(未经测试):
listMono.map(resultList -> {
mapping.put(location, resultList);
return Mono.just(listMono);
});
就是说,响应式编程模型非常复杂,因此考虑使用 @Async
和 Future
/AsyncResult
,如果这只是关于调用并行,正如其他人所建议的那样。
您仍然可以使用 WebClient
(RestTemplate
似乎即将被弃用),但只需在 bodyToMono
.
之后立即调用 block
public Map<String, ServiceDescription> fooFlux(String id, Set<String> locations) {
return Flux.fromIterable(locations)
.flatMap(location -> getServiceDescription(id, location).map(sd -> Tuples.of(location, sd)))
.collectMap(Tuple2::getT1, Tuple2::getT2)
.block();
}
注意:flatMap
运算符与 WebClient
调用相结合可实现并发执行,因此无需使用 ParallelFlux
或任何 Scheduler
.
我想和WebClient
同时调用独立请求。我之前使用 RestTemplate
的方法在等待响应时阻塞了我的线程。所以我发现,WebClient
和 ParallelFlux
可以更有效地使用一个线程,因为它应该用一个线程调度多个请求。
我的端点请求 id
和 location
的元组。
fooFlux
方法将在不同参数的循环中被调用几千次。返回的地图将根据存储的参考值进行断言。
此代码的先前尝试导致重复 API 调用。
但仍有一个缺陷。 mapping
的键集大小通常小于 Set<String> location
的大小。事实上,生成的地图的大小正在发生变化。此外,它时不时地是正确的。因此,在方法返回地图后,下标完成可能存在问题。
public Map<String, ServiceDescription> fooFlux(String id, Set<String> locations) {
Map<String, ServiceDescription> mapping = new HashMap<>();
Flux.fromIterable(locations).parallel().runOn(Schedulers.boundedElastic()).flatMap(location -> {
Mono<ServiceDescription> sdMono = getServiceDescription(id, location);
Mono<Mono<ServiceDescription>> sdMonoMono = sdMono.flatMap(item -> {
mapping.put(location, item);
return Mono.just(sdMono);
});
return sdMonoMono;
}).then().block();
LOGGER.debug("Input Location size: {}", locations.size());
LOGGER.debug("Output Location in map: {}", mapping.keySet().size());
return mapping;
}
处理获取请求
private Mono<ServiceDescription> getServiceDescription(String id, String location) {
String uri = URL_BASE.concat(location).concat("/detail?q=").concat(id);
Mono<ServiceDescription> serviceDescription =
webClient.get().uri(uri).retrieve().onStatus(HttpStatus::isError, clientResponse -> {
LOGGER.error("Error while calling endpoint {} with status code {}", uri,
clientResponse.statusCode());
throw new RuntimeException("Error while calling Endpoint");
}).bodyToMono(ServiceDescription.class).retryBackoff(5, Duration.ofSeconds(15));
return serviceDescription;
}
响应式代码在您订阅生产者时执行。 Block 确实订阅了,因为你调用了 block 两次(一次在 Mono 上,但是 return 再次调用 Mono,然后在 ParallelFlux 上调用 block),Mono 被执行了两次。
List<String> resultList = listMono.block();
mapping.put(location, resultList);
return listMono;
试试下面的方法(未经测试):
listMono.map(resultList -> {
mapping.put(location, resultList);
return Mono.just(listMono);
});
就是说,响应式编程模型非常复杂,因此考虑使用 @Async
和 Future
/AsyncResult
,如果这只是关于调用并行,正如其他人所建议的那样。
您仍然可以使用 WebClient
(RestTemplate
似乎即将被弃用),但只需在 bodyToMono
.
public Map<String, ServiceDescription> fooFlux(String id, Set<String> locations) {
return Flux.fromIterable(locations)
.flatMap(location -> getServiceDescription(id, location).map(sd -> Tuples.of(location, sd)))
.collectMap(Tuple2::getT1, Tuple2::getT2)
.block();
}
注意:flatMap
运算符与 WebClient
调用相结合可实现并发执行,因此无需使用 ParallelFlux
或任何 Scheduler
.