使用 spring 引导 rsocket 捕获取消帧类型

with spring boot rsocket capture the cancel frame type

我有一个 spring 启动 rsocket 实现,如果客户端取消或关闭他们的 rsocket 请求,那么我想取消服务器上的其他订阅注册。

在 spring 引导服务器的日志中,我可以看到发送或接收了一条取消消息:

WARN i.r.t.n.s.WebsocketServerTransport [reactor-http-nio-3] received WebSocket Close Frame - connection is closing
INFO r.u.Loggers$Slf4JLogger [reactor-http-nio-3] cancel()

如何捕获和处理这个取消信号?

我试过取消端点,但这些端点没有捕获到信号:

@MessageMapping("cancel")
Flux<Object> onCancel() {
    log.info("Captured cancel signal");
}

@ConnectMapping("cancel")
Flux<Object> onCancel2() {
    log.info("Captured cancel2 signal");
}

这个问题在 is possibly related, and this question on

要捕获取消信号,您可以使用订阅 onClose() 事件。

在你的控制器中

@Controller
class RSocketConnectionController {

    @ConnectMapping("client-id")
    fun onConnect(rSocketRequester: RSocketRequester, clientId: String) {
//        rSocketRequester.rsocket().dispose()   //to reject connection
        rSocketRequester
                .rsocket()
                .onClose()
                .subscribe(null, null, {
                    log.info("{} just disconnected", clientId)

                    //TODO here whatever you want
                })
    }
}

您的客户端需要正确发送 SETUP 帧才能调用此 @ConnectMapping。如果你使用 rsocket-js 你需要像这样添加一个有效载荷:

const client = new RSocketClient({
        // send/receive JSON objects instead of strings/buffers
        serializers: {
          data: JsonSerializer,
          metadata: IdentitySerializer
        },
        setup: {
          //for connection mapping on server
          payload: {
            data: 'unique-client-id',   //TODO you can receive this data on server side
            metadata: String.fromCharCode("client-id".length) + "client-id"
          },
          // ms btw sending keepalive to server
          keepAlive: 60000,
.....
        }
});

这不是一个很好的问题。答案是

INFO r.u.Loggers$Slf4JLogger [reactor-http-nio-3] cancel()

被从原始 @MessageMapping 端点设置的 FluxSink 看到。

例如:

@MessageMapping("hello")
Flux<Object> hello(@Payload String message) {       
    return myService.generateWorld(message);
}

myServiceclass

public Flux<Object> generateWorld(String hello) {
    EmitterProcessor<Object> emitter = EmitterProcessor.create();
    FluxSink<Object> sink = emitter.sink(FluxSink.OverflowStrategy.LATEST);

    // doing stuff with sink here
    sink.next(stuff());

    // This part will handle a cancel from the client
    sink.onCancel(() -> {log.info("********** SINK.onCancel ***********");});

    return Flux.from(emitter));  
}

sink.onCancel() 将处理从客户端到 hello 端点的流量取消。