从 Spring websocket stomp 服务器断开客户端会话

Disconnect client session from Spring websocket stomp server

我搜索了很多但没能找到这个:有没有一种方法可以让 spring websocket stomp 服务器可以根据 sessionId(或者实际上是基于任何东西)断开客户端的连接?

在我看来,一旦客户端连接到服务器,就没有任何东西可以让服务器断开客户端的连接。

据我所知 API 没有提供您要查找的内容,在服务器端您只能检测断开连接事件。如果您想断开某个客户端的连接,我认为您必须采取一些小的解决方法,例如这个:

  1. 编写能够触发断开连接的客户端 javascript 函数
  2. 一旦您的客户端连接到服务器,就在您的 javascript 中生成一个客户端 ID 并将其发送到服务器。记住客户端上的 ID,您将在步骤 (4) 中需要它。
  3. 当您希望服务器断开与特定客户端(由 ID 标识)的连接时,将包含该 ID 的消息发送回客户端。
  4. 现在您的客户端 javascript 评估从服务器发送的消息并决定调用您在步骤 (1) 中编写的断开连接函数。
  5. 您的客户端自行断开连接。

解决方法有点麻烦,但它会起作用。

实际上使用一些变通方法你可以达到你想要的。 为此你应该这样做:

  1. 使用java配置(不确定XML配置是否可行)
  2. WebSocketMessageBrokerConfigurationSupport 扩展您的配置 class 并实现 WebSocketMessageBrokerConfigurer 接口
  3. 创建自定义子协议 websocket 处理程序并将其扩展自 SubProtocolWebSocketHandler class
  4. 在您的自定义子协议 websocket 处理程序中覆盖 afterConnectionEstablished 方法,您将可以访问 WebSocketSession :)

我创建了示例 spring-boot 项目来展示我们如何断开客户端会话与服务器端的连接: https://github.com/isaranchuk/spring-websocket-disconnect

您也可以通过实现自定义 WebSocketHandlerDecorator:

来断开会话
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig<S extends ExpiringSession> extends AbstractSessionWebSocketMessageBrokerConfigurer<S> {

    @Override
    public void configureWebSocketTransport(final WebSocketTransportRegistration registration) {
        registration.addDecoratorFactory(new WebSocketHandlerDecoratorFactory() {
            @Override
            public WebSocketHandler decorate(final WebSocketHandler handler) {
                return new WebSocketHandlerDecorator(handler) {
                    @Override
                    public void afterConnectionEstablished(final WebSocketSession session) throws Exception {

                        session.close(CloseStatus.NOT_ACCEPTABLE);
                        super.afterConnectionEstablished(session);
                    }
                };
            }
        });
        super.configureWebSocketTransport(registration);
    }


    @Override
    protected void configureStompEndpoints(final StompEndpointRegistry registry) {
    registry.addEndpoint("/home")
            .setHandshakeHandler(new DefaultHandshakeHandler(
                    new UndertowRequestUpgradeStrategy() // If you use undertow
                    // new JettyRequestUpgradeStrategy()
                    // new TomcatRequestUpgradeStrategy()
            ))
            .withSockJS();
    }
}

如果是 xml 配置,您可以在 <websocket:message-broker><websocket:transport> 中使用 <websocket:decorator-factories>。 创建实现 decorate 方法的自定义 WebSocketHandlerDecoratorWebSocketHandlerDecoratorFactory

这可能看起来很简短,但我不确定在您的情况下实现会是什么样子。但是,我认为在某些情况下可以保证 workaround/solution:

  1. 在后端设置超时(比如 30 秒):
    • 这就是使用 Spring Boot Websocket(和 Tomcat)的方式:
    @Bean
    public ServletServerContainerFactoryBean websocketContainer() {
        ServletServerContainerFactoryBean container = new ServletServerContainerFactoryBean();
        container.setMaxSessionIdleTimeout(MAX_SESSION_IDLE_TIMEOUT); 
        return container;
    }
  1. 如果您想保持会话打开 - 继续发送消息或主动发送 ping/pongs。如果您希望会话断开连接,请在应用程序中合适的位置停止 ping/pong 交互。

当然,如果您想立即断开连接,这似乎不是一个合适的解决方案。但是,如果您只是想减少活动连接的数量,ping/pong 可能是一个不错的选择,因为它只在主动发送消息时保持会话打开,从而防止会话过早关闭。

首先你必须通过继承引入一个 class 作为你的用户 class 然后像这样使用它:

if (userObject instanceof User) {
    User user = (User) userObject;
    if (user.getId().equals(userDTO.getId())) {
       for (SessionInformation information : sessionRegistry.getAllSessions(user, true)) {
          information.expireNow();
       }
    }
}    

我借鉴了@Dániel Kis的思路,实现了websocket会话管理,重点是在类Singleton对象中为经过身份验证的用户存储websocket会话。

// WebSocketConfig.java

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    @Override
    public void configureWebSocketTransport(WebSocketTransportRegistration registration) {
        registration.addDecoratorFactory(new WebSocketHandlerDecoratorFactory() {
            @Override
            public WebSocketHandler decorate(final WebSocketHandler handler) {
                return new WebSocketHandlerDecorator(handler) {

                    @Override
                    public void afterConnectionEstablished(final WebSocketSession session) throws Exception {

                        // We will store current user's session into WebsocketSessionHolder after connection is established
                        String username = session.getPrincipal().getName();
                        WebsocketSessionHolder.addSession(username, session);

                        super.afterConnectionEstablished(session);
                    }
                };
            }
        });
    }
}

Class 用于存储 websocket 用户的会话 WebsocketSessionHolder。我使用 'synchronized' 块来保证线程安全。实际上这个块不是昂贵的操作,因为每个方法(addSession 和 closeSessions)都不经常使用(在建立和终止连接时)。这里不需要使用 ConcurrentHashMap 或 SynchronizedMap,因为我们在这些方法中对列表进行了一堆操作。

// WebsocketSessionHolder.java

public class WebsocketSessionHolder {

    static {
        sessions = new HashMap<>();
    }
    
    // key - username, value - List of user's sessions
    private static Map<String, List<WebSocketSession>> sessions;

    public static void addSession(String username, WebSocketSession session)
    {
        synchronized (sessions) {
            var userSessions = sessions.get(username);
            if (userSessions == null)
                userSessions = new ArrayList<WebSocketSession>();

            userSessions.add(session);
            sessions.put(username, userSessions);
        }
    }

    public static void closeSessions(String username) throws IOException 
    {
        synchronized (sessions) {
            var userSessions = sessions.get(username);
            if (userSessions != null)
            {
                for(var session : userSessions) {
                    // I use POLICY_VIOLATION to indicate reason of disconnecting for a client
                    session.close(CloseStatus.POLICY_VIOLATION);
                }
                sessions.remove(username);
            }
        }
    }
}

最后一步 - 终止(断开连接)指定的用户 websocket 会话(示例中的“ADMIN”),例如在某些控制器中

//PageController.java

@Controller
public class PageController {
    @GetMapping("/kill-sessions")
    public void killSessions() throws Exception {

        WebsocketSessionHolder.closeSessions("ADMIN");
    }
}