如何将 ConcurrentWebSocketSessionDecorator 与自定义 WebSocketHandler 一起使用

How to use ConcurrentWebSocketSessionDecorator with a custom WebSocketHandler

我有一个自定义的 WebSocket 处理程序(用于自定义子协议),以通常的方式注册:

public class WSConfig implements WebSocketConfigurer {

    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry webSocketHandlerRegistry) {
        webSocketHandlerRegistry
                .addHandler(new PerConnectionWebSocketHandler(CustomProtocolHandler.class), endpointUrl);
    }
}

CustomProtocolHandler 中,所有从 WebSocketHandler 继承的方法(例如 afterConnectionEstablishedhandleMessage 等)都接收 WebSocketSession,这不是线程安全的。官方教程说 ConcurrentWebSocketSessionDecorator 可以用来防止并发写入,但是当我的自定义处理程序被命中时,已经来不及包装会话了。

public class CustomProtocolHandler implements WebSocketHandler, SubProtocolCapable {

    @Override
    protected void handleTextMessage(WebSocketSession session, TextMessage message) {
        //This is not safe
        session.sendMessage(...);

        //And this this doesn't make sense
        new ConcurrentWebSocketSessionDecorator(session).sendMessage(...);
    }
}

现在,我看到 Spring 自己的 SubProtocolWebSocketHandler(它和我的一样,实现了 WebSocketHandlerSubProtocolCapable)确实在内部装饰了会话,我希望我能够将它用作基础,但要创建 SubProtocolWebSocketHandler 我需要 MessageChannel,它来自 spring-messaging。我既不使用 Spring 消息,也不知道如何获得 MessageChannel。我应该实施自定义的吗?

那么,ConcurrentWebSocketSessionDecorator 的用途是什么?我希望有一种方法可以使用 webSocketHandlerRegistry 注册会话包装逻辑,但没有。我唯一的想法是维护另一个 ConcurrentHashMap 将原始会话(或其 ID)映射到包装器,但这太可怕了,我不需要的一件事是更多的状态来管理和清理。

我最终制作了我自己的 PerConnectionWebSocketHandler 版本,与原始版本一样,它维护了一个会话到处理程序的映射,但我的将处理程序包装到一个容器中,该容器还包含处理程序的装饰会话使用。

原来的PerConnectionWebSocketHandler是这样的:

public class PerConnectionWebSocketHandler implements WebSocketHandler, BeanFactoryAware {

    private final Map<WebSocketSession, WebSocketHandler> handlers = new ConcurrentHashMap<>();

    ...

    @Override
    public void afterConnectionEstablished(WebSocketSession session) throws Exception {
        WebSocketHandler handler = this.provider.getHandler();
        this.handlers.put(session, handler);
        handler.afterConnectionEstablished(session);
    }
}

我的现在看起来像这样:

public class CustomPerConnectionWebSocketHandler implements WebSocketHandler {

    private final Map<WebSocketSession, HandlerWrapper> handlers = new ConcurrentHashMap<>();

    ...

    @Override
    public void afterConnectionEstablished(WebSocketSession session) throws Exception {
        WebSocketHandler handler = ...;
        //Here's a chance to decorate the session as needed
        WebSocketSession decoratedSession = new ConcurrentWebSocketSessionDecorator(session, sendTimeLimit, sendBufferSizeLimit);
        HandlerWrapper wrapper = new HandlerWrapper(handler, decoratedSession);
        this.handlers.put(session, wrapper);
        wrapper.afterConnectionEstablished();
    }


    @Override
    public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception {
         //Delegate all the calls in this fashion
         handlers.get(session).handleMessage(message);
    }

    ...

    private static class HandlerWrapper {

        private final WebSocketHandler handler;
        private final WebSocketSession session;

        HandlerWrapper(WebSocketHandler handler, WebSocketSession session) {
            this.handler = handler;
            this.session = session;
        }

        void afterConnectionEstablished() throws Exception {
            handler.afterConnectionEstablished(session);
        }

        void handleMessage(WebSocketMessage<?> message) throws Exception {
            handler.handleMessage(session, message);
        }

        ... //Other delegating methods
    }
}