如何将 Mono 流转换为 Flux

How can I convert a Stream of Mono to Flux

我有一个尝试使用 WebClient 来 return Mono

的方法
    @GetMapping("getMatch")
    public Mono<Object> getMatch(@RequestParam Long matchId) {
        return WebClient.create(OpenDotaConstant.BASE_URL).get()
                .uri("/matches/{matchId}", matchId)
                .accept(MediaType.APPLICATION_JSON)
                .retrieve()
                .bodyToMono(Object.class);

    }

可以return达到我预期的结果。 然后我尝试创建另一种方法来支持 List as params

    @GetMapping("getMatches")
    public Flux<Object> getMatches(@RequestParam String matchesId) {
        List<Long> matchesList = JSON.parseArray(matchesId, Long.class);

        return Flux.fromStream(matchesList.parallelStream().map(this::getMatch));
    }

但是这次return一个奇怪的结果。

[
    {
        "scanAvailable": true
    },
    {
        "scanAvailable": true
    }
]

我是响应式编程的新手,结合 Stream 和 Mono,然后转换为 Flux 的正确方法是什么?

大概,你需要的是:

@GetMapping("getMatches")
public Flux<Object> getMatches(@RequestParam String matchesId) {
    List<Long> matchesList = JSON.parseArray(matchesId, Long.class);
    return Flux.fromStream(matchesList.stream())
               .flatMap(this::getMatch);
}

而不是:

@GetMapping("getMatches")
public Flux<Object> getMatches(@RequestParam String matchesId) {
    List<Long> matchesList = JSON.parseArray(matchesId, Long.class);
    return Flux.fromStream(matchesList.parallelStream().map(this::getMatch));
}

备注:

  • 基本上,您期望 getMatches 端点到 return Flux<Object>。然而,正如它所写的那样 - 它实际上是 returns Flux<Mono<Object>>,因此你会看到奇怪的输出。要获得 Flux<Object>,我建议首先创建匹配 ID 的 Flux<Long>,然后 flatMap 调用 getMatch 的结果(即 returns Mono<Object>), 最后给出 Flux<Object>.

  • 还有,没必要用parallelStream()。因为您已经在使用反应堆,所以一切都将在反应堆调度程序上同时执行。