如何处理客户端 EventSource.close() 关闭的反应流 http 连接上的 ClosedChannelException?

How to handle a ClosedChannelException on a reactive streams http connection closed by client EventSource.close()?

我正在遵循 Quarkus - Getting started with Reactive 指南并沿着示例使用服务器发送的事件,但我遇到了 ClosedChannelException。 resteasy 资源处理程序使用 vert.x 和引擎盖下的 netty 以及用于反应流的 SmallRye Mutiny 库。

资源处理程序以一秒为间隔生成 count 条消息。

@Path("/hello")
public class ReactiveGreetingResource {
@GET
@Produces(MediaType.SERVER_SENT_EVENTS)
@SseElementType(MediaType.TEXT_PLAIN)
@Path("/stream/{count}/{name}")
public Multi<String> greetingsAsStream(@PathParam int count, @PathParam String name) {
    return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
    .onItem().apply(n -> String.format("hello %s - %d", name, n))
    .transform().byTakingFirstItems(count);
}
}

根据请求将消息发送到客户端。客户端是嵌入到 html 页面中的简单 javascript 函数:

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE html>
<html><head>
    <meta charset="UTF-8" />
    <title>SSE with Vert.x - Quarkus</title>
    <script type="application/javascript">
    var eventSource = new EventSource("/hello/stream/5/testname");
    eventSource.onmessage = function(event) {
        console.log(event.data);
        var container = document.getElementById("container");
        var paragraph = document.createElement("p");
        paragraph.innerHTML = event.data;
        container.appendChild(paragraph);
    }
    </script>
</head>
<body>
 <input type="button" onclick="stop()" value="stop sse"/>
 <div id="container"></div>
</body>
</html>

在客户端,一切都按预期工作。事件得到处理,单击停止按钮(当事件到达时)关闭 EventSource。

但是在服务器端它导致了这个异常:

2020-05-09 10:26:33,341 WARN  [io.net.cha.AbstractChannelHandlerContext] (vert.x-eventloop-thread-13) Failed to mark a promise as failure because it has failed already: DefaultChannelPromise@241d306c(failure: java.nio.channels.ClosedChannelException), unnotified cause: java.nio.channels.ClosedChannelException
at io.netty.channel.AbstractChannel$AbstractUnsafe.newClosedChannelException(AbstractChannel.java:957)
at io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:865)
at io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1367)
at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:715)
at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:762)
at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:788)
at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:756)
at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:765)
at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:788)
at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:756)
at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:765)
at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:788)
at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:756)
at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:765)
at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:788)
at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:756)
at io.vertx.core.net.impl.ConnectionBase.write(ConnectionBase.java:124)
at io.vertx.core.net.impl.ConnectionBase.lambda$queueForWrite(ConnectionBase.java:215)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)
at io.netty.util.concurrent.SingleThreadEventExecutor.run(SingleThreadEventExecutor.java:989)
at io.netty.util.internal.ThreadExecutorMap.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:834)
    : io.netty.util.IllegalReferenceCountException: refCnt: 0, decrement: 1
at io.netty.util.internal.ReferenceCountUpdater.toLiveRealRefCnt(ReferenceCountUpdater.java:74)
at io.netty.util.internal.ReferenceCountUpdater.release(ReferenceCountUpdater.java:138)
at io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:100)
at io.netty.handler.codec.http.DefaultHttpContent.release(DefaultHttpContent.java:92)
at io.netty.util.ReferenceCountUtil.release(ReferenceCountUtil.java:88)
at io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:867)
at io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1367)
at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:715)
at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:762)
at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:788)
at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:756)
at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:765)
at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:788)
at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:756)
at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:765)
at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:788)
at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:756)
at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:765)
at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:788)
at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:756)
at io.vertx.core.net.impl.ConnectionBase.write(ConnectionBase.java:124)
at io.vertx.core.net.impl.ConnectionBase.lambda$queueForWrite(ConnectionBase.java:215)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)
at io.netty.util.concurrent.SingleThreadEventExecutor.run(SingleThreadEventExecutor.java:989)
at io.netty.util.internal.ThreadExecutorMap.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:834)

我应该如何适当地避免或处理这个异常?

解决方法

在 pom.xml

中添加 quarkus-undertow 作为依赖项后警告消失
<dependency>
  <groupId>io.quarkus</groupId>
  <artifactId>quarkus-undertow</artifactId>
</dependency>

错误报告的 response 中提到了这个想法。