如何将 ConcurrentWebSocketSessionDecorator 与自定义 WebSocketHandler 一起使用
How to use ConcurrentWebSocketSessionDecorator with a custom WebSocketHandler
我有一个自定义的 WebSocket 处理程序(用于自定义子协议),以通常的方式注册:
public class WSConfig implements WebSocketConfigurer {
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry webSocketHandlerRegistry) {
webSocketHandlerRegistry
.addHandler(new PerConnectionWebSocketHandler(CustomProtocolHandler.class), endpointUrl);
}
}
在 CustomProtocolHandler
中,所有从 WebSocketHandler
继承的方法(例如 afterConnectionEstablished
、handleMessage
等)都接收 WebSocketSession
,这不是线程安全的。官方教程说 ConcurrentWebSocketSessionDecorator
可以用来防止并发写入,但是当我的自定义处理程序被命中时,已经来不及包装会话了。
public class CustomProtocolHandler implements WebSocketHandler, SubProtocolCapable {
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) {
//This is not safe
session.sendMessage(...);
//And this this doesn't make sense
new ConcurrentWebSocketSessionDecorator(session).sendMessage(...);
}
}
现在,我看到 Spring 自己的 SubProtocolWebSocketHandler
(它和我的一样,实现了 WebSocketHandler
和 SubProtocolCapable
)确实在内部装饰了会话,我希望我能够将它用作基础,但要创建 SubProtocolWebSocketHandler
我需要 MessageChannel
,它来自 spring-messaging。我既不使用 Spring 消息,也不知道如何获得 MessageChannel
。我应该实施自定义的吗?
那么,ConcurrentWebSocketSessionDecorator
的用途是什么?我希望有一种方法可以使用 webSocketHandlerRegistry
注册会话包装逻辑,但没有。我唯一的想法是维护另一个 ConcurrentHashMap
将原始会话(或其 ID)映射到包装器,但这太可怕了,我不需要的一件事是更多的状态来管理和清理。
我最终制作了我自己的 PerConnectionWebSocketHandler
版本,与原始版本一样,它维护了一个会话到处理程序的映射,但我的将处理程序包装到一个容器中,该容器还包含处理程序的装饰会话使用。
原来的PerConnectionWebSocketHandler
是这样的:
public class PerConnectionWebSocketHandler implements WebSocketHandler, BeanFactoryAware {
private final Map<WebSocketSession, WebSocketHandler> handlers = new ConcurrentHashMap<>();
...
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
WebSocketHandler handler = this.provider.getHandler();
this.handlers.put(session, handler);
handler.afterConnectionEstablished(session);
}
}
我的现在看起来像这样:
public class CustomPerConnectionWebSocketHandler implements WebSocketHandler {
private final Map<WebSocketSession, HandlerWrapper> handlers = new ConcurrentHashMap<>();
...
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
WebSocketHandler handler = ...;
//Here's a chance to decorate the session as needed
WebSocketSession decoratedSession = new ConcurrentWebSocketSessionDecorator(session, sendTimeLimit, sendBufferSizeLimit);
HandlerWrapper wrapper = new HandlerWrapper(handler, decoratedSession);
this.handlers.put(session, wrapper);
wrapper.afterConnectionEstablished();
}
@Override
public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception {
//Delegate all the calls in this fashion
handlers.get(session).handleMessage(message);
}
...
private static class HandlerWrapper {
private final WebSocketHandler handler;
private final WebSocketSession session;
HandlerWrapper(WebSocketHandler handler, WebSocketSession session) {
this.handler = handler;
this.session = session;
}
void afterConnectionEstablished() throws Exception {
handler.afterConnectionEstablished(session);
}
void handleMessage(WebSocketMessage<?> message) throws Exception {
handler.handleMessage(session, message);
}
... //Other delegating methods
}
}
我有一个自定义的 WebSocket 处理程序(用于自定义子协议),以通常的方式注册:
public class WSConfig implements WebSocketConfigurer {
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry webSocketHandlerRegistry) {
webSocketHandlerRegistry
.addHandler(new PerConnectionWebSocketHandler(CustomProtocolHandler.class), endpointUrl);
}
}
在 CustomProtocolHandler
中,所有从 WebSocketHandler
继承的方法(例如 afterConnectionEstablished
、handleMessage
等)都接收 WebSocketSession
,这不是线程安全的。官方教程说 ConcurrentWebSocketSessionDecorator
可以用来防止并发写入,但是当我的自定义处理程序被命中时,已经来不及包装会话了。
public class CustomProtocolHandler implements WebSocketHandler, SubProtocolCapable {
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) {
//This is not safe
session.sendMessage(...);
//And this this doesn't make sense
new ConcurrentWebSocketSessionDecorator(session).sendMessage(...);
}
}
现在,我看到 Spring 自己的 SubProtocolWebSocketHandler
(它和我的一样,实现了 WebSocketHandler
和 SubProtocolCapable
)确实在内部装饰了会话,我希望我能够将它用作基础,但要创建 SubProtocolWebSocketHandler
我需要 MessageChannel
,它来自 spring-messaging。我既不使用 Spring 消息,也不知道如何获得 MessageChannel
。我应该实施自定义的吗?
那么,ConcurrentWebSocketSessionDecorator
的用途是什么?我希望有一种方法可以使用 webSocketHandlerRegistry
注册会话包装逻辑,但没有。我唯一的想法是维护另一个 ConcurrentHashMap
将原始会话(或其 ID)映射到包装器,但这太可怕了,我不需要的一件事是更多的状态来管理和清理。
我最终制作了我自己的 PerConnectionWebSocketHandler
版本,与原始版本一样,它维护了一个会话到处理程序的映射,但我的将处理程序包装到一个容器中,该容器还包含处理程序的装饰会话使用。
原来的PerConnectionWebSocketHandler
是这样的:
public class PerConnectionWebSocketHandler implements WebSocketHandler, BeanFactoryAware {
private final Map<WebSocketSession, WebSocketHandler> handlers = new ConcurrentHashMap<>();
...
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
WebSocketHandler handler = this.provider.getHandler();
this.handlers.put(session, handler);
handler.afterConnectionEstablished(session);
}
}
我的现在看起来像这样:
public class CustomPerConnectionWebSocketHandler implements WebSocketHandler {
private final Map<WebSocketSession, HandlerWrapper> handlers = new ConcurrentHashMap<>();
...
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
WebSocketHandler handler = ...;
//Here's a chance to decorate the session as needed
WebSocketSession decoratedSession = new ConcurrentWebSocketSessionDecorator(session, sendTimeLimit, sendBufferSizeLimit);
HandlerWrapper wrapper = new HandlerWrapper(handler, decoratedSession);
this.handlers.put(session, wrapper);
wrapper.afterConnectionEstablished();
}
@Override
public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception {
//Delegate all the calls in this fashion
handlers.get(session).handleMessage(message);
}
...
private static class HandlerWrapper {
private final WebSocketHandler handler;
private final WebSocketSession session;
HandlerWrapper(WebSocketHandler handler, WebSocketSession session) {
this.handler = handler;
this.session = session;
}
void afterConnectionEstablished() throws Exception {
handler.afterConnectionEstablished(session);
}
void handleMessage(WebSocketMessage<?> message) throws Exception {
handler.handleMessage(session, message);
}
... //Other delegating methods
}
}