使用 spring 引导 rsocket 捕获取消帧类型
with spring boot rsocket capture the cancel frame type
我有一个 spring 启动 rsocket 实现,如果客户端取消或关闭他们的 rsocket 请求,那么我想取消服务器上的其他订阅注册。
在 spring 引导服务器的日志中,我可以看到发送或接收了一条取消消息:
WARN i.r.t.n.s.WebsocketServerTransport [reactor-http-nio-3] received WebSocket Close Frame - connection is closing
INFO r.u.Loggers$Slf4JLogger [reactor-http-nio-3] cancel()
如何捕获和处理这个取消信号?
我试过取消端点,但这些端点没有捕获到信号:
@MessageMapping("cancel")
Flux<Object> onCancel() {
log.info("Captured cancel signal");
}
或
@ConnectMapping("cancel")
Flux<Object> onCancel2() {
log.info("Captured cancel2 signal");
}
这个问题在 is possibly related, and this question on
要捕获取消信号,您可以使用订阅 onClose()
事件。
在你的控制器中
@Controller
class RSocketConnectionController {
@ConnectMapping("client-id")
fun onConnect(rSocketRequester: RSocketRequester, clientId: String) {
// rSocketRequester.rsocket().dispose() //to reject connection
rSocketRequester
.rsocket()
.onClose()
.subscribe(null, null, {
log.info("{} just disconnected", clientId)
//TODO here whatever you want
})
}
}
您的客户端需要正确发送 SETUP 帧才能调用此 @ConnectMapping
。如果你使用 rsocket-js
你需要像这样添加一个有效载荷:
const client = new RSocketClient({
// send/receive JSON objects instead of strings/buffers
serializers: {
data: JsonSerializer,
metadata: IdentitySerializer
},
setup: {
//for connection mapping on server
payload: {
data: 'unique-client-id', //TODO you can receive this data on server side
metadata: String.fromCharCode("client-id".length) + "client-id"
},
// ms btw sending keepalive to server
keepAlive: 60000,
.....
}
});
这不是一个很好的问题。答案是
INFO r.u.Loggers$Slf4JLogger [reactor-http-nio-3] cancel()
被从原始 @MessageMapping
端点设置的 FluxSink
看到。
例如:
@MessageMapping("hello")
Flux<Object> hello(@Payload String message) {
return myService.generateWorld(message);
}
在myService
class
public Flux<Object> generateWorld(String hello) {
EmitterProcessor<Object> emitter = EmitterProcessor.create();
FluxSink<Object> sink = emitter.sink(FluxSink.OverflowStrategy.LATEST);
// doing stuff with sink here
sink.next(stuff());
// This part will handle a cancel from the client
sink.onCancel(() -> {log.info("********** SINK.onCancel ***********");});
return Flux.from(emitter));
}
sink.onCancel()
将处理从客户端到 hello
端点的流量取消。
我有一个 spring 启动 rsocket 实现,如果客户端取消或关闭他们的 rsocket 请求,那么我想取消服务器上的其他订阅注册。
在 spring 引导服务器的日志中,我可以看到发送或接收了一条取消消息:
WARN i.r.t.n.s.WebsocketServerTransport [reactor-http-nio-3] received WebSocket Close Frame - connection is closing
INFO r.u.Loggers$Slf4JLogger [reactor-http-nio-3] cancel()
如何捕获和处理这个取消信号?
我试过取消端点,但这些端点没有捕获到信号:
@MessageMapping("cancel")
Flux<Object> onCancel() {
log.info("Captured cancel signal");
}
或
@ConnectMapping("cancel")
Flux<Object> onCancel2() {
log.info("Captured cancel2 signal");
}
这个问题在
要捕获取消信号,您可以使用订阅 onClose()
事件。
在你的控制器中
@Controller
class RSocketConnectionController {
@ConnectMapping("client-id")
fun onConnect(rSocketRequester: RSocketRequester, clientId: String) {
// rSocketRequester.rsocket().dispose() //to reject connection
rSocketRequester
.rsocket()
.onClose()
.subscribe(null, null, {
log.info("{} just disconnected", clientId)
//TODO here whatever you want
})
}
}
您的客户端需要正确发送 SETUP 帧才能调用此 @ConnectMapping
。如果你使用 rsocket-js
你需要像这样添加一个有效载荷:
const client = new RSocketClient({
// send/receive JSON objects instead of strings/buffers
serializers: {
data: JsonSerializer,
metadata: IdentitySerializer
},
setup: {
//for connection mapping on server
payload: {
data: 'unique-client-id', //TODO you can receive this data on server side
metadata: String.fromCharCode("client-id".length) + "client-id"
},
// ms btw sending keepalive to server
keepAlive: 60000,
.....
}
});
这不是一个很好的问题。答案是
INFO r.u.Loggers$Slf4JLogger [reactor-http-nio-3] cancel()
被从原始 @MessageMapping
端点设置的 FluxSink
看到。
例如:
@MessageMapping("hello")
Flux<Object> hello(@Payload String message) {
return myService.generateWorld(message);
}
在myService
class
public Flux<Object> generateWorld(String hello) {
EmitterProcessor<Object> emitter = EmitterProcessor.create();
FluxSink<Object> sink = emitter.sink(FluxSink.OverflowStrategy.LATEST);
// doing stuff with sink here
sink.next(stuff());
// This part will handle a cancel from the client
sink.onCancel(() -> {log.info("********** SINK.onCancel ***********");});
return Flux.from(emitter));
}
sink.onCancel()
将处理从客户端到 hello
端点的流量取消。