使用 Reactor WebClient 发出非阻塞 HTTP 请求并反序列化对对象的响应的最佳方法是什么?
What is the best way to make a non-blocking HTTP request using Reactor WebClient and deserialize the response to an object?
我有像 Vert.x 这样的异步库的经验,但特别是 Reactor/WebFlux 的新手。我想在 Web 应用程序上公开一个端点,当它被点击时,转身并调用另一个 Web 服务,将响应解析为 Java 对象,然后访问对象中的字段并对它们执行某些操作。我正在使用 WebClient 进行 HTTP 调用,并使用 Jackson ObjectMapper 对其进行反序列化。我的代码看起来大致像这样(注意:RequestUtil.safeDeserialize
只是使用 Jackson 将字符串主体解析为一个对象和 returns Optional<Object>
这就是为什么我之后还有一个额外的 map
步骤):
public Mono<String> function(final String param) {
final String encodedRequestBody = RequestUtil.encodeRequestBody(param);
final Mono<Response> responseMono = webClient.post()
.uri("/endpoint")
.header("Content-Type", "application/x-www-form-urlencoded")
.header("Authorization", "Basic " + basicAuthHeader)
.accept(MediaType.APPLICATION_JSON)
.body(BodyInserters.fromPublisher(Mono.just(encodedRequestBody), String.class))
.exchange()
.flatMap(clientResponseMono -> clientResponseMono.bodyToMono(String.class))
.map(RequestUtil::safeDeserialize)
.map(resp -> resp.orElseThrow(() -> new RuntimeException("Failed to deserialize Oscar response!")));
responseMono.subscribe(response -> {
// Pull out some of the fields from the `Response` type object and do something with them
});
return responseMono.map(Response::aStringField);
}
在针对遵循完全相同逻辑但通过阻塞进行 HTTP 调用的相同应用程序测试此代码后 Java11 HttpClient
class,我几乎看不到两者之间的差异——事实上,WebClient
实现比阻塞实现稍微 低 性能。
很明显,我在代码或我对这里发生的事情的心理模型的某个地方犯了错误,所以任何 help/advice 都非常感谢。谢谢!
编辑:根据@Toerktumlare 回复中的建议,我已将函数更新为以下内容:
public Mono<String> function(final String param) {
final Mono<String> encodedRequestBody = RequestUtil.encodeRequestBodyToMono(param);
final Mono<Response> responseMono = webClient.post()
.uri("/endpoint")
.header("Content-Type", "application/x-www-form-urlencoded")
.header("Authorization", "Basic " + basicAuthHeader)
.accept(MediaType.APPLICATION_JSON)
.body(encodedRequestBody, String.class)
.retrieve()
.bodyToMono(Response.class);
return responseMono.flatMap(response -> {
final String field = response.field();
// Use `field` to do something that would produce a log message
logger.debug("Field is: {}", field);
return Mono.just(field);
});
}
当 运行 这段代码时,我没有看到任何日志记录。这让我觉得 HTTP 调用实际上并没有发生(或没有及时完成?),因为当我将 subscribe
与相同的 WebClient
代码一起使用时,我可以成功地从响应中打印出字段。我错过了什么?
Edit2:此函数用于响应端点(为简洁起见,省略了几行代码):
@Bean
public RouterFunction<ServerResponse> routerFunction(ResponseHandler handler) {
return RouterFunctions.route(RequestPredicates.GET("/my/endpoint")
.and(RequestPredicates.accept(MediaType.ALL)), handler::endpoint);
}
public Mono<ServerResponse> endpoint(ServerRequest request) {
// Pull out a comma-separated list from the request
final List<String> params = Arrays.asList(fieldFromRequest.split(","));
// For each param, call function(param), roll into list
List<Mono<String>> results = params.stream()
.map(nonBlockingClient::function)
.collect(Collectors.toList());
// Check to see if any of the requests failed
if (results.size() != params.size()) {
return ServerResponse.status(500).build();
}
logger.debug("Success");
return ServerResponse.ok().build();
}
您的问题很可能与 subscribe
.
的使用有关
A consumer
将 subscribe
变成 producer
。您的后端应用程序是 producer
,它使调用客户端成为 consumer
。这意味着,它通常应该是调用客户端 subscribing
而不是你。
你现在所做的基本上是consuming
你自己的production
。这在某种程度上是阻塞的。
一般来说,您永远不应该在 webflux 应用程序中订阅,除非您的应用程序调用 api 然后使用响应(例如将其保存在数据库中等)。发起呼叫的通常是 subscriber
.
我会重写最后一部分并删除 subscribe
:
return responseMono.flatMap(response -> {
final string = doSomething(response);
return OscarGuacResponse.aStringField(string);
});
我还看到 RequestUtil::safeDeserializez
你 return 一个 Optional<T>
我会改为 return 一个 Mono#empty
或 [=26] =] 作为 return 类型,以便能够使用 webflux 中可用的许多错误运算符,例如 switchIfEmpty
、onErrorContinue
、defaultIfEmpty
等。有一个 entire chapter 关于 reactor 文档中的错误处理。
也可以考虑在许多地方使用 flatMap
而不是 map
。要了解这两者之间的差异,您可以 .
此外,在稍后查看性能时,您应该了解,当您衡量 webflux 性能时,与非阻塞应用程序相比,您需要查看内存占用和线程数等内容。在速度方面您可能看不到任何性能提升,而是看到应用程序使用更少的线程,这反过来意味着更小的内存占用,这本身就是一个增益。
更新:
您在进行反应式编程时尝试编写常规 java 代码,但这将不起作用。为什么您的代码不起作用是因为您 .
我写的没有IDE,所以它可能无法编译,但你应该理解它。您总是需要在前一个上进行链接,并且在反应式编程中通常不需要 java 流等。
public Mono<ServerResponse> endpoint(ServerRequest request) {
final List<String> params = Arrays.asList(fieldFromRequest.split(","));
return Flux.fromIterable(params)
.flatMap(param -> nonBlockingClient.function(param))
.collectList()
.flatMap(list -> {
if (list.size() != params.size()) {
return ServerResponse.status(500).build();
}
return ServerResponse.ok().build();
})
}
这是基本的反应式编程,我 非常 建议您阅读 "getting started section" 反应堆文档,以便了解基础知识,因为如果您要编写代码常规 java 在反应性应用程序中,您将度过一段糟糕的时光。
我有像 Vert.x 这样的异步库的经验,但特别是 Reactor/WebFlux 的新手。我想在 Web 应用程序上公开一个端点,当它被点击时,转身并调用另一个 Web 服务,将响应解析为 Java 对象,然后访问对象中的字段并对它们执行某些操作。我正在使用 WebClient 进行 HTTP 调用,并使用 Jackson ObjectMapper 对其进行反序列化。我的代码看起来大致像这样(注意:RequestUtil.safeDeserialize
只是使用 Jackson 将字符串主体解析为一个对象和 returns Optional<Object>
这就是为什么我之后还有一个额外的 map
步骤):
public Mono<String> function(final String param) {
final String encodedRequestBody = RequestUtil.encodeRequestBody(param);
final Mono<Response> responseMono = webClient.post()
.uri("/endpoint")
.header("Content-Type", "application/x-www-form-urlencoded")
.header("Authorization", "Basic " + basicAuthHeader)
.accept(MediaType.APPLICATION_JSON)
.body(BodyInserters.fromPublisher(Mono.just(encodedRequestBody), String.class))
.exchange()
.flatMap(clientResponseMono -> clientResponseMono.bodyToMono(String.class))
.map(RequestUtil::safeDeserialize)
.map(resp -> resp.orElseThrow(() -> new RuntimeException("Failed to deserialize Oscar response!")));
responseMono.subscribe(response -> {
// Pull out some of the fields from the `Response` type object and do something with them
});
return responseMono.map(Response::aStringField);
}
在针对遵循完全相同逻辑但通过阻塞进行 HTTP 调用的相同应用程序测试此代码后 Java11 HttpClient
class,我几乎看不到两者之间的差异——事实上,WebClient
实现比阻塞实现稍微 低 性能。
很明显,我在代码或我对这里发生的事情的心理模型的某个地方犯了错误,所以任何 help/advice 都非常感谢。谢谢!
编辑:根据@Toerktumlare 回复中的建议,我已将函数更新为以下内容:
public Mono<String> function(final String param) {
final Mono<String> encodedRequestBody = RequestUtil.encodeRequestBodyToMono(param);
final Mono<Response> responseMono = webClient.post()
.uri("/endpoint")
.header("Content-Type", "application/x-www-form-urlencoded")
.header("Authorization", "Basic " + basicAuthHeader)
.accept(MediaType.APPLICATION_JSON)
.body(encodedRequestBody, String.class)
.retrieve()
.bodyToMono(Response.class);
return responseMono.flatMap(response -> {
final String field = response.field();
// Use `field` to do something that would produce a log message
logger.debug("Field is: {}", field);
return Mono.just(field);
});
}
当 运行 这段代码时,我没有看到任何日志记录。这让我觉得 HTTP 调用实际上并没有发生(或没有及时完成?),因为当我将 subscribe
与相同的 WebClient
代码一起使用时,我可以成功地从响应中打印出字段。我错过了什么?
Edit2:此函数用于响应端点(为简洁起见,省略了几行代码):
@Bean
public RouterFunction<ServerResponse> routerFunction(ResponseHandler handler) {
return RouterFunctions.route(RequestPredicates.GET("/my/endpoint")
.and(RequestPredicates.accept(MediaType.ALL)), handler::endpoint);
}
public Mono<ServerResponse> endpoint(ServerRequest request) {
// Pull out a comma-separated list from the request
final List<String> params = Arrays.asList(fieldFromRequest.split(","));
// For each param, call function(param), roll into list
List<Mono<String>> results = params.stream()
.map(nonBlockingClient::function)
.collect(Collectors.toList());
// Check to see if any of the requests failed
if (results.size() != params.size()) {
return ServerResponse.status(500).build();
}
logger.debug("Success");
return ServerResponse.ok().build();
}
您的问题很可能与 subscribe
.
A consumer
将 subscribe
变成 producer
。您的后端应用程序是 producer
,它使调用客户端成为 consumer
。这意味着,它通常应该是调用客户端 subscribing
而不是你。
你现在所做的基本上是consuming
你自己的production
。这在某种程度上是阻塞的。
一般来说,您永远不应该在 webflux 应用程序中订阅,除非您的应用程序调用 api 然后使用响应(例如将其保存在数据库中等)。发起呼叫的通常是 subscriber
.
我会重写最后一部分并删除 subscribe
:
return responseMono.flatMap(response -> {
final string = doSomething(response);
return OscarGuacResponse.aStringField(string);
});
我还看到 RequestUtil::safeDeserializez
你 return 一个 Optional<T>
我会改为 return 一个 Mono#empty
或 [=26] =] 作为 return 类型,以便能够使用 webflux 中可用的许多错误运算符,例如 switchIfEmpty
、onErrorContinue
、defaultIfEmpty
等。有一个 entire chapter 关于 reactor 文档中的错误处理。
也可以考虑在许多地方使用 flatMap
而不是 map
。要了解这两者之间的差异,您可以
此外,在稍后查看性能时,您应该了解,当您衡量 webflux 性能时,与非阻塞应用程序相比,您需要查看内存占用和线程数等内容。在速度方面您可能看不到任何性能提升,而是看到应用程序使用更少的线程,这反过来意味着更小的内存占用,这本身就是一个增益。
更新:
您在进行反应式编程时尝试编写常规 java 代码,但这将不起作用。为什么您的代码不起作用是因为您
我写的没有IDE,所以它可能无法编译,但你应该理解它。您总是需要在前一个上进行链接,并且在反应式编程中通常不需要 java 流等。
public Mono<ServerResponse> endpoint(ServerRequest request) {
final List<String> params = Arrays.asList(fieldFromRequest.split(","));
return Flux.fromIterable(params)
.flatMap(param -> nonBlockingClient.function(param))
.collectList()
.flatMap(list -> {
if (list.size() != params.size()) {
return ServerResponse.status(500).build();
}
return ServerResponse.ok().build();
})
}
这是基本的反应式编程,我 非常 建议您阅读 "getting started section" 反应堆文档,以便了解基础知识,因为如果您要编写代码常规 java 在反应性应用程序中,您将度过一段糟糕的时光。