Spring Boot 2.4 RSocket 重复检测丢失的连接

Spring Boot 2.4 RSocket detect lost connection repeatedly

我使用以下配置将 RsocketClient 连接到 RsocketServer。这很好用。我想检测连接丢失。通过注册到 RsocketClient 的底层 rsocket 的 OnClose Mono,效果很好。但只有一次。如果我反复连接丢失和重新连接,如何检测关闭?

requester = rsocketRequesterBuilder.setupRoute("shell-client").setupData(CLIENT_ID)
                        .setupMetadata(accessToken, BearerTokenMetadata.BEARER_AUTHENTICATION_MIME_TYPE)
                        .rsocketStrategies(rsocketStrategies).
                        rsocketConnector(connector -> connector.acceptor(responder).reconnect(Retry.backoff(1000, Duration.ofMillis(500)).doBeforeRetry(x->{log.info("Retry");
                        
                        
                        
                        }).doAfterRetry(x->{log.info(x.toString());})))
                        .tcp("x.y.z.40", 8080);
                
               
                RSocket r=requester.rsocketClient().source().block();
                r.onClose().doOnError(x->{
                    log.info("Error");}).doFinally(x->{
                    log.info("Disconnected");
                }).subscribe();

您可以使用以下代码实现所需的行为:

 requester
       .rsocketClient()
       .source()
       .flatMap(rsocket -> rsocket.onClose())
       .repeat()
       .retry()
       .subscribe();

在上面的例子中,一旦连接丢失,rsocket.onClose()就会发送一个终止信号。由于您已经安装了重新连接功能,下一次订阅 RSocketClient.source() 将导致重新建立​​连接。因此,一旦发生这种情况,您将收到一个新的 rsocket 实例,并将在 flatMap 运算符内再次订阅 onClose 流。

要重复此操作,我们可以使用 .repeatretry 因此每当 onClose 终止(这是断开连接的指示)时,对 RSocketClient.source 的订阅将重复,您将能够获得新连接并再次开始收听 onClose