如何将 Mono 流转换为 Flux
How can I convert a Stream of Mono to Flux
我有一个尝试使用 WebClient 来 return Mono
的方法
@GetMapping("getMatch")
public Mono<Object> getMatch(@RequestParam Long matchId) {
return WebClient.create(OpenDotaConstant.BASE_URL).get()
.uri("/matches/{matchId}", matchId)
.accept(MediaType.APPLICATION_JSON)
.retrieve()
.bodyToMono(Object.class);
}
可以return达到我预期的结果。
然后我尝试创建另一种方法来支持 List as params
@GetMapping("getMatches")
public Flux<Object> getMatches(@RequestParam String matchesId) {
List<Long> matchesList = JSON.parseArray(matchesId, Long.class);
return Flux.fromStream(matchesList.parallelStream().map(this::getMatch));
}
但是这次return一个奇怪的结果。
[
{
"scanAvailable": true
},
{
"scanAvailable": true
}
]
我是响应式编程的新手,结合 Stream 和 Mono,然后转换为 Flux 的正确方法是什么?
大概,你需要的是:
@GetMapping("getMatches")
public Flux<Object> getMatches(@RequestParam String matchesId) {
List<Long> matchesList = JSON.parseArray(matchesId, Long.class);
return Flux.fromStream(matchesList.stream())
.flatMap(this::getMatch);
}
而不是:
@GetMapping("getMatches")
public Flux<Object> getMatches(@RequestParam String matchesId) {
List<Long> matchesList = JSON.parseArray(matchesId, Long.class);
return Flux.fromStream(matchesList.parallelStream().map(this::getMatch));
}
备注:
基本上,您期望 getMatches
端点到 return Flux<Object>
。然而,正如它所写的那样 - 它实际上是 returns Flux<Mono<Object>>
,因此你会看到奇怪的输出。要获得 Flux<Object>
,我建议首先创建匹配 ID 的 Flux<Long>
,然后 flatMap
调用 getMatch
的结果(即 returns Mono<Object>
), 最后给出 Flux<Object>
.
还有,没必要用parallelStream()
。因为您已经在使用反应堆,所以一切都将在反应堆调度程序上同时执行。
我有一个尝试使用 WebClient 来 return Mono
的方法 @GetMapping("getMatch")
public Mono<Object> getMatch(@RequestParam Long matchId) {
return WebClient.create(OpenDotaConstant.BASE_URL).get()
.uri("/matches/{matchId}", matchId)
.accept(MediaType.APPLICATION_JSON)
.retrieve()
.bodyToMono(Object.class);
}
可以return达到我预期的结果。 然后我尝试创建另一种方法来支持 List as params
@GetMapping("getMatches")
public Flux<Object> getMatches(@RequestParam String matchesId) {
List<Long> matchesList = JSON.parseArray(matchesId, Long.class);
return Flux.fromStream(matchesList.parallelStream().map(this::getMatch));
}
但是这次return一个奇怪的结果。
[
{
"scanAvailable": true
},
{
"scanAvailable": true
}
]
我是响应式编程的新手,结合 Stream 和 Mono,然后转换为 Flux 的正确方法是什么?
大概,你需要的是:
@GetMapping("getMatches")
public Flux<Object> getMatches(@RequestParam String matchesId) {
List<Long> matchesList = JSON.parseArray(matchesId, Long.class);
return Flux.fromStream(matchesList.stream())
.flatMap(this::getMatch);
}
而不是:
@GetMapping("getMatches")
public Flux<Object> getMatches(@RequestParam String matchesId) {
List<Long> matchesList = JSON.parseArray(matchesId, Long.class);
return Flux.fromStream(matchesList.parallelStream().map(this::getMatch));
}
备注:
基本上,您期望
getMatches
端点到 returnFlux<Object>
。然而,正如它所写的那样 - 它实际上是 returnsFlux<Mono<Object>>
,因此你会看到奇怪的输出。要获得Flux<Object>
,我建议首先创建匹配 ID 的Flux<Long>
,然后flatMap
调用getMatch
的结果(即 returnsMono<Object>
), 最后给出Flux<Object>
.还有,没必要用
parallelStream()
。因为您已经在使用反应堆,所以一切都将在反应堆调度程序上同时执行。