如何并行调用多个 Spring Webclient 并等待结果?
How to make multiple Spring Webclient calls in parallel and wait for the result?
我是响应式编程的新手,我想并行进行两次 API 调用并处理结果,return 一个简单的数组或项目列表。
我有两个函数,一个 return 是一个 Flux,另一个 return 是一个 Mono,我根据 Mono 的结果对 Flux 发出的项目做了一个非常简单的过滤逻辑。
我尝试使用 zipWith
,但无论采用何种过滤逻辑,都只有一项到达了最后。我也尝试过 block
但在控制器内部不允许这样做:/
@GetMapping("/{id}/offers")
fun viewTaskOffers(
@PathVariable("id") id: String,
@AuthenticationPrincipal user: UserPrincipal
) : Flux<ViewOfferDTO> {
data class TaskOfferPair(
val task: TaskDTO,
val offer: ViewOfferDTO
)
return client.getTaskOffers(id).map {
it.toViewOfferDTO()
}.zipWith(client.getTask(id), BiFunction {
offer: ViewOfferDTO, task: TaskDTO -> TaskOfferPair(task, offer)
}).filter {
it.offer.workerUser.id == user.id || it.task.creatorUser == user.id
}.map {
it.offer
}
}
getTaskOffers
return 的通量为 OfferDTO
getTask
return 是 TaskDTO
的单声道
如果您不能回答我的问题,请至少告诉我如何并行执行多个 API 调用并在 WebClient
中等待结果
正如您已经知道的那样,zipWith
不会帮助您,因为它会产生 min(a.size, b.size)
,它将始终为 1,以防其中之一是 Mono
.
但由于这两者是独立的,您可以简单地将它们拆分:
val task: Mono<TaskDTO> = client.getTask(id)
val result: Flux<ViewOfferDTO> =
task.flatMapMany {t ->
client.getTaskOffers(id).map {offer ->
t to offer
}
}.filter {
it.second.workerUser.id == user.id || it.first.creatorUser == user.id
}.map {
it.second
}
注意,如果你想要一对元素,你可以使用built-in Pair
。
此外,此检查没有多大意义,因为您只有 Mono
:it.first.creatorUser
这是一个并行调用的用例。
public Mono<UserInfo> fetchCarrierUserInfo(User user) {
Mono<UserInfo> userInfoMono = fetchUserInfo(user.getGuid());
Mono<CarrierInfo> carrierInfoMono = fetchCarrierInfo(user.getCarrierGuid());
return Mono.zip(userInfoMono, carrierInfoMono).map(tuple -> {
UserInfo userInfo = tuple.getT1();
userInfo.setCarrier(tuple.getT2());
return userInfo;
});
}
这里:
fetchUserInfo
进行 http 调用以从另一服务获取用户信息,并且 returns Mono
fetchCarrierInfo
方法进行 HTTP 调用以从另一个服务获取 carrierInfo 并且 returns Mono
Mono.zip()
将给定的 monos 合并到一个新的 Mono 中,当所有给定的 Monos 都产生了一个项目时,它将被实现,将它们的值聚合到一个 Tuple2 中。
然后调用fetchCarrierUserInfo().block()
得到最终结果
使用 repeat() 将您的 Mono 转换为 Flux:
client.getTask(id).cache().repeat();
所以你的代码会变成
return client.getTaskOffers(id).map {
it.toViewOfferDTO()
}.zipWith(client.getTask(id).cache().repeat(), BiFunction {
offer: ViewOfferDTO, task: TaskDTO -> TaskOfferPair(task, offer)
}).filter {
it.offer.workerUser.id == user.id || it.task.creatorUser == user.id
}.map {
it.offer
}
我是响应式编程的新手,我想并行进行两次 API 调用并处理结果,return 一个简单的数组或项目列表。
我有两个函数,一个 return 是一个 Flux,另一个 return 是一个 Mono,我根据 Mono 的结果对 Flux 发出的项目做了一个非常简单的过滤逻辑。
我尝试使用 zipWith
,但无论采用何种过滤逻辑,都只有一项到达了最后。我也尝试过 block
但在控制器内部不允许这样做:/
@GetMapping("/{id}/offers")
fun viewTaskOffers(
@PathVariable("id") id: String,
@AuthenticationPrincipal user: UserPrincipal
) : Flux<ViewOfferDTO> {
data class TaskOfferPair(
val task: TaskDTO,
val offer: ViewOfferDTO
)
return client.getTaskOffers(id).map {
it.toViewOfferDTO()
}.zipWith(client.getTask(id), BiFunction {
offer: ViewOfferDTO, task: TaskDTO -> TaskOfferPair(task, offer)
}).filter {
it.offer.workerUser.id == user.id || it.task.creatorUser == user.id
}.map {
it.offer
}
}
getTaskOffers
return 的通量为OfferDTO
getTask
return 是TaskDTO
的单声道
如果您不能回答我的问题,请至少告诉我如何并行执行多个 API 调用并在 WebClient
中等待结果正如您已经知道的那样,zipWith
不会帮助您,因为它会产生 min(a.size, b.size)
,它将始终为 1,以防其中之一是 Mono
.
但由于这两者是独立的,您可以简单地将它们拆分:
val task: Mono<TaskDTO> = client.getTask(id)
val result: Flux<ViewOfferDTO> =
task.flatMapMany {t ->
client.getTaskOffers(id).map {offer ->
t to offer
}
}.filter {
it.second.workerUser.id == user.id || it.first.creatorUser == user.id
}.map {
it.second
}
注意,如果你想要一对元素,你可以使用built-in Pair
。
此外,此检查没有多大意义,因为您只有 Mono
:it.first.creatorUser
这是一个并行调用的用例。
public Mono<UserInfo> fetchCarrierUserInfo(User user) {
Mono<UserInfo> userInfoMono = fetchUserInfo(user.getGuid());
Mono<CarrierInfo> carrierInfoMono = fetchCarrierInfo(user.getCarrierGuid());
return Mono.zip(userInfoMono, carrierInfoMono).map(tuple -> {
UserInfo userInfo = tuple.getT1();
userInfo.setCarrier(tuple.getT2());
return userInfo;
});
}
这里:
fetchUserInfo
进行 http 调用以从另一服务获取用户信息,并且 returnsMono
fetchCarrierInfo
方法进行 HTTP 调用以从另一个服务获取 carrierInfo 并且 returnsMono
Mono.zip()
将给定的 monos 合并到一个新的 Mono 中,当所有给定的 Monos 都产生了一个项目时,它将被实现,将它们的值聚合到一个 Tuple2 中。
然后调用fetchCarrierUserInfo().block()
得到最终结果
使用 repeat() 将您的 Mono 转换为 Flux:
client.getTask(id).cache().repeat();
所以你的代码会变成
return client.getTaskOffers(id).map {
it.toViewOfferDTO()
}.zipWith(client.getTask(id).cache().repeat(), BiFunction {
offer: ViewOfferDTO, task: TaskDTO -> TaskOfferPair(task, offer)
}).filter {
it.offer.workerUser.id == user.id || it.task.creatorUser == user.id
}.map {
it.offer
}