Spring 反应式 webClient - 如何在 Mono 上调用方法

Spring reactive webClient - how to call methods on a Mono

响应式编程的新手并尝试通过 WebFlux 和 WebClient 创建响应式服务。

方法的流程是这样的

  1. POST 请求并等待响应
  2. 对映射的响应正文 服务(它有一些其他的业务逻辑)并且 returns 推荐类型
  3. 创建响应实体
  4. 创建 Mono 类型的 Mono

问题是这样做是否有效,因为我应该使用 .exchange() 吗?有没有一种方法可以链接这些方法而不是单独的方法

当前实施:

private Mono<ResponseEntity<Recommendations>> myMethod(final Request request, final String variantName) {

    Mono<String> response = webClient.build()
            .post()
            .uri(uri)
            .header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
            .header(HttpHeaders.ACCEPT, MediaType.APPLICATION_JSON_VALUE)
            .bodyValue(requestBody)
            .retrieve().bodyToMono(String.class);

    var recommendations = ((XYZResponseMapper) responseMapper).mapReactive(request, response, useCaseId, variantName); //return type Recommendations
    var entity = new ResponseEntity<>(recommendations, nullHeaders, HttpStatus.OK);
    return Mono.just(entity);

}

简短的回答可能是否定的。

更长的版本是,当有人开始订阅您的服务(即生产者)时,客户想要消费数据。一旦订阅开始,webflux 将做的就是在应用程序内部构建反应链。该链可以与一种回调链进行比较,称为“组装阶段”。

在此组装阶段,重要的是每个 Flux/Mono 的 return 相互链接。否则你就是在打破链条。

var firstAction = Mono.just("Hello").flatMap(value -> {
    // do something
});

// Next action needs to chain on the last
var secondAction = firstAction.flatMap(value -> {
    // Do the next thing
});

// Can be combined
var bothActions = Mono.just("Hello").flatMap(value -> {
    // do something
}).flatMap(value -> {
    // do next thing
});

在上面的例子中你可以看到我一直在链接最后一个动作,我们并没有打破链条。

现在开始你的代码。

private Mono<ResponseEntity<Recommendations>> myMethod(final Request request, final String variantName) {

    // Here you have a response
    Mono<String> response = webClient.build()
            .post()
            .uri(uri)
            // Not needed content type will default to json
            .header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
            // will also default to json
            .header(HttpHeaders.ACCEPT, MediaType.APPLICATION_JSON_VALUE)
            .bodyValue(requestBody)
            .retrieve().bodyToMono(String.class);

    // here you pass the response into a function, hence probably breaking the chain
    var recommendations = ((XYZResponseMapper) responseMapper).mapReactive(request, response, useCaseId, variantName); //return type Recommendations
    var entity = new ResponseEntity<>(recommendations, nullHeaders, HttpStatus.OK);

    // Here you are suddenly creating a new mono, which tells me you deffo broke the chain and need to recreate it by doing a mono#just
    return Mono.just(entity);

}

那我们怎么解决呢?

private Mono<Recommendations> myMethod(final Request request, final String variantName) {
    // You should not build a webclient on each request, you should build it in a @Bean
    final Mono<XYZResponse> response = webClient.build()
            .post()
            .uri(uri)
            .bodyValue(requestBody)
            .retrieve()
            // Map into a class representation to uphold type safety
            .bodyToMono(XYZResponse.class);

    // if we need to do additional transformations we can flatMap and chain on
    final Mono<Recommendations> recommendations = response.flatMap(value -> {
        var recommendations = mapper.toRecommendations(value);
    });

    // No need to wrap it in a response webflux will do that for you automatically when you return it to the client
    return recommendations;
}

我们甚至可以重写它更短。

private Mono<Recommendations> myMethod(final Request request, final String variantName) {
    return webClient.build()
            .post()
            .uri(uri)
            .bodyValue(requestBody)
            .retrieve()
            .bodyToMono(XYZResponse.class)
            .flatMap(value -> {
                var recommendations = mapper.toRecommendations(value);
            });
}

我通常在第一行写 return 语句,然后再写我的链。

如果你想进一步优化它(通常推荐),你可以在配置 bean 中创建一个 webclient,这样 webclient 只创建一次(在 webflux 服务器启动时),然后我们在每个请求。

@Configuration
public class ClientConfig {

    @Bean
    public WebClient webclient(WebClient.Builder webClient) {
        return webClient.baseUrl( ... )
                .build();
    }
}

@Component
public class RecommendationHandler {

    final private WebClient 

    @Autowire
    public RecommendationHandler(WebClient webClient) {
        this.webClient = webClient;
    }

    private Mono<Recommendations> getRecommendations(RequestBody requestBody) {
    return webClient
            .post()
            .bodyValue(requestBody)
            .retrieve()
            .bodyToMono(XYZResponse.class)
            .flatMap(value -> {
                var recommendations = mapper.toRecommendations(value);
            });
    }   
}

像这样。这只是一个例子,我没有 运行 它在 IDE 中,我只是从头顶写下来的。但这就是我想到的一些事情,这里没有错误处理,应该是地址。

祝你好运

经过大量阅读和试验后,我设法使其适用于以下内容:

    return webClient.build()
            .post()
            .uri(uri)
            .header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
            .header(HttpHeaders.ACCEPT, MediaType.APPLICATION_JSON_VALUE)
            .bodyValue(requestBody)
            .retrieve()
            .toEntity(String.class)
            .publishOn(Schedulers.boundedElastic())
            .map(x -> {
           
                var recs = processResponse(request, x.getBody(), useCaseId, variantName);
                return new ResponseEntity<GatewayRecommendations>(recs, x.getStatusCode());
            });