使用 Reactor WebClient 发出非阻塞 HTTP 请求并反序列化对对象的响应的最佳方法是什么?

What is the best way to make a non-blocking HTTP request using Reactor WebClient and deserialize the response to an object?

我有像 Vert.x 这样的异步库的经验,但特别是 Reactor/WebFlux 的新手。我想在 Web 应用程序上公开一个端点,当它被点击时,转身并调用另一个 Web 服务,将响应解析为 Java 对象,然后访问对象中的字段并对它们执行某些操作。我正在使用 WebClient 进行 HTTP 调用,并使用 Jackson ObjectMapper 对其进行反序列化。我的代码看起来大致像这样(注意:RequestUtil.safeDeserialize 只是使用 Jackson 将字符串主体解析为一个对象和 returns Optional<Object> 这就是为什么我之后还有一个额外的 map 步骤):

    public Mono<String> function(final String param) {
        final String encodedRequestBody = RequestUtil.encodeRequestBody(param);
        final Mono<Response> responseMono = webClient.post()
                .uri("/endpoint")
                .header("Content-Type", "application/x-www-form-urlencoded")
                .header("Authorization", "Basic " + basicAuthHeader)
                .accept(MediaType.APPLICATION_JSON)
                .body(BodyInserters.fromPublisher(Mono.just(encodedRequestBody), String.class))
                .exchange()
                .flatMap(clientResponseMono -> clientResponseMono.bodyToMono(String.class))
                .map(RequestUtil::safeDeserialize)
                .map(resp -> resp.orElseThrow(() -> new RuntimeException("Failed to deserialize Oscar response!")));

        responseMono.subscribe(response -> {
            // Pull out some of the fields from the `Response` type object and do something with them
        });

        return responseMono.map(Response::aStringField);
    }

在针对遵循完全相同逻辑但通过阻塞进行 HTTP 调用的相同应用程序测试此代码后 Java11 HttpClient class,我几乎看不到两者之间的差异——事实上,WebClient 实现比阻塞实现稍微 性能。

很明显,我在代码或我对这里发生的事情的心理模型的某个地方犯了错误,所以任何 help/advice 都非常感谢。谢谢!

编辑:根据@Toerktumlare 回复中的建议,我已将函数更新为以下内容:

    public Mono<String> function(final String param) {
        final Mono<String> encodedRequestBody = RequestUtil.encodeRequestBodyToMono(param);
        final Mono<Response> responseMono = webClient.post()
                .uri("/endpoint")
                .header("Content-Type", "application/x-www-form-urlencoded")
                .header("Authorization", "Basic " + basicAuthHeader)
                .accept(MediaType.APPLICATION_JSON)
                .body(encodedRequestBody, String.class)
                .retrieve()
                .bodyToMono(Response.class);

        return responseMono.flatMap(response -> {
            final String field = response.field();
            // Use `field` to do something that would produce a log message
            logger.debug("Field is: {}", field);
            return Mono.just(field);
        });
}

当 运行 这段代码时,我没有看到任何日志记录。这让我觉得 HTTP 调用实际上并没有发生(或没有及时完成?),因为当我将 subscribe 与相同的 WebClient 代码一起使用时,我可以成功地从响应中打印出字段。我错过了什么?

Edit2:此函数用于响应端点(为简洁起见,省略了几行代码):

    @Bean
    public RouterFunction<ServerResponse> routerFunction(ResponseHandler handler) {
        return RouterFunctions.route(RequestPredicates.GET("/my/endpoint")
                .and(RequestPredicates.accept(MediaType.ALL)), handler::endpoint);
    }
 

    public Mono<ServerResponse> endpoint(ServerRequest request) {
        // Pull out a comma-separated list from the request
        final List<String> params = Arrays.asList(fieldFromRequest.split(","));

        // For each param, call function(param), roll into list
        List<Mono<String>> results = params.stream()
                .map(nonBlockingClient::function)
                .collect(Collectors.toList());

        // Check to see if any of the requests failed
        if (results.size() != params.size()) {
            return ServerResponse.status(500).build();
        }

        logger.debug("Success");
        return ServerResponse.ok().build();
    }

您的问题很可能与 subscribe.

的使用有关

A consumersubscribe 变成 producer。您的后端应用程序是 producer,它使调用客户端成为 consumer。这意味着,它通常应该是调用客户端 subscribing 而不是你。

你现在所做的基本上是consuming你自己的production。这在某种程度上是阻塞的。

一般来说,您永远不应该在 webflux 应用程序中订阅,除非您的应用程序调用 api 然后使用响应(例如将其保存在数据库中等)。发起呼叫的通常是 subscriber.

我会重写最后一部分并删除 subscribe:

return responseMono.flatMap(response -> {
        final string = doSomething(response);
        return OscarGuacResponse.aStringField(string);
    });

我还看到 RequestUtil::safeDeserializez 你 return 一个 Optional<T> 我会改为 return 一个 Mono#empty 或 [=26] =] 作为 return 类型,以便能够使用 webflux 中可用的许多错误运算符,例如 switchIfEmptyonErrorContinuedefaultIfEmpty 等。有一个 entire chapter 关于 reactor 文档中的错误处理。

也可以考虑在许多地方使用 flatMap 而不是 map。要了解这两者之间的差异,您可以 .

此外,在稍后查看性能时,您应该了解,当您衡量 webflux 性能时,与非阻塞应用程序相比,您需要查看内存占用和线程数等内容。在速度方面您可能看不到任何性能提升,而是看到应用程序使用更少的线程,这反过来意味着更小的内存占用,这本身就是一个增益。

更新:

您在进行反应式编程时尝试编写常规 java 代码,但这将不起作用。为什么您的代码不起作用是因为您 .

我写的没有IDE,所以它可能无法编译,但你应该理解它。您总是需要在前一个上进行链接,并且在反应式编程中通常不需要 java 流等。

public Mono<ServerResponse> endpoint(ServerRequest request) {
    final List<String> params = Arrays.asList(fieldFromRequest.split(","));
    return Flux.fromIterable(params)
               .flatMap(param -> nonBlockingClient.function(param))
               .collectList()
               .flatMap(list -> {
                   if (list.size() != params.size()) {
                       return ServerResponse.status(500).build();
                   }
                   return ServerResponse.ok().build();
               })
}

这是基本的反应式编程,我 非常 建议您阅读 "getting started section" 反应堆文档,以便了解基础知识,因为如果您要编写代码常规 java 在反应性应用程序中,您将度过一段糟糕的时光。