How to handle inbound stream cancellation in Spring Boot RSocket Reactive
Goal
I would like to have an RSocket channel endpoint in my Spring Boot application where I can handle cancellation of the inbound, client-driven stream in order to do some server-side cleanup.
Setup
Relevant dependencies:
- Spring Boot 2.4.2
- Kotlin 1.4.21
- Kotlinx Coroutines 1.4.2
- RSocket Core 1.1.0
I have attempted this with both Kotlin coroutine Flows and Reactor Fluxes (Fluxen?). The two client/server pairs below are meant to do the same thing: establish an RSocket channel, send two "ping" payloads from the client, have the server respond to each with a "pong" payload, and then have the client close the connection.
Flow server side:
@MessageMapping("testFlow")
fun testPingFlow(input: Flow<String>): Flow<String> {
val cs = CoroutineScope(EmptyCoroutineContext)
val output = MutableSharedFlow<String>(10)
cs.launch {
try {
input
.catch { e ->
logger.error("Rsocket server input error", e)
}
.onCompletion { exception ->
logger.debug("Rsocket server input completed")
if (exception != null) {
logger.error("Exception received while processing Rsocket server input flow", exception)
}
}
// Normal .collect complains about being internal-only
.collectIndexed { _, message ->
logger.debug("Rsocket server input received $message")
output.emit("pong ${System.currentTimeMillis()}")
}
} catch (e: Throwable) {
logger.error("Rsocket server input connection exception caught", e)
}
}
return output
}
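For reference, the per-stream cleanup shape I expected to work on the Flow side is a try/finally around collecting the input, since cancellation should surface in the collector as a CancellationException. A minimal sketch (imports omitted as in the snippets above; releaseResources() is a hypothetical cleanup hook), assuming the cancel signal actually reaches the handler's input Flow, which is exactly what fails below:

@MessageMapping("testFlowCleanup")
fun testPingFlowCleanup(input: Flow<String>): Flow<String> = flow {
    try {
        input.collect {
            emit("pong ${System.currentTimeMillis()}")
        }
    } finally {
        // Runs on normal completion, error, or cancellation of the stream --
        // but only if cancellation actually propagates into this coroutine
        releaseResources() // hypothetical cleanup hook
    }
}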
Flow client test:
@Test
fun testPingFlow() {
    val outToServer = MutableSharedFlow<String>(10)
    runBlocking {
        val socketFlow = rSocketRequester
            .route("testFlow")
            .data(outToServer.asFlux())
            .retrieveFlow<String>()
            .take(2)
        outToServer.emit("Ping ${System.currentTimeMillis()}")
        outToServer.emit("Ping ${System.currentTimeMillis()}")
        socketFlow
            .onCompletion { exception ->
                logger.debug("Rsocket client output completed")
                if (exception != null) {
                    logger.error("Exception received while processing Rsocket client output flow", exception)
                }
            }
            .collect { message ->
                logger.debug("Received pong from server $message")
            }
    }
}
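As written, this test will hang indefinitely if the server never sends two pongs. A variant that fails fast instead, sketched with withTimeout and toList:

@Test
fun testPingFlowWithTimeout() {
    val outToServer = MutableSharedFlow<String>(10)
    runBlocking {
        val socketFlow = rSocketRequester
            .route("testFlow")
            .data(outToServer.asFlux())
            .retrieveFlow<String>()
            .take(2)
        outToServer.emit("Ping ${System.currentTimeMillis()}")
        outToServer.emit("Ping ${System.currentTimeMillis()}")
        // Fail after 5 seconds instead of hanging forever
        val pongs = withTimeout(5_000L) { socketFlow.toList() }
        assertEquals(2, pongs.size)
    }
}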
Flux server side:
@MessageMapping("testFlux")
fun testPingFlux(input: Flux<String>): Flux<String> {
val output = Sinks.many().unicast().onBackpressureBuffer<String>()
try {
input
.doOnNext { message ->
logger.debug("Rsocket server input message received $message")
}
.doOnError { e ->
logger.error("Rsocket server input connection error", e)
}
.doOnCancel {
logger.debug("Rsocket server input cancelled")
}
.doOnComplete {
logger.debug("Rsocket server input completed")
}
.subscribe { message ->
output.tryEmitNext("pong ${System.currentTimeMillis()}")
}
} catch (e: Throwable) {
logger.error("Rsocket server input connection exception caught", e)
}
return output.asFlux()
}
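On the Flux side, the catch-all terminal hook I expected to work is doFinally, which sees SignalType.CANCEL as well as completion and errors. A minimal sketch of that shape (cleanUp() is a hypothetical cleanup hook), again assuming the cancel signal reaches the handler's input Flux:

@MessageMapping("testFluxCleanup")
fun testPingFluxCleanup(input: Flux<String>): Flux<String> =
    input
        .doFinally { signalType ->
            // signalType would be SignalType.CANCEL if the client cancels the stream
            logger.debug("Rsocket server input terminated with $signalType")
            cleanUp() // hypothetical cleanup hook
        }
        .map { "pong ${System.currentTimeMillis()}" }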
Flux client test:
@Test
fun testPingFlux() {
    val outToServer = Sinks.many().unicast().onBackpressureBuffer<String>()
    rSocketRequester
        .route("testFlux")
        .data(outToServer.asFlux())
        .retrieveFlux<String>()
        .doOnCancel {
            logger.debug("Rsocket client output connection completed")
        }
        .doOnError { e ->
            logger.error("Exception received while processing Rsocket client output flow", e)
        }
        .take(2)
        .subscribe { message ->
            logger.debug("Received pong from server $message")
        }
    outToServer.tryEmitNext("Ping ${System.currentTimeMillis()}")
    outToServer.tryEmitNext("Ping ${System.currentTimeMillis()}")
}
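As written, this test is racy: subscribe returns immediately, so the test method can exit before any pong arrives. A less racy sketch using StepVerifier (assuming reactor-test is on the test classpath):

@Test
fun testPingFluxWithStepVerifier() {
    val outToServer = Sinks.many().unicast().onBackpressureBuffer<String>()
    val pongs = rSocketRequester
        .route("testFlux")
        .data(outToServer.asFlux())
        .retrieveFlux<String>()
        .take(2)
    StepVerifier.create(pongs)
        // Send the pings only once the subscription is active
        .then {
            outToServer.tryEmitNext("Ping ${System.currentTimeMillis()}")
            outToServer.tryEmitNext("Ping ${System.currentTimeMillis()}")
        }
        .expectNextCount(2)
        .verifyComplete()
}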
Problem
The two client/server pairs above do send the ping/pong payloads back and forth, but in each case I am unable to handle the client's cancellation of the connection on the server side. On the client I see my own Rsocket client output completed log line, then Reactor reports Operator called default onErrorDropped, and RSocket produces the following stack trace:
java.util.concurrent.CancellationException: Inbound has been canceled
at io.rsocket.core.RequestChannelResponderSubscriber.tryTerminate(RequestChannelResponderSubscriber.java:357) ~[rsocket-core-1.1.0.jar:na]
at io.rsocket.core.RequestChannelResponderSubscriber.handleCancel(RequestChannelResponderSubscriber.java:345) ~[rsocket-core-1.1.0.jar:na]
at io.rsocket.core.RSocketResponder.handleFrame(RSocketResponder.java:217) ~[rsocket-core-1.1.0.jar:na]
at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:160) ~[reactor-core-3.4.2.jar:3.4.2]
at org.springframework.security.test.context.support.ReactorContextTestExecutionListener$DelegateTestExecutionListener$SecuritySubContext.onNext(ReactorContextTestExecutionListener.java:120) ~[spring-security-test-5.4.2.jar:5.4.2]
at io.rsocket.core.ClientServerInputMultiplexer$InternalDuplexConnection.onNext(ClientServerInputMultiplexer.java:248) ~[rsocket-core-1.1.0.jar:na]
at io.rsocket.core.ClientServerInputMultiplexer.onNext(ClientServerInputMultiplexer.java:129) ~[rsocket-core-1.1.0.jar:na]
at io.rsocket.core.ClientServerInputMultiplexer.onNext(ClientServerInputMultiplexer.java:48) ~[rsocket-core-1.1.0.jar:na]
at io.rsocket.core.SetupHandlingDuplexConnection.onNext(SetupHandlingDuplexConnection.java:118) ~[rsocket-core-1.1.0.jar:na]
at io.rsocket.core.SetupHandlingDuplexConnection.onNext(SetupHandlingDuplexConnection.java:19) ~[rsocket-core-1.1.0.jar:na]
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120) ~[reactor-core-3.4.2.jar:3.4.2]
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120) ~[reactor-core-3.4.2.jar:3.4.2]
at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:267) ~[reactor-netty-core-1.0.3.jar:1.0.3]
at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:377) ~[reactor-netty-core-1.0.3.jar:1.0.3]
at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:381) ~[reactor-netty-core-1.0.3.jar:1.0.3]
at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:94) ~[reactor-netty-core-1.0.3.jar:1.0.3]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324) ~[netty-codec-4.1.58.Final.jar:4.1.58.Final]
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296) ~[netty-codec-4.1.58.Final.jar:4.1.58.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor.run(SingleThreadEventExecutor.java:989) ~[netty-common-4.1.58.Final.jar:4.1.58.Final]
at io.netty.util.internal.ThreadExecutorMap.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.58.Final.jar:4.1.58.Final]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.58.Final.jar:4.1.58.Final]
at java.base/java.lang.Thread.run(Thread.java:832) ~[na:na]
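(As an aside, the Operator called default onErrorDropped warning can at least be observed, or selectively quieted, with a global Reactor hook, though that does nothing for the cleanup problem itself. A sketch, registered once at startup:)

// Swallow dropped peer cancellations quietly; keep logging anything else
Hooks.onErrorDropped { e ->
    if (e is CancellationException) {
        logger.debug("Dropped cancellation: ${e.message}")
    } else {
        logger.error("Dropped error", e)
    }
}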
This is a problem because (beyond this toy example) my application needs to perform server-side cleanup when a connection closes.
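One coarser-grained fallback I am aware of is connection-level rather than per-stream cleanup: a @ConnectMapping handler receives the connection's RSocketRequester, and the underlying RSocket exposes an onClose() Mono. A sketch, under the assumption that per-connection cleanup would be acceptable (cleanUpConnection() is a hypothetical hook):

@ConnectMapping
fun onConnect(requester: RSocketRequester) {
    requester.rsocket()          // nullable, depending on Spring version
        ?.onClose()
        ?.doFinally {
            // Fires when the whole connection closes, not per channel
            cleanUpConnection()  // hypothetical cleanup hook
        }
        ?.subscribe()
}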
Things I have tried without success
- All the various ways of catching exceptions, cancellation, or completion on the Flows or Fluxen, many of which are illustrated in the examples above.
- try/catch blocks inside the subscribe/collect lambdas.
- Coupling the server's response Flux/Flow directly to the input Flux/Flow via mapping operators, instead of creating a separate output Flux/Flow (see the sketch after this list).
- Stepping through the framework code in a debugger, where I can admit without shame that I got lost quickly. The best theory I came away with is that the Flux/Flow that actually receives the cancellation signal is somehow decoupled from the input Flux/Flow my server method receives, but there are too many layers of abstraction for me to trace it.
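For completeness, here is a minimal sketch of the direct-coupling attempt from the third bullet above; in my testing the doOnCancel hook still never fired:

@MessageMapping("testFluxCoupled")
fun testPingFluxCoupled(input: Flux<String>): Flux<String> =
    input
        .doOnCancel { logger.debug("Rsocket server input cancelled") } // never fired
        .map { "pong ${System.currentTimeMillis()}" }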
Thanks in advance for any help.
Bug filed; marking this question as answered. Thanks to everyone for the quick replies.