如何使用Spring WebClient 同时发起多个调用并分别得到响应?

How to use Spring WebClient to make multiple calls simultaneously and get response separately?

我必须同时执行多个彼此独立的 API 调用:

Mono<Response1> response1= this.webClient
               .post()
               .uri(requestURI1)
               .body(Flux.just(request.getRequestBody()), ParameterizedTypeReference<T>)
               .exchangeToMono(response -> {
                   return response.statusCode().equals(HttpStatus.OK)
                            ? response.bodyToMono(ParameterizedTypeReference<T>)
                            : response.createException().flatMap(Mono::error);
                });

Mono<Response2> response2= this.webClient
               .post()
               .uri(requestURI2)
               .body(Flux.just(request.getRequestBody()), ParameterizedTypeReference<T>)
               .exchangeToMono(response -> {
                   return response.statusCode().equals(HttpStatus.OK)
                            ? response.bodyToMono(ParameterizedTypeReference<T>)
                            : response.createException().flatMap(Mono::error);
                });

Mono<Response3> response3= this.webClient
               .post()
               .uri(requestURI3)
               .body(Flux.just(request.getRequestBody()), ParameterizedTypeReference<T>)
               .exchangeToMono(response -> {
                   return response.statusCode().equals(HttpStatus.OK)
                            ? response.bodyToMono(ParameterizedTypeReference<T>)
                            : response.createException().flatMap(Mono::error);
                });

如何在单独的对象中获得上述 api 调用的响应,同时它们应该并行执行?此外,在执行上述这些调用并将数据放入单独的对象(例如 Response1、Response2、Response3)之后,我想执行另一个 API 调用,该调用使用这些响应 Response1、Response2、Response3。

我曾尝试使用 Flux.merge,但这会将响应合并到单个对象中,这是不正确的。另请参阅 Mono.zip,但这些都用于组合我不想要的响应。

编辑: Mono.zip 完美运行。 我有一个跟进问题,我在评论中提到过,但也张贴在这里。 所以我是这样实现的:

Mono.zip(rs1, rs2, rs3).flatMap(tuples -> {
//do something with responses
return Mono.just(transformedData)
}).flatMap(transformedData -> {
//here another webclient.post call which consumes transformedData and return actualData in form of Mono<actualData>

Mono<actualData> data= callAPI;
return data;
});

Now this response is propagated to rest layer in form of Mono<actualData> and I am getting this in response: {
    "scanAvailable": true
}

要并行合并发布者,您可以使用 Mono.zip,它将 return TupleX<...> 并在解析所有发布者时解析。

Mono<Tuple3<Response1, Response2, Response3>> res = Mono.zip(response1, response2, response3)
        .map(tuples -> {
            //do something with responses
            return transformedData;
        })
        .flatMap(transformedData -> {
            //here another webclient.post call which consumes transformedData and return actualData in form of Mono<actualData>
            return callAPI(transformedData);
        });

根据错误处理逻辑,您可以考虑 zipDelayError

正如我在评论中提到的,响应式 API 的关键之一是区分同步和异步操作。在同步转换的情况下,将其与 .map 链接并使用 .flatMap 或类似的运算符,以防您想链接另一个异步方法。

阅读 ExecutorService. This is a thread pool which you can use to submit tasks to be executed in separate threads in paralel. You will need to implement the Callable interface and get Future 作为你的结果。这为您提供了所需的功能