Spring WebFlux 5.3.0 - WebClient.exchangeToMono()

Spring WebFlux 5.3.0 - WebClient.exchangeToMono()

我刚刚升级到 Webflux 5.3.0,并注意到 WebClient.exchange() 方法现在已弃用 (link),取而代之的是新方法 .exchangeToMono() 和 .exchangeToFlux( )

我有这个代码:

webClient
   .method(request.method)
   .uri(request.path)
   .body(request.bodyToMono<ByteArray>())
   .exchange()
   .flatMap { response ->
      ServerResponse.
         .status(response.statusCode())
         .headers { it.addAll(response.headers().asHttpHeaders()) }
         .body(response.bodyToMono<ByteArray>())
   }

我不得不将其重构为:

   .exchangeToMono { response ->
      ServerResponse.
         .status(response.statusCode())
         .headers { it.addAll(response.headers().asHttpHeaders()) }
         .body(response.bodyToMono<ByteArray>())
   }

然而,显然 .exchangeToMono() 调用 .releaseIfNotConsumed(),释放未处理的响应主体,并且基本上使服务器 return 成为一个空主体

所以我不得不进一步重构我的代码:

   .exchangeToMono { response ->
      response.bodyToMono<ByteArray>()
         .defaultIfEmpty(ByteArray(0))
         .flatMap { body ->
            ServerResponse.
               .status(response.statusCode())
               .headers { it.addAll(response.headers().asHttpHeaders()) }
               .bodyValue(body)
         }
   }

据我了解,.exchange() 允许我的代理服务器传输响应主体而不实际处理它,而 .exchangeToMono() 强制我处理(缓冲?)它。这是正确的吗?

如果是这样,有什么影响?我应该接受更改,还是应该以某种方式调整代码以使其在不处理响应主体的情况下传输响应主体?我该怎么做?

==========

tl;dr 通过 .body(response.bodyToMono()).bodyValue(body) 之间的实际区别是什么?

通读更改并尝试理解您的问题后,我将尝试回答这个问题。我不能以任何方式确定这是正确的答案,我将根据我对 reactor、webflux 和 webclient 的了解做出一些逻辑假设。

自从 WebClient 发布以来,主要的主力应该是 retrieve() 能够针对完全异步的 webclient 提供简单但稳定的 API。

问题是大多数人习惯于使用旧的已弃用 RestTemplate 返回的 ResponseEntities,所以人们转而使用 exchange() 函数。

但问题就出在这里。当您获得 Response 的访问权限时,您也有责任附加到它。您有义务使用 response 以便服务器可以关闭 TCP 连接。这通常意味着您需要读取 header 和 body 然后我们可以关闭连接。

如果您不使用响应,您将有一个打开的连接,从而导致内存泄漏。

Spring 通过提供像 response#bodyToMonoresponse#bodyToFlux 这样的函数来解决这个问题,它们消耗 body 然后关闭响应(这反过来关闭连接,因此使用响应)。

但事实证明,人们很容易(因为开发人员都是狡猾的混蛋)编写不消耗响应的代码,从而产生悬空的 TCP 连接。

webclient.url( ... )
    .exchange(response -> {

        // This is just an example but, but here i just re-return the response
        // which means that the server will keep the connection open, until i 
        // actually consume the body. I could send this all over my application
        // but never consume it and the server will keep the connection open as
        // long as i do, could be a potential memory leak.

        return Mono.just(response)
    }

新的 exchangeToMono 实现基本上会强制您使用 body 来避免内存泄漏。如果您想处理原始响应,您将被迫使用 body.

So lats 谈谈你的例子和你的需求。

您只想基本上将请求从一台服务器代理到另一台服务器。您确实使用了 body,您只是没有在靠近 WebClient 的 flatMap 中使用它。

.exchange()
   .flatMap { response ->
      ServerResponse.
         .status(response.statusCode())
         .headers { it.addAll(response.headers().asHttpHeaders()) }
         .body(response.bodyToMono<ByteArray>()) 
         // Here you are declaring you want to consume but it isn't consumed right here, its not consumed until much later.
   }

在您的代码中,您将返回一个 ServerResponse,但您必须始终考虑。 在您订阅之前什么都不会发生。你基本上传递了一个 long a ServerResponse 但你还没有消耗 body 呢。您只声明了当服务器需要 body 时,它将不得不消耗最后一个响应的 body 以获得新的 body.

这样想,您返回的 ServerResponse 只包含关于我们想要的内容的声明,而不是其中实际包含的内容。

当它从 flatMap 返回时,它将一直传出应用程序,直到我们将其写为对我们针对客户端的开放 TCP 连接的响应。

只有在那之后才会构建响应,那时将使用和关闭来自 WebClient 的第一个响应。

所以您的原始代码确实有效,因为您确实使用了 WebClient 响应,只是在您向调用客户端编写响应之前才这样做。

您所做的事情本身并没有错,只是以这种方式使用 WebClient API 会增加人们错误使用它的风险,并且可能会发生内存泄漏。

我希望这至少能回答您的一些问题,我主要是写下我对更改的解释。

作为向后兼容的安全解决方法:

exchangeToMono(rs -> Mono.just(rs.mutate().build)))

您仍然收到 ClientResponse 类型的下游,其中在 mutate() 期间在原始 ClientResponse 上消耗/释放了所有数据。

别忘了释放变异的一个。

更新:

这不是正确的方法,因为它效率不高,但如果您需要在庞大的遗留反应管道中进行临时测试 - 它可以工作。

发现这种方式可以与 memory-efficient DataBuffers 一起使用。 onStatusonRawStatus 对于不成功的状态至关重要,因为它取代了默认的错误处理程序。

   .retrieve()
   // ignore all statuses so no error is thrown when status is > 400 in toEntityFlux()
   .onRawStatus(status -> true, response -> Mono.empty())
   .toEntityFlux(DataBuffer.class)
   .flatMap(entity -> ServerResponse
       .status(entity.getStatusCode())
       .body(entity.getBody(), DataBuffer.class))