如何等到 List<Mono<List<Object>>> 完成?
How to wait until List<Mono<List<Object>>> finishes?
这是我第一次使用 webClient,我想知道如何等到 List> 完成。我有以下代码:
List<Address> addresses = collectAllAddresses(someObject);
List<Mono<List<AnotherAddress>>> monoResponses =
addresses.stream()
.map(address -> webClientGateway.findAddresses(userData, address.getFullAddress()))
.collect(Collectors.toList());
Mono.when(monoResponses).block();
log.info("mono responses");
monoResponses.stream()
.flatMap(it -> Objects.requireNonNull(it.block()).stream()).forEach(it -> log.info("mono responses + {}", it));
和以下 findAddresses
方法:
public Mono<List<AnotherAddress>> findAddresses(UserData userData, String fullAddress) {
if (StringUtils.isEmpty(fullAddress)) {
log.info("Address is empty that why we return Mono.just(Collections.emptyList()");
return Mono.just(Collections.emptyList());
}
return webClient.get()
.uri(path, uri -> uri.queryParam("query", fullAddress).queryParam("count", 1).build())
.header("someHeader", someHeader)
.retrieve()
.bodyToMono(new ParameterizedTypeReference<List<AnotherAddress>>() {
})
.doOnError(e -> log.error("Error occurred!", e));
}
但每次我执行它时,我总是得到空对象列表,我的意思是我得到了列表,但该列表中的每个对象都是空的(class AnotherAddress 的每个字段都是空的)。有什么问题吗?
UDP:更多解释:
我有两个微服务。在一个微服务(return 另一个地址)中有一个发送另一个地址的 RestController。在另一个微服务中,我想使用 WebClient(而不是使用具有多个线程的 threadPool)从之前的微服务调用 RestController。当我以前实现函数 webClientGateway.findAddresses(userData, address.getFullAddress()) 并且它 returns Mono 我测试它并在调用函数后立即调用结果块并且它作品。但是现在我有以下情况,我有很多地址(可能是 5 个或 10 个),我想为每个地址发送异步请求并等到最新完成,然后我想做另一个操作,但不是得到完整的 AnotherAddress 实例,我我得到 5 个空的 AnotherAddress 实例(每个字段都是空的)
使用 Flux 而不是 Mono,例如像(未经测试):
public Flux<AnotherAddress> findAddresses(UserData userData, String fullAddress) {
if (StringUtils.isEmpty(fullAddress)) {
log.info("Address is empty that why we return Mono.just(Collections.emptyList()");
return Flux.empty();
}
return webClient.get()
.uri(path, uri -> uri.queryParam("query", fullAddress).queryParam("count", 1).build())
.header("someHeader", someHeader)
.retrieve()
.bodyToFlux(AnotherAddress.class)
.doOnError(e -> log.error("Error occurred!", e));
}
如果您不需要按地址分组的 AnotherAddress 列表,您可以使用类似(未测试)的内容:
Flux<AnotherAddress> anotherAddressFlux= Flux.fromIterable(addresses)
.flatMap(address -> webClientGateway.findAddresses(userData, address.getFullAddress()));
如果你想阻止你可以使用:
List<AnotherAddress> anotherAddressList = anotherAddressFlux.collectList().block();
这是我第一次使用 webClient,我想知道如何等到 List
List<Address> addresses = collectAllAddresses(someObject);
List<Mono<List<AnotherAddress>>> monoResponses =
addresses.stream()
.map(address -> webClientGateway.findAddresses(userData, address.getFullAddress()))
.collect(Collectors.toList());
Mono.when(monoResponses).block();
log.info("mono responses");
monoResponses.stream()
.flatMap(it -> Objects.requireNonNull(it.block()).stream()).forEach(it -> log.info("mono responses + {}", it));
和以下 findAddresses
方法:
public Mono<List<AnotherAddress>> findAddresses(UserData userData, String fullAddress) {
if (StringUtils.isEmpty(fullAddress)) {
log.info("Address is empty that why we return Mono.just(Collections.emptyList()");
return Mono.just(Collections.emptyList());
}
return webClient.get()
.uri(path, uri -> uri.queryParam("query", fullAddress).queryParam("count", 1).build())
.header("someHeader", someHeader)
.retrieve()
.bodyToMono(new ParameterizedTypeReference<List<AnotherAddress>>() {
})
.doOnError(e -> log.error("Error occurred!", e));
}
但每次我执行它时,我总是得到空对象列表,我的意思是我得到了列表,但该列表中的每个对象都是空的(class AnotherAddress 的每个字段都是空的)。有什么问题吗?
UDP:更多解释:
我有两个微服务。在一个微服务(return 另一个地址)中有一个发送另一个地址的 RestController。在另一个微服务中,我想使用 WebClient(而不是使用具有多个线程的 threadPool)从之前的微服务调用 RestController。当我以前实现函数 webClientGateway.findAddresses(userData, address.getFullAddress()) 并且它 returns Mono 我测试它并在调用函数后立即调用结果块并且它作品。但是现在我有以下情况,我有很多地址(可能是 5 个或 10 个),我想为每个地址发送异步请求并等到最新完成,然后我想做另一个操作,但不是得到完整的 AnotherAddress 实例,我我得到 5 个空的 AnotherAddress 实例(每个字段都是空的)
使用 Flux 而不是 Mono,例如像(未经测试):
public Flux<AnotherAddress> findAddresses(UserData userData, String fullAddress) {
if (StringUtils.isEmpty(fullAddress)) {
log.info("Address is empty that why we return Mono.just(Collections.emptyList()");
return Flux.empty();
}
return webClient.get()
.uri(path, uri -> uri.queryParam("query", fullAddress).queryParam("count", 1).build())
.header("someHeader", someHeader)
.retrieve()
.bodyToFlux(AnotherAddress.class)
.doOnError(e -> log.error("Error occurred!", e));
}
如果您不需要按地址分组的 AnotherAddress 列表,您可以使用类似(未测试)的内容:
Flux<AnotherAddress> anotherAddressFlux= Flux.fromIterable(addresses)
.flatMap(address -> webClientGateway.findAddresses(userData, address.getFullAddress()));
如果你想阻止你可以使用:
List<AnotherAddress> anotherAddressList = anotherAddressFlux.collectList().block();