如何在 Spring Boot RSocket Reactive 中处理入​​站流取消

How to handle inbound stream cancellation in Spring Boot RSocket Reactive

目标

我想在我的 Spring 启动应用程序中有一个 RSocket 通道端点,我可以在其中处理取消入站、客户端驱动的流以进行一些服务器端清理。

设置

相关依赖:

我试图通过 Kotlin 协程流和 Reactor Flux(英文?)来实现我的目标。下面的两个 client/server 对应该做同样的事情:建立一个 RSocket 通道,从客户端发送 2 个“ping”有效负载,服务器用“pong”有效负载响应每个,然后客户端关闭连接。

流服务器端:

    @MessageMapping("testFlow")
    fun testPingFlow(input: Flow<String>): Flow<String> {
        val cs = CoroutineScope(EmptyCoroutineContext)
        val output = MutableSharedFlow<String>(10)

        cs.launch {
            try {
                input
                    .catch { e ->
                        logger.error("Rsocket server input error", e)
                    }
                    .onCompletion { exception ->
                        logger.debug("Rsocket server input completed")
                        if (exception != null) {
                            logger.error("Exception received while processing Rsocket server input flow", exception)
                        }
                    }
                    // Normal .collect complains about being internal-only
                    .collectIndexed { _, message ->
                        logger.debug("Rsocket server input received $message")
                        output.emit("pong ${System.currentTimeMillis()}")
                    }
            } catch (e: Throwable) {
                logger.error("Rsocket server input connection exception caught", e)
            }
        }
        return output
    }

流量客户端测试:

    @Test
    fun testPingFlow() {
        val outToServer = MutableSharedFlow<String>(10)

        runBlocking {
            val socketFlow = rSocketRequester
                .route("testFlow")
                .data(outToServer.asFlux())
                .retrieveFlow<String>()
                .take(2)

            outToServer.emit("Ping ${System.currentTimeMillis()}")
            outToServer.emit("Ping ${System.currentTimeMillis()}")

            socketFlow
                .onCompletion { exception ->
                    logger.debug("Rsocket client output completed")
                    if (exception != null) {
                        logger.error("Exception received while processing Rsocket client output flow", exception)
                    }
                }
                .collect { message ->
                    logger.debug("Received pong from server $message")
                }
        }
    }

通量服务器端:

    @MessageMapping("testFlux")
    fun testPingFlux(input: Flux<String>): Flux<String> {
        val output = Sinks.many().unicast().onBackpressureBuffer<String>()
        try {
            input
                .doOnNext { message ->
                    logger.debug("Rsocket server input message received $message")
                }
                .doOnError { e ->
                    logger.error("Rsocket server input connection error", e)
                }
                .doOnCancel {
                    logger.debug("Rsocket server input cancelled")
                }
                .doOnComplete {
                    logger.debug("Rsocket server input completed")
                }
                .subscribe { message ->
                    output.tryEmitNext("pong ${System.currentTimeMillis()}")
                }
        } catch (e: Throwable) {
            logger.error("Rsocket server input connection exception caught", e)
        }
        return output.asFlux()
    }

Flux 客户端测试:

    @Test
    fun testPingFlux() {
        val outToServer = Sinks.many().unicast().onBackpressureBuffer<String>()

        rSocketRequester
            .route("testFlux")
            .data(outToServer.asFlux())
            .retrieveFlux<String>()
            .doOnCancel {
                logger.debug("Rsocket client output connection completed")
            }
            .doOnError { e ->
                logger.error("Exception received while processing Rsocket client output flow", e)
            }
            .take(2)
            .subscribe { message ->
                logger.debug("Received pong from server $message")
            }

        outToServer.tryEmitNext("Ping ${System.currentTimeMillis()}")
        outToServer.tryEmitNext("Ping ${System.currentTimeMillis()}")
    }

问题

上面的两个 client/server 片段实际上确实来回发送 ping/pong 有效载荷,但在每种情况下,我都没有在客户端的服务器端处理取消连接。我从客户端获得自己的 Rsocket client output completed 日志行,然后从 Reactor 获得 Operator called default onErrorDropped 并从 RSocket 获得以下堆栈跟踪:

java.util.concurrent.CancellationException: Inbound has been canceled
    at io.rsocket.core.RequestChannelResponderSubscriber.tryTerminate(RequestChannelResponderSubscriber.java:357) ~[rsocket-core-1.1.0.jar:na]
    at io.rsocket.core.RequestChannelResponderSubscriber.handleCancel(RequestChannelResponderSubscriber.java:345) ~[rsocket-core-1.1.0.jar:na]
    at io.rsocket.core.RSocketResponder.handleFrame(RSocketResponder.java:217) ~[rsocket-core-1.1.0.jar:na]
    at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:160) ~[reactor-core-3.4.2.jar:3.4.2]
    at org.springframework.security.test.context.support.ReactorContextTestExecutionListener$DelegateTestExecutionListener$SecuritySubContext.onNext(ReactorContextTestExecutionListener.java:120) ~[spring-security-test-5.4.2.jar:5.4.2]
    at io.rsocket.core.ClientServerInputMultiplexer$InternalDuplexConnection.onNext(ClientServerInputMultiplexer.java:248) ~[rsocket-core-1.1.0.jar:na]
    at io.rsocket.core.ClientServerInputMultiplexer.onNext(ClientServerInputMultiplexer.java:129) ~[rsocket-core-1.1.0.jar:na]
    at io.rsocket.core.ClientServerInputMultiplexer.onNext(ClientServerInputMultiplexer.java:48) ~[rsocket-core-1.1.0.jar:na]
    at io.rsocket.core.SetupHandlingDuplexConnection.onNext(SetupHandlingDuplexConnection.java:118) ~[rsocket-core-1.1.0.jar:na]
    at io.rsocket.core.SetupHandlingDuplexConnection.onNext(SetupHandlingDuplexConnection.java:19) ~[rsocket-core-1.1.0.jar:na]
    at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120) ~[reactor-core-3.4.2.jar:3.4.2]
    at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120) ~[reactor-core-3.4.2.jar:3.4.2]
    at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:267) ~[reactor-netty-core-1.0.3.jar:1.0.3]
    at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:377) ~[reactor-netty-core-1.0.3.jar:1.0.3]
    at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:381) ~[reactor-netty-core-1.0.3.jar:1.0.3]
    at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:94) ~[reactor-netty-core-1.0.3.jar:1.0.3]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324) ~[netty-codec-4.1.58.Final.jar:4.1.58.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296) ~[netty-codec-4.1.58.Final.jar:4.1.58.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
    at io.netty.util.concurrent.SingleThreadEventExecutor.run(SingleThreadEventExecutor.java:989) ~[netty-common-4.1.58.Final.jar:4.1.58.Final]
    at io.netty.util.internal.ThreadExecutorMap.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.58.Final.jar:4.1.58.Final]
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.58.Final.jar:4.1.58.Final]
    at java.base/java.lang.Thread.run(Thread.java:832) ~[na:na]

这是一个问题,因为(除了这个玩具示例)我的应用程序需要在连接关闭时进行服务器端清理。

我尝试失败的事情

在此先感谢您的帮助。

Bug filed,将此问题标记为已回答。感谢大家的快速回复。