取消使用 WebFlux 创建的 SSE 流的正确方法
Correct way to cancel SSE stream created with WebFlux
首先,完整的 运行 代码示例(包括客户端和服务器)可在此处获得:https://github.com/kklepacz/webflux-cancel-subscription
案例描述:
我有生成数据的遗留系统(我在我的 github 示例中用 SampleEmitter
模拟了它。我可以提供监听器,当新数据到达时它会收到通知。我想"translate" 这些监听器调用到数据流中。这个流需要是无限的(只要系统工作就会有新的值)。准确地说它也需要是热源(在[=15之后=] 项目术语)所以每当有人订阅时,他只会收到当前值。
翻译如下:
class ReactiveRepoImpl implements ReactiveRepo {
private static final Logger log = LoggerFactory.getLogger(ReactiveRepoImpl.class);
private final UnicastProcessor<MyObject> hotProcessor = UnicastProcessor.create();
private final FluxSink<MyObject> fluxSink = hotProcessor.sink(FluxSink.OverflowStrategy.LATEST);
private final Flux<MyObject> hotFlux = hotProcessor.publish().autoConnect();
@Override
public void save(MyObject elem) {
fluxSink.next(elem);
}
@Override
public Flux<MyObject> findAll() {
return hotFlux;
}
}
所以此时我可以像这样通过 RouterFunctions
公开它:
@Bean
public RouterFunction routerFunction(ReactiveRepo repo) {
return RouterFunctions.route(GET("/objects"), serverRequest -> {
log.info("Subscribing for GET /objects");
return ok()
.contentType(MediaType.TEXT_EVENT_STREAM)
.body(repo.findAll(), MyObject.class);
});
}
如您所见,我在这里制作服务器端事件。
现在让我们消费它:
WebClient.create("http://localhost:8080")
.get()
.uri("/objects")
.retrieve()
.bodyToFlux(MyObject.class)
.subscribe(n -> log.info("Next: {}", n.getId()),
e -> log.error("Error: {}", e),
() -> log.info("Completed"));
问题是:
当我终止客户端或调用 Disposable.dispose()
时,我在服务器应用程序中收到以下错误:
2018-03-20 12:00:50.926 ERROR 11944 --- [ctor-http-nio-2] o.s.w.s.adapter.HttpWebHandlerAdapter : Unhandled failure: An established connection was aborted by the software in your host machine, response already set (status=200)
2018-03-20 12:00:50.941 ERROR 11944 --- [ctor-http-nio-2] o.s.h.s.r.ReactorHttpHandlerAdapter : Handling completed with error
java.io.IOException: An established connection was aborted by the software in your host machine
at sun.nio.ch.SocketDispatcher.writev0(Native Method) ~[na:1.8.0_144]
at sun.nio.ch.SocketDispatcher.writev(SocketDispatcher.java:55) ~[na:1.8.0_144]
at sun.nio.ch.IOUtil.write(IOUtil.java:148) ~[na:1.8.0_144]
at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:504) ~[na:1.8.0_144]
at io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:418) ~[netty-transport-4.1.22.Final.jar:4.1.22.Final]
at io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:934) ~[netty-transport-4.1.22.Final.jar:4.1.22.Final]
at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:360) ~[netty-transport-4.1.22.Final.jar:4.1.22.Final]
at io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:901) ~[netty-transport-4.1.22.Final.jar:4.1.22.Final]
at io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1376) ~[netty-transport-4.1.22.Final.jar:4.1.22.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) ~[netty-transport-4.1.22.Final.jar:4.1.22.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) ~[netty-transport-4.1.22.Final.jar:4.1.22.Final]
at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) ~[netty-transport-4.1.22.Final.jar:4.1.22.Final]
at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.flush(CombinedChannelDuplexHandler.java:533) ~[netty-transport-4.1.22.Final.jar:4.1.22.Final]
at io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115) ~[netty-transport-4.1.22.Final.jar:4.1.22.Final]
at io.netty.channel.CombinedChannelDuplexHandler.flush(CombinedChannelDuplexHandler.java:358) ~[netty-transport-4.1.22.Final.jar:4.1.22.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) ~[netty-transport-4.1.22.Final.jar:4.1.22.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) ~[netty-transport-4.1.22.Final.jar:4.1.22.Final]
at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) ~[netty-transport-4.1.22.Final.jar:4.1.22.Final]
at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:117) ~[netty-transport-4.1.22.Final.jar:4.1.22.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) ~[netty-transport-4.1.22.Final.jar:4.1.22.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) ~[netty-transport-4.1.22.Final.jar:4.1.22.Final]
at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) ~[netty-transport-4.1.22.Final.jar:4.1.22.Final]
at reactor.ipc.netty.channel.ChannelOperationsHandler$PublisherSender.onComplete(ChannelOperationsHandler.java:535) ~[reactor-netty-0.7.5.RELEASE.jar:0.7.5.RELEASE]
at reactor.core.publisher.FluxContextStart$ContextStartSubscriber.onComplete(FluxContextStart.java:122) ~[reactor-core-3.1.5.RELEASE.jar:3.1.5.RELEASE]
at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onComplete(FluxMap.java:245) ~[reactor-core-3.1.5.RELEASE.jar:3.1.5.RELEASE]
at reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.onComplete(FluxConcatArray.java:184) ~[reactor-core-3.1.5.RELEASE.jar:3.1.5.RELEASE]
at reactor.core.publisher.FluxConcatArray.subscribe(FluxConcatArray.java:80) ~[reactor-core-3.1.5.RELEASE.jar:3.1.5.RELEASE]
at reactor.core.publisher.FluxMap.subscribe(FluxMap.java:59) ~[reactor-core-3.1.5.RELEASE.jar:3.1.5.RELEASE]
at reactor.core.publisher.FluxContextStart.subscribe(FluxContextStart.java:49) ~[reactor-core-3.1.5.RELEASE.jar:3.1.5.RELEASE]
at reactor.core.publisher.Flux.subscribe(Flux.java:6873) ~[reactor-core-3.1.5.RELEASE.jar:3.1.5.RELEASE]
at reactor.ipc.netty.channel.ChannelOperationsHandler.drain(ChannelOperationsHandler.java:461) ~[reactor-netty-0.7.5.RELEASE.jar:0.7.5.RELEASE]
at reactor.ipc.netty.channel.ChannelOperationsHandler.flush(ChannelOperationsHandler.java:191) ~[reactor-netty-0.7.5.RELEASE.jar:0.7.5.RELEASE]
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) ~[netty-transport-4.1.22.Final.jar:4.1.22.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) ~[netty-transport-4.1.22.Final.jar:4.1.22.Final]
at io.netty.channel.AbstractChannelHandlerContext.access00(AbstractChannelHandlerContext.java:38) ~[netty-transport-4.1.22.Final.jar:4.1.22.Final]
at io.netty.channel.AbstractChannelHandlerContext$WriteAndFlushTask.write(AbstractChannelHandlerContext.java:1129) ~[netty-transport-4.1.22.Final.jar:4.1.22.Final]
at io.netty.channel.AbstractChannelHandlerContext$AbstractWriteTask.run(AbstractChannelHandlerContext.java:1070) ~[netty-transport-4.1.22.Final.jar:4.1.22.Final]
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163) ~[netty-common-4.1.22.Final.jar:4.1.22.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404) ~[netty-common-4.1.22.Final.jar:4.1.22.Final]
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463) ~[netty-transport-4.1.22.Final.jar:4.1.22.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor.run(SingleThreadEventExecutor.java:886) ~[netty-common-4.1.22.Final.jar:4.1.22.Final]
at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_144]
2018-03-20 12:00:50.957 ERROR 11944 --- [ctor-http-nio-2] r.ipc.netty.channel.ChannelOperations : [HttpServer] Error processing connection. Requesting close the channel
java.io.IOException: An established connection was aborted by the software in your host machine
at sun.nio.ch.SocketDispatcher.writev0(Native Method) ~[na:1.8.0_144]
at sun.nio.ch.SocketDispatcher.writev(SocketDispatcher.java:55) ~[na:1.8.0_144]
at sun.nio.ch.IOUtil.write(IOUtil.java:148) ~[na:1.8.0_144]
at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:504) ~[na:1.8.0_144]
at io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:418) ~[netty-transport-4.1.22.Final.jar:4.1.22.Final]
at io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:934) ~[netty-transport-4.1.22.Final.jar:4.1.22.Final]
at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:360) ~[netty-transport-4.1.22.Final.jar:4.1.22.Final]
at io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:901) ~[netty-transport-4.1.22.Final.jar:4.1.22.Final]
at io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1376) ~[netty-transport-4.1.22.Final.jar:4.1.22.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) [netty-transport-4.1.22.Final.jar:4.1.22.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) [netty-transport-4.1.22.Final.jar:4.1.22.Final]
at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) [netty-transport-4.1.22.Final.jar:4.1.22.Final]
at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.flush(CombinedChannelDuplexHandler.java:533) ~[netty-transport-4.1.22.Final.jar:4.1.22.Final]
at io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115) ~[netty-transport-4.1.22.Final.jar:4.1.22.Final]
at io.netty.channel.CombinedChannelDuplexHandler.flush(CombinedChannelDuplexHandler.java:358) ~[netty-transport-4.1.22.Final.jar:4.1.22.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) [netty-transport-4.1.22.Final.jar:4.1.22.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) [netty-transport-4.1.22.Final.jar:4.1.22.Final]
at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) [netty-transport-4.1.22.Final.jar:4.1.22.Final]
at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:117) ~[netty-transport-4.1.22.Final.jar:4.1.22.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) [netty-transport-4.1.22.Final.jar:4.1.22.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) [netty-transport-4.1.22.Final.jar:4.1.22.Final]
at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) [netty-transport-4.1.22.Final.jar:4.1.22.Final]
at reactor.ipc.netty.channel.ChannelOperationsHandler$PublisherSender.onComplete(ChannelOperationsHandler.java:535) ~[reactor-netty-0.7.5.RELEASE.jar:0.7.5.RELEASE]
at reactor.core.publisher.FluxContextStart$ContextStartSubscriber.onComplete(FluxContextStart.java:122) [reactor-core-3.1.5.RELEASE.jar:3.1.5.RELEASE]
at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onComplete(FluxMap.java:245) [reactor-core-3.1.5.RELEASE.jar:3.1.5.RELEASE]
at reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.onComplete(FluxConcatArray.java:184) [reactor-core-3.1.5.RELEASE.jar:3.1.5.RELEASE]
at reactor.core.publisher.FluxConcatArray.subscribe(FluxConcatArray.java:80) [reactor-core-3.1.5.RELEASE.jar:3.1.5.RELEASE]
at reactor.core.publisher.FluxMap.subscribe(FluxMap.java:59) [reactor-core-3.1.5.RELEASE.jar:3.1.5.RELEASE]
at reactor.core.publisher.FluxContextStart.subscribe(FluxContextStart.java:49) [reactor-core-3.1.5.RELEASE.jar:3.1.5.RELEASE]
at reactor.core.publisher.Flux.subscribe(Flux.java:6873) [reactor-core-3.1.5.RELEASE.jar:3.1.5.RELEASE]
at reactor.ipc.netty.channel.ChannelOperationsHandler.drain(ChannelOperationsHandler.java:461) [reactor-netty-0.7.5.RELEASE.jar:0.7.5.RELEASE]
at reactor.ipc.netty.channel.ChannelOperationsHandler.flush(ChannelOperationsHandler.java:191) [reactor-netty-0.7.5.RELEASE.jar:0.7.5.RELEASE]
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) [netty-transport-4.1.22.Final.jar:4.1.22.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) [netty-transport-4.1.22.Final.jar:4.1.22.Final]
at io.netty.channel.AbstractChannelHandlerContext.access00(AbstractChannelHandlerContext.java:38) [netty-transport-4.1.22.Final.jar:4.1.22.Final]
at io.netty.channel.AbstractChannelHandlerContext$WriteAndFlushTask.write(AbstractChannelHandlerContext.java:1129) [netty-transport-4.1.22.Final.jar:4.1.22.Final]
at io.netty.channel.AbstractChannelHandlerContext$AbstractWriteTask.run(AbstractChannelHandlerContext.java:1070) [netty-transport-4.1.22.Final.jar:4.1.22.Final]
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163) [netty-common-4.1.22.Final.jar:4.1.22.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404) [netty-common-4.1.22.Final.jar:4.1.22.Final]
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463) [netty-transport-4.1.22.Final.jar:4.1.22.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor.run(SingleThreadEventExecutor.java:886) [netty-common-4.1.22.Final.jar:4.1.22.Final]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_144]
如何避免?创建这样的热源流并正确处理多个客户端 disconnections/stream 取消和 connections/subscriptions 的正确方法是什么?
为了让您全面了解为什么我需要这样的功能 - 我有一个带有选项卡的 Web 应用程序。在一个选项卡上,我显示了从我的服务器接收到的所有数据。当我切换标签时,我不希望我的流继续,我想退订并在我返回此标签时再次订阅。
TLDR;
dispose()
是取消订阅流的正确方法。
完整答案:
我无法在互联网上找到答案,所以我询问了消息来源 - Spring 团队。
这是我创建的 JIRA:https://jira.spring.io/browse/SPR-16688
因此,根据那里的评论,上面的堆栈跟踪是在使用 Undertow
、Tomcat
和 Netty
时未涵盖 Windows 环境特定异常的结果。它们不应出现在 Linux/Unix 或使用 Jetty
时。 Spring 团队将在下一个版本中对此进行微调 - 根据 JIRA,可能 5.0.6
and/or 5.1 RC1
。
首先,完整的 运行 代码示例(包括客户端和服务器)可在此处获得:https://github.com/kklepacz/webflux-cancel-subscription
案例描述:
我有生成数据的遗留系统(我在我的 github 示例中用 SampleEmitter
模拟了它。我可以提供监听器,当新数据到达时它会收到通知。我想"translate" 这些监听器调用到数据流中。这个流需要是无限的(只要系统工作就会有新的值)。准确地说它也需要是热源(在[=15之后=] 项目术语)所以每当有人订阅时,他只会收到当前值。
翻译如下:
class ReactiveRepoImpl implements ReactiveRepo {
private static final Logger log = LoggerFactory.getLogger(ReactiveRepoImpl.class);
private final UnicastProcessor<MyObject> hotProcessor = UnicastProcessor.create();
private final FluxSink<MyObject> fluxSink = hotProcessor.sink(FluxSink.OverflowStrategy.LATEST);
private final Flux<MyObject> hotFlux = hotProcessor.publish().autoConnect();
@Override
public void save(MyObject elem) {
fluxSink.next(elem);
}
@Override
public Flux<MyObject> findAll() {
return hotFlux;
}
}
所以此时我可以像这样通过 RouterFunctions
公开它:
@Bean
public RouterFunction routerFunction(ReactiveRepo repo) {
return RouterFunctions.route(GET("/objects"), serverRequest -> {
log.info("Subscribing for GET /objects");
return ok()
.contentType(MediaType.TEXT_EVENT_STREAM)
.body(repo.findAll(), MyObject.class);
});
}
如您所见,我在这里制作服务器端事件。
现在让我们消费它:
WebClient.create("http://localhost:8080")
.get()
.uri("/objects")
.retrieve()
.bodyToFlux(MyObject.class)
.subscribe(n -> log.info("Next: {}", n.getId()),
e -> log.error("Error: {}", e),
() -> log.info("Completed"));
问题是:
当我终止客户端或调用 Disposable.dispose()
时,我在服务器应用程序中收到以下错误:
2018-03-20 12:00:50.926 ERROR 11944 --- [ctor-http-nio-2] o.s.w.s.adapter.HttpWebHandlerAdapter : Unhandled failure: An established connection was aborted by the software in your host machine, response already set (status=200)
2018-03-20 12:00:50.941 ERROR 11944 --- [ctor-http-nio-2] o.s.h.s.r.ReactorHttpHandlerAdapter : Handling completed with error
java.io.IOException: An established connection was aborted by the software in your host machine
at sun.nio.ch.SocketDispatcher.writev0(Native Method) ~[na:1.8.0_144]
at sun.nio.ch.SocketDispatcher.writev(SocketDispatcher.java:55) ~[na:1.8.0_144]
at sun.nio.ch.IOUtil.write(IOUtil.java:148) ~[na:1.8.0_144]
at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:504) ~[na:1.8.0_144]
at io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:418) ~[netty-transport-4.1.22.Final.jar:4.1.22.Final]
at io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:934) ~[netty-transport-4.1.22.Final.jar:4.1.22.Final]
at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:360) ~[netty-transport-4.1.22.Final.jar:4.1.22.Final]
at io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:901) ~[netty-transport-4.1.22.Final.jar:4.1.22.Final]
at io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1376) ~[netty-transport-4.1.22.Final.jar:4.1.22.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) ~[netty-transport-4.1.22.Final.jar:4.1.22.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) ~[netty-transport-4.1.22.Final.jar:4.1.22.Final]
at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) ~[netty-transport-4.1.22.Final.jar:4.1.22.Final]
at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.flush(CombinedChannelDuplexHandler.java:533) ~[netty-transport-4.1.22.Final.jar:4.1.22.Final]
at io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115) ~[netty-transport-4.1.22.Final.jar:4.1.22.Final]
at io.netty.channel.CombinedChannelDuplexHandler.flush(CombinedChannelDuplexHandler.java:358) ~[netty-transport-4.1.22.Final.jar:4.1.22.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) ~[netty-transport-4.1.22.Final.jar:4.1.22.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) ~[netty-transport-4.1.22.Final.jar:4.1.22.Final]
at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) ~[netty-transport-4.1.22.Final.jar:4.1.22.Final]
at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:117) ~[netty-transport-4.1.22.Final.jar:4.1.22.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) ~[netty-transport-4.1.22.Final.jar:4.1.22.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) ~[netty-transport-4.1.22.Final.jar:4.1.22.Final]
at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) ~[netty-transport-4.1.22.Final.jar:4.1.22.Final]
at reactor.ipc.netty.channel.ChannelOperationsHandler$PublisherSender.onComplete(ChannelOperationsHandler.java:535) ~[reactor-netty-0.7.5.RELEASE.jar:0.7.5.RELEASE]
at reactor.core.publisher.FluxContextStart$ContextStartSubscriber.onComplete(FluxContextStart.java:122) ~[reactor-core-3.1.5.RELEASE.jar:3.1.5.RELEASE]
at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onComplete(FluxMap.java:245) ~[reactor-core-3.1.5.RELEASE.jar:3.1.5.RELEASE]
at reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.onComplete(FluxConcatArray.java:184) ~[reactor-core-3.1.5.RELEASE.jar:3.1.5.RELEASE]
at reactor.core.publisher.FluxConcatArray.subscribe(FluxConcatArray.java:80) ~[reactor-core-3.1.5.RELEASE.jar:3.1.5.RELEASE]
at reactor.core.publisher.FluxMap.subscribe(FluxMap.java:59) ~[reactor-core-3.1.5.RELEASE.jar:3.1.5.RELEASE]
at reactor.core.publisher.FluxContextStart.subscribe(FluxContextStart.java:49) ~[reactor-core-3.1.5.RELEASE.jar:3.1.5.RELEASE]
at reactor.core.publisher.Flux.subscribe(Flux.java:6873) ~[reactor-core-3.1.5.RELEASE.jar:3.1.5.RELEASE]
at reactor.ipc.netty.channel.ChannelOperationsHandler.drain(ChannelOperationsHandler.java:461) ~[reactor-netty-0.7.5.RELEASE.jar:0.7.5.RELEASE]
at reactor.ipc.netty.channel.ChannelOperationsHandler.flush(ChannelOperationsHandler.java:191) ~[reactor-netty-0.7.5.RELEASE.jar:0.7.5.RELEASE]
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) ~[netty-transport-4.1.22.Final.jar:4.1.22.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) ~[netty-transport-4.1.22.Final.jar:4.1.22.Final]
at io.netty.channel.AbstractChannelHandlerContext.access00(AbstractChannelHandlerContext.java:38) ~[netty-transport-4.1.22.Final.jar:4.1.22.Final]
at io.netty.channel.AbstractChannelHandlerContext$WriteAndFlushTask.write(AbstractChannelHandlerContext.java:1129) ~[netty-transport-4.1.22.Final.jar:4.1.22.Final]
at io.netty.channel.AbstractChannelHandlerContext$AbstractWriteTask.run(AbstractChannelHandlerContext.java:1070) ~[netty-transport-4.1.22.Final.jar:4.1.22.Final]
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163) ~[netty-common-4.1.22.Final.jar:4.1.22.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404) ~[netty-common-4.1.22.Final.jar:4.1.22.Final]
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463) ~[netty-transport-4.1.22.Final.jar:4.1.22.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor.run(SingleThreadEventExecutor.java:886) ~[netty-common-4.1.22.Final.jar:4.1.22.Final]
at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_144]
2018-03-20 12:00:50.957 ERROR 11944 --- [ctor-http-nio-2] r.ipc.netty.channel.ChannelOperations : [HttpServer] Error processing connection. Requesting close the channel
java.io.IOException: An established connection was aborted by the software in your host machine
at sun.nio.ch.SocketDispatcher.writev0(Native Method) ~[na:1.8.0_144]
at sun.nio.ch.SocketDispatcher.writev(SocketDispatcher.java:55) ~[na:1.8.0_144]
at sun.nio.ch.IOUtil.write(IOUtil.java:148) ~[na:1.8.0_144]
at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:504) ~[na:1.8.0_144]
at io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:418) ~[netty-transport-4.1.22.Final.jar:4.1.22.Final]
at io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:934) ~[netty-transport-4.1.22.Final.jar:4.1.22.Final]
at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:360) ~[netty-transport-4.1.22.Final.jar:4.1.22.Final]
at io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:901) ~[netty-transport-4.1.22.Final.jar:4.1.22.Final]
at io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1376) ~[netty-transport-4.1.22.Final.jar:4.1.22.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) [netty-transport-4.1.22.Final.jar:4.1.22.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) [netty-transport-4.1.22.Final.jar:4.1.22.Final]
at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) [netty-transport-4.1.22.Final.jar:4.1.22.Final]
at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.flush(CombinedChannelDuplexHandler.java:533) ~[netty-transport-4.1.22.Final.jar:4.1.22.Final]
at io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115) ~[netty-transport-4.1.22.Final.jar:4.1.22.Final]
at io.netty.channel.CombinedChannelDuplexHandler.flush(CombinedChannelDuplexHandler.java:358) ~[netty-transport-4.1.22.Final.jar:4.1.22.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) [netty-transport-4.1.22.Final.jar:4.1.22.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) [netty-transport-4.1.22.Final.jar:4.1.22.Final]
at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) [netty-transport-4.1.22.Final.jar:4.1.22.Final]
at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:117) ~[netty-transport-4.1.22.Final.jar:4.1.22.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) [netty-transport-4.1.22.Final.jar:4.1.22.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) [netty-transport-4.1.22.Final.jar:4.1.22.Final]
at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) [netty-transport-4.1.22.Final.jar:4.1.22.Final]
at reactor.ipc.netty.channel.ChannelOperationsHandler$PublisherSender.onComplete(ChannelOperationsHandler.java:535) ~[reactor-netty-0.7.5.RELEASE.jar:0.7.5.RELEASE]
at reactor.core.publisher.FluxContextStart$ContextStartSubscriber.onComplete(FluxContextStart.java:122) [reactor-core-3.1.5.RELEASE.jar:3.1.5.RELEASE]
at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onComplete(FluxMap.java:245) [reactor-core-3.1.5.RELEASE.jar:3.1.5.RELEASE]
at reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.onComplete(FluxConcatArray.java:184) [reactor-core-3.1.5.RELEASE.jar:3.1.5.RELEASE]
at reactor.core.publisher.FluxConcatArray.subscribe(FluxConcatArray.java:80) [reactor-core-3.1.5.RELEASE.jar:3.1.5.RELEASE]
at reactor.core.publisher.FluxMap.subscribe(FluxMap.java:59) [reactor-core-3.1.5.RELEASE.jar:3.1.5.RELEASE]
at reactor.core.publisher.FluxContextStart.subscribe(FluxContextStart.java:49) [reactor-core-3.1.5.RELEASE.jar:3.1.5.RELEASE]
at reactor.core.publisher.Flux.subscribe(Flux.java:6873) [reactor-core-3.1.5.RELEASE.jar:3.1.5.RELEASE]
at reactor.ipc.netty.channel.ChannelOperationsHandler.drain(ChannelOperationsHandler.java:461) [reactor-netty-0.7.5.RELEASE.jar:0.7.5.RELEASE]
at reactor.ipc.netty.channel.ChannelOperationsHandler.flush(ChannelOperationsHandler.java:191) [reactor-netty-0.7.5.RELEASE.jar:0.7.5.RELEASE]
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) [netty-transport-4.1.22.Final.jar:4.1.22.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) [netty-transport-4.1.22.Final.jar:4.1.22.Final]
at io.netty.channel.AbstractChannelHandlerContext.access00(AbstractChannelHandlerContext.java:38) [netty-transport-4.1.22.Final.jar:4.1.22.Final]
at io.netty.channel.AbstractChannelHandlerContext$WriteAndFlushTask.write(AbstractChannelHandlerContext.java:1129) [netty-transport-4.1.22.Final.jar:4.1.22.Final]
at io.netty.channel.AbstractChannelHandlerContext$AbstractWriteTask.run(AbstractChannelHandlerContext.java:1070) [netty-transport-4.1.22.Final.jar:4.1.22.Final]
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163) [netty-common-4.1.22.Final.jar:4.1.22.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404) [netty-common-4.1.22.Final.jar:4.1.22.Final]
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463) [netty-transport-4.1.22.Final.jar:4.1.22.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor.run(SingleThreadEventExecutor.java:886) [netty-common-4.1.22.Final.jar:4.1.22.Final]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_144]
如何避免?创建这样的热源流并正确处理多个客户端 disconnections/stream 取消和 connections/subscriptions 的正确方法是什么?
为了让您全面了解为什么我需要这样的功能 - 我有一个带有选项卡的 Web 应用程序。在一个选项卡上,我显示了从我的服务器接收到的所有数据。当我切换标签时,我不希望我的流继续,我想退订并在我返回此标签时再次订阅。
TLDR;
dispose()
是取消订阅流的正确方法。
完整答案:
我无法在互联网上找到答案,所以我询问了消息来源 - Spring 团队。 这是我创建的 JIRA:https://jira.spring.io/browse/SPR-16688
因此,根据那里的评论,上面的堆栈跟踪是在使用 Undertow
、Tomcat
和 Netty
时未涵盖 Windows 环境特定异常的结果。它们不应出现在 Linux/Unix 或使用 Jetty
时。 Spring 团队将在下一个版本中对此进行微调 - 根据 JIRA,可能 5.0.6
and/or 5.1 RC1
。