Webflux Webclient logs "Failed to release a message" when HttpStatus::is4xxClientError occurs

I'm trying to download a zip file from a server with Webflux WebClient. What is the proper way to handle errors?

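When the file exists on the server, everything works fine. Otherwise, I get a warning from Netty saying that it failed to release a message.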

return client.get()
      .uri(String.format("/cache/%s", filename))
      .accept(MediaType.APPLICATION_OCTET_STREAM)
      .retrieve()
      .onStatus(HttpStatus::is3xxRedirection, response -> Mono.error(new RuntimeException(response.statusCode().getReasonPhrase())))
      .onStatus(HttpStatus::is4xxClientError, response -> Mono.error(new RuntimeException(response.statusCode().getReasonPhrase())))
      .bodyToMono(ByteArrayResource.class)
      .subscribeOn(Schedulers.parallel())
      .map(res -> {
        try {
          InputStream is = res.getInputStream();
          File targetFile = Paths.get(cacheDir).resolve(filename).toAbsolutePath().toFile();
          FileUtils.copyInputStreamToFile(is, targetFile);
          LOG.info("File {} saved", targetFile.toPath());
        } catch (IOException e) {
          throw new RuntimeException("File download error from cache");
        }
        return true;
      });

I expect no warnings from Netty in the log, but the actual log is:

WARN <reactor-http-epoll-4> Failed to release a message: DefaultLastHttpContent(data: PooledSlicedByteBuf(freed), decoderResult: success) (io.netty.util.ReferenceCountUtil:115)
io.netty.util.IllegalReferenceCountException: refCnt: 0, decrement: 1
    at io.netty.util.internal.ReferenceCountUpdater.toLiveRealRefCnt(ReferenceCountUpdater.java:74) ~[netty-common-4.1.36.Final.jar:4.1.36.Final]
    at io.netty.util.internal.ReferenceCountUpdater.release(ReferenceCountUpdater.java:138) ~[netty-common-4.1.36.Final.jar:4.1.36.Final]
    at io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:100) ~[netty-buffer-4.1.36.Final.jar:4.1.36.Final]
    at io.netty.handler.codec.http.DefaultHttpContent.release(DefaultHttpContent.java:94) ~[netty-codec-http-4.1.36.Final.jar:4.1.36.Final]
    at io.netty.util.ReferenceCountUtil.release(ReferenceCountUtil.java:88) ~[netty-common-4.1.36.Final.jar:4.1.36.Final]
    at io.netty.util.ReferenceCountUtil.safeRelease(ReferenceCountUtil.java:113) [netty-common-4.1.36.Final.jar:4.1.36.Final]
    at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:120) [reactor-netty-0.8.9.RELEASE.jar:0.8.9.RELEASE]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374) [netty-transport-4.1.36.Final.jar:4.1.36.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360) [netty-transport-4.1.36.Final.jar:4.1.36.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352) [netty-transport-4.1.36.Final.jar:4.1.36.Final]
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102) [netty-codec-4.1.36.Final.jar:4.1.36.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374) [netty-transport-4.1.36.Final.jar:4.1.36.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360) [netty-transport-4.1.36.Final.jar:4.1.36.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352) [netty-transport-4.1.36.Final.jar:4.1.36.Final]
    at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:438) [netty-transport-4.1.36.Final.jar:4.1.36.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:323) [netty-codec-4.1.36.Final.jar:4.1.36.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:297) [netty-codec-4.1.36.Final.jar:4.1.36.Final]
    at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:253) [netty-transport-4.1.36.Final.jar:4.1.36.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374) [netty-transport-4.1.36.Final.jar:4.1.36.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360) [netty-transport-4.1.36.Final.jar:4.1.36.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352) [netty-transport-4.1.36.Final.jar:4.1.36.Final]
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1408) [netty-transport-4.1.36.Final.jar:4.1.36.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374) [netty-transport-4.1.36.Final.jar:4.1.36.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360) [netty-transport-4.1.36.Final.jar:4.1.36.Final]
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:930) [netty-transport-4.1.36.Final.jar:4.1.36.Final]
    at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:796) [netty-transport-native-epoll-4.1.36.Final-linux-x86_64.jar:4.1.36.Final]
    at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:432) [netty-transport-native-epoll-4.1.36.Final-linux-x86_64.jar:4.1.36.Final]
    at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:333) [netty-transport-native-epoll-4.1.36.Final-linux-x86_64.jar:4.1.36.Final]
    at io.netty.util.concurrent.SingleThreadEventExecutor.run(SingleThreadEventExecutor.java:906) [netty-common-4.1.36.Final.jar:4.1.36.Final]
    at io.netty.util.internal.ThreadExecutorMap.run(ThreadExecutorMap.java:74) [netty-common-4.1.36.Final.jar:4.1.36.Final]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_202]

Update. Thanks to Thomas, the problem was solved when I changed the code like this:

.onStatus(HttpStatus::is3xxRedirection, exceptionFunction)
.onStatus(HttpStatus::is4xxClientError, exceptionFunction)
.....
.....
.....
private final Function<ClientResponse, Mono<? extends Throwable>> exceptionFunction = response -> response.bodyToMono(String.class)
    .map(body -> new RuntimeException(String.format("%s, response body: %s", response.statusCode().toString(), body)));

The documentation states the following:

When onStatus is used, if the response is expected to have content, then the onStatus callback should consume it. If not, the content will be automatically drained to ensure resources are released.

See the last section of the official docs about retrieve().

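This means that if the response is empty when an error occurs, the framework will automatically close the connection for you.

On the other hand, if the response actually contains data, you must consume the response so that the connection can be closed.

So what you did is right: you can log the response, put it in the exception, or ignore it. But you do need to consume the response, as you did.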

I also believe you can call response.bodyToMono(Void.class) if you just want to dispose of it.
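A minimal sketch of that last variant, assuming Spring WebFlux 5.x as in the question. The class name DrainingStatusHandler and the constant DRAIN_AND_FAIL are hypothetical names made up for illustration; the only library calls used are ClientResponse.bodyToMono, ClientResponse.statusCode, Mono.then and Mono.just. It has not been checked against the exact versions shown in the stack trace.

```java
import java.util.function.Function;

import org.springframework.web.reactive.function.client.ClientResponse;
import reactor.core.publisher.Mono;

public class DrainingStatusHandler {

    // Hypothetical helper: reads the error body as Void (so its content is
    // discarded rather than left unconsumed), then emits the exception that
    // onStatus should propagate to the subscriber.
    static final Function<ClientResponse, Mono<? extends Throwable>> DRAIN_AND_FAIL =
        response -> response.bodyToMono(Void.class)
            .then(Mono.just(new RuntimeException(response.statusCode().toString())));
}
```

It would be plugged in the same way as exceptionFunction above, for example .onStatus(HttpStatus::is4xxClientError, DrainingStatusHandler.DRAIN_AND_FAIL). The trade-off is that the error body is thrown away, so the exception message carries only the status; use the bodyToMono(String.class) version from the update if the body is worth logging.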