使用 Spring Webclient 分散和聚集

Scatter & Gather using Spring Webclient

我不熟悉反应式编程概念,并尝试构建一种服务,将请求并行发送到两个后端服务,并将这些结果结合起来。 这两个后端服务具有不同的响应结构,我创建了一个映射器方法来将所有这些转换为通用的响应结构。

这就是我现在拥有的,当这两个服务 return 都产生结果时它正在工作。

public Mono<List<Response>> getRecords(String input){

List<Response> response = new ArrayList<>();

Mono<FirstApiResponse> gResp = this.firstWebClient.get().uri(uriBuilder -> uriBuilder
            .path("/")
            .queryParam("q", input)
            .build()).retrieve()
            .bodyToMono(FirstApiResponse.class).log()
            .timeout(Duration.ofSeconds(50L));

Mono<SecondApiResponse> iResp = this.secondWebClient.get().uri(uriBuilder -> uriBuilder
        .path("/search")
        .queryParam("term", input)
        .build()).retrieve()
        .bodyToMono(SecondApiResponse.class).log().timeout(Duration.ofSeconds(50L));


return Mono.zip(firstResp,secResp).map(objects ->{
    if(firstResp != null)
    response.addAll(Mapper.convert(objects.getT1()));
    if(secResp != null);
    response.addAll(Mapper.convert(objects.getT2()));
    return response;
});

}

public  List<Response> convert(FirstApiResponse resp){
    ////
    Mapping to Response object 
    ////

    return response;
}

public  List<Response> convert(SecondApiResponse resp){
     ////
    Mapping to Response object 
    ////

    return response;
}

我不知道这样做是否正确。此外,我想以这样一种方式进行,即如果该服务中的任何一个有任何错误,那么它仍然应该 return 来自其他服务的结果。现在它抛出异常,我不知道如何正确处理它

如何正确处理这些错误?

这是一个非常有效的场景,有很多方法可以处理它。一种粗略的方法是使用 onErrorReturn 一个您可以处理的新模型。它可以是空响应,也可以是模型的包装器,以适合您的场景为准。

Mono<Wrapper<FirstApiResponse>> gResp = this.firstWebClient.get().uri(uriBuilder -> uriBuilder
     .path("/")
     .queryParam("q", input)
     .build()).retrieve()
     .bodyToMono(FirstApiResponse.class).log()
     .map( response -> new Wrapper().withResponse(response))
     .timeout(Duration.ofSeconds(50L))
     .doOnError(throwable -> logger.error("Failed", throwable))
      .onErrorReturn(new Wrapper().withError( YourDefaultErrorReponse(...));

Mono<SecondApiResponse> iResp = this.secondWebClient.get().uri(uriBuilder -> uriBuilder
    .path("/search")
    .queryParam("term", input)
    .build())
    .retrieve()      
    .bodyToMono(SecondApiResponse.class).log()
    .map( response -> new Wrapper().withResponse(response))
    .timeout(Duration.ofSeconds(50L))
    ..doOnError(throwable -> logger.error("Failed", throwable))
    .onErrorReturn(new Wrapper().withError( YourDefaultErrorReponse(...))

同样,有一些方法可以 return 默认响应。一个简单的方法是使用包装器之类的东西

public final class Wrapper<T> {
  private T response ;
  private Error error;
      
  public Wrapper<T> withResponse ( T response ){
     this.response = response;
     return this;
  }
  public Wrapper<T> withError( Error error) {
     this.error = error;
     return this;
  }

  public Boolean hasError(){
    return error != null ;
  }
      
  public T getResponse(){
   return response;
  }
}