如何正确地从 WebFlux Client 中排出/释放响应主体?

How to properly drain / release response body from WebFlux Client?

我正在使用来自 Spring 5 反应堆栈的 WebFlux HTTP 客户端来访问外部 REST 服务。我想根据 HTTP 状态处理响应:

  1. 如果状态为 2xx,我想 return Mono 反序列化响应主体。

  2. 如果状态为 404,我想删除响应正文并立即 return 清空 Mono

  3. 对于任何其他状态,我想删除响应正文和 return 错误单声道 MyBusinessException

我的代码如下所示:

webClient.get()
    .uri("/search")
    .syncBody(request)
    .exchange()
    .flatMap { response ->
        when {
            response.statusCode().is2xxSuccessful -> response.bodyToMono(MyResponse::class.java)
            response.statusCode() == NOT_FOUND -> Mono.empty()
            else -> MyBusinessException().toMono<MyResponse>()
        }
     }

我不想浪费时间接收和处理不需要的响应正文。 exchange() 方法的 JavaDoc 状态

You must always use one of the body or entity methods of the response to ensure resources are released.

如果我想立即耗尽响应正文并 return 结果,我应该怎么做?

需要耗尽响应,以便连接可以重新用于未来的请求(即 http keep-alive / 持久连接)。

到 return 一个空的 Mono,它在 body 被耗尽(忽略错误)后完成:

// Using WebFlux >= 5.2
response.releaseBody()
    // ignore errors
    .onErrorResume(exception -> Mono.empty());


// Using WebFlux < 5.2
response.body(BodyExtractors.toDataBuffers())
    // release DataBuffers
    .doOnNext(DataBufferUtils::release)
    // ignore errors
    .onErrorResume(exception -> Mono.empty())
    // return an empty Mono
    .then();

到return一个立即完成的空Mono,并在后台异步耗尽body(忽略错误):

// Using WebFlux >= 5.2
Mono.<Void>empty()
    .doOnSubscribe(s ->
        // drain the body
        response.releaseBody()
            // initiate drain on a separate Scheduler
            .subscribeOn(Schedulers.parallel())
            // subscribe, and ignore errors
            .subscribe())

// Using WebFlux < 5.2
Mono.<Void>empty()
    .doOnSubscribe(s ->
        // drain the body
        response.body(BodyExtractors.toDataBuffers())
            // release DataBuffers
            .doOnNext(DataBufferUtils::release)
            // initiate drain on a separate Scheduler
            .subscribeOn(Schedulers.parallel())
            // subscribe, and ignore errors
            .subscribe())

我仍然推荐第一个选项,因为它会立即释放资源,并且可能是 WebClient 开发人员在编写和记录其用法时所考虑的。

我从未在生产系统中使用过第二个选项,因此请自行测试以确保 http 连接池的行为符合预期。如果使用 reactor-netty,您可以在 reactor.netty.resources.PooledConnectionProvider 上启用调试日志记录以比较这两种方法。