Spring 反应式 webClient - 如何在 Mono 上调用方法
Spring reactive webClient - how to call methods on a Mono
响应式编程的新手并尝试通过 WebFlux 和 WebClient 创建响应式服务。
方法的流程是这样的
- POST 请求并等待响应
- 对映射的响应正文
服务(它有一些其他的业务逻辑)并且 returns
推荐类型
- 创建响应实体
- 创建 Mono
类型的 Mono
问题是这样做是否有效,因为我应该使用 .exchange() 吗?有没有一种方法可以链接这些方法而不是单独的方法
当前实施:
private Mono<ResponseEntity<Recommendations>> myMethod(final Request request, final String variantName) {
Mono<String> response = webClient.build()
.post()
.uri(uri)
.header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
.header(HttpHeaders.ACCEPT, MediaType.APPLICATION_JSON_VALUE)
.bodyValue(requestBody)
.retrieve().bodyToMono(String.class);
var recommendations = ((XYZResponseMapper) responseMapper).mapReactive(request, response, useCaseId, variantName); //return type Recommendations
var entity = new ResponseEntity<>(recommendations, nullHeaders, HttpStatus.OK);
return Mono.just(entity);
}
简短的回答可能是否定的。
更长的版本是,当有人开始订阅您的服务(即生产者)时,客户想要消费数据。一旦订阅开始,webflux 将做的就是在应用程序内部构建反应链。该链可以与一种回调链进行比较,称为“组装阶段”。
在此组装阶段,重要的是每个 Flux/Mono 的 return 相互链接。否则你就是在打破链条。
var firstAction = Mono.just("Hello").flatMap(value -> {
// do something
});
// Next action needs to chain on the last
var secondAction = firstAction.flatMap(value -> {
// Do the next thing
});
// Can be combined
var bothActions = Mono.just("Hello").flatMap(value -> {
// do something
}).flatMap(value -> {
// do next thing
});
在上面的例子中你可以看到我一直在链接最后一个动作,我们并没有打破链条。
现在开始你的代码。
private Mono<ResponseEntity<Recommendations>> myMethod(final Request request, final String variantName) {
// Here you have a response
Mono<String> response = webClient.build()
.post()
.uri(uri)
// Not needed content type will default to json
.header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
// will also default to json
.header(HttpHeaders.ACCEPT, MediaType.APPLICATION_JSON_VALUE)
.bodyValue(requestBody)
.retrieve().bodyToMono(String.class);
// here you pass the response into a function, hence probably breaking the chain
var recommendations = ((XYZResponseMapper) responseMapper).mapReactive(request, response, useCaseId, variantName); //return type Recommendations
var entity = new ResponseEntity<>(recommendations, nullHeaders, HttpStatus.OK);
// Here you are suddenly creating a new mono, which tells me you deffo broke the chain and need to recreate it by doing a mono#just
return Mono.just(entity);
}
那我们怎么解决呢?
private Mono<Recommendations> myMethod(final Request request, final String variantName) {
// You should not build a webclient on each request, you should build it in a @Bean
final Mono<XYZResponse> response = webClient.build()
.post()
.uri(uri)
.bodyValue(requestBody)
.retrieve()
// Map into a class representation to uphold type safety
.bodyToMono(XYZResponse.class);
// if we need to do additional transformations we can flatMap and chain on
final Mono<Recommendations> recommendations = response.flatMap(value -> {
var recommendations = mapper.toRecommendations(value);
});
// No need to wrap it in a response webflux will do that for you automatically when you return it to the client
return recommendations;
}
我们甚至可以重写它更短。
private Mono<Recommendations> myMethod(final Request request, final String variantName) {
return webClient.build()
.post()
.uri(uri)
.bodyValue(requestBody)
.retrieve()
.bodyToMono(XYZResponse.class)
.flatMap(value -> {
var recommendations = mapper.toRecommendations(value);
});
}
我通常在第一行写 return 语句,然后再写我的链。
如果你想进一步优化它(通常推荐),你可以在配置 bean 中创建一个 webclient,这样 webclient 只创建一次(在 webflux 服务器启动时),然后我们在每个请求。
@Configuration
public class ClientConfig {
@Bean
public WebClient webclient(WebClient.Builder webClient) {
return webClient.baseUrl( ... )
.build();
}
}
@Component
public class RecommendationHandler {
final private WebClient
@Autowire
public RecommendationHandler(WebClient webClient) {
this.webClient = webClient;
}
private Mono<Recommendations> getRecommendations(RequestBody requestBody) {
return webClient
.post()
.bodyValue(requestBody)
.retrieve()
.bodyToMono(XYZResponse.class)
.flatMap(value -> {
var recommendations = mapper.toRecommendations(value);
});
}
}
像这样。这只是一个例子,我没有 运行 它在 IDE 中,我只是从头顶写下来的。但这就是我想到的一些事情,这里没有错误处理,应该是地址。
祝你好运
经过大量阅读和试验后,我设法使其适用于以下内容:
return webClient.build()
.post()
.uri(uri)
.header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
.header(HttpHeaders.ACCEPT, MediaType.APPLICATION_JSON_VALUE)
.bodyValue(requestBody)
.retrieve()
.toEntity(String.class)
.publishOn(Schedulers.boundedElastic())
.map(x -> {
var recs = processResponse(request, x.getBody(), useCaseId, variantName);
return new ResponseEntity<GatewayRecommendations>(recs, x.getStatusCode());
});
响应式编程的新手并尝试通过 WebFlux 和 WebClient 创建响应式服务。
方法的流程是这样的
- POST 请求并等待响应
- 对映射的响应正文 服务(它有一些其他的业务逻辑)并且 returns 推荐类型
- 创建响应实体
- 创建 Mono
类型的 Mono
问题是这样做是否有效,因为我应该使用 .exchange() 吗?有没有一种方法可以链接这些方法而不是单独的方法
当前实施:
private Mono<ResponseEntity<Recommendations>> myMethod(final Request request, final String variantName) {
Mono<String> response = webClient.build()
.post()
.uri(uri)
.header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
.header(HttpHeaders.ACCEPT, MediaType.APPLICATION_JSON_VALUE)
.bodyValue(requestBody)
.retrieve().bodyToMono(String.class);
var recommendations = ((XYZResponseMapper) responseMapper).mapReactive(request, response, useCaseId, variantName); //return type Recommendations
var entity = new ResponseEntity<>(recommendations, nullHeaders, HttpStatus.OK);
return Mono.just(entity);
}
简短的回答可能是否定的。
更长的版本是,当有人开始订阅您的服务(即生产者)时,客户想要消费数据。一旦订阅开始,webflux 将做的就是在应用程序内部构建反应链。该链可以与一种回调链进行比较,称为“组装阶段”。
在此组装阶段,重要的是每个 Flux/Mono 的 return 相互链接。否则你就是在打破链条。
var firstAction = Mono.just("Hello").flatMap(value -> {
// do something
});
// Next action needs to chain on the last
var secondAction = firstAction.flatMap(value -> {
// Do the next thing
});
// Can be combined
var bothActions = Mono.just("Hello").flatMap(value -> {
// do something
}).flatMap(value -> {
// do next thing
});
在上面的例子中你可以看到我一直在链接最后一个动作,我们并没有打破链条。
现在开始你的代码。
private Mono<ResponseEntity<Recommendations>> myMethod(final Request request, final String variantName) {
// Here you have a response
Mono<String> response = webClient.build()
.post()
.uri(uri)
// Not needed content type will default to json
.header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
// will also default to json
.header(HttpHeaders.ACCEPT, MediaType.APPLICATION_JSON_VALUE)
.bodyValue(requestBody)
.retrieve().bodyToMono(String.class);
// here you pass the response into a function, hence probably breaking the chain
var recommendations = ((XYZResponseMapper) responseMapper).mapReactive(request, response, useCaseId, variantName); //return type Recommendations
var entity = new ResponseEntity<>(recommendations, nullHeaders, HttpStatus.OK);
// Here you are suddenly creating a new mono, which tells me you deffo broke the chain and need to recreate it by doing a mono#just
return Mono.just(entity);
}
那我们怎么解决呢?
private Mono<Recommendations> myMethod(final Request request, final String variantName) {
// You should not build a webclient on each request, you should build it in a @Bean
final Mono<XYZResponse> response = webClient.build()
.post()
.uri(uri)
.bodyValue(requestBody)
.retrieve()
// Map into a class representation to uphold type safety
.bodyToMono(XYZResponse.class);
// if we need to do additional transformations we can flatMap and chain on
final Mono<Recommendations> recommendations = response.flatMap(value -> {
var recommendations = mapper.toRecommendations(value);
});
// No need to wrap it in a response webflux will do that for you automatically when you return it to the client
return recommendations;
}
我们甚至可以重写它更短。
private Mono<Recommendations> myMethod(final Request request, final String variantName) {
return webClient.build()
.post()
.uri(uri)
.bodyValue(requestBody)
.retrieve()
.bodyToMono(XYZResponse.class)
.flatMap(value -> {
var recommendations = mapper.toRecommendations(value);
});
}
我通常在第一行写 return 语句,然后再写我的链。
如果你想进一步优化它(通常推荐),你可以在配置 bean 中创建一个 webclient,这样 webclient 只创建一次(在 webflux 服务器启动时),然后我们在每个请求。
@Configuration
public class ClientConfig {
@Bean
public WebClient webclient(WebClient.Builder webClient) {
return webClient.baseUrl( ... )
.build();
}
}
@Component
public class RecommendationHandler {
final private WebClient
@Autowire
public RecommendationHandler(WebClient webClient) {
this.webClient = webClient;
}
private Mono<Recommendations> getRecommendations(RequestBody requestBody) {
return webClient
.post()
.bodyValue(requestBody)
.retrieve()
.bodyToMono(XYZResponse.class)
.flatMap(value -> {
var recommendations = mapper.toRecommendations(value);
});
}
}
像这样。这只是一个例子,我没有 运行 它在 IDE 中,我只是从头顶写下来的。但这就是我想到的一些事情,这里没有错误处理,应该是地址。
祝你好运
经过大量阅读和试验后,我设法使其适用于以下内容:
return webClient.build()
.post()
.uri(uri)
.header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
.header(HttpHeaders.ACCEPT, MediaType.APPLICATION_JSON_VALUE)
.bodyValue(requestBody)
.retrieve()
.toEntity(String.class)
.publishOn(Schedulers.boundedElastic())
.map(x -> {
var recs = processResponse(request, x.getBody(), useCaseId, variantName);
return new ResponseEntity<GatewayRecommendations>(recs, x.getStatusCode());
});