Spring websocket 从多个线程发送消息

Spring websocket send message from multiple threads

我正在为我的一个基于 spring 的项目使用 Spring WebSocket 服务器实现。我遇到一个错误 The remote endpoint was in state [TEXT_PARTIAL_WRITING] which is invalid state。我发现问题是同时从不同的线程写入 websocket。

我是如何临时修复它的:考虑我已经实现了下面的方法

void sendMessageToSession(WebsocketSession session,String message);

向 websocket 会话发送 TextMessage。我无法使整个方法同步,因为多个线程可以为不同的 websocketSession 和消息调用它。我也不能将会话放在同步块中(试过但没有用)

虽然,我这样解决了我的问题

synchronized(session.getId()){ 
    //sending message;
}

而且我不再遇到那个问题。但是在同步块中使用字符串似乎不是好的做法。 那么我还有什么其他解决方案?发送异步消息的最佳方式是什么?

PS:连接建立后我已经使用了ConcurrentWebSocketSessionDecorator,我正在使用更新的websocket。没有帮助。

session = new ConcurrentWebSocketSessionDecorator(session, (int) StaticConfig.MAXIMUM_WS_ASYNC_SEND_TIMEOUT, StaticConfig.MAXIMUM_WS_BINARY_BUFFER_SIZE * 2);

注意 我将我的 websocet 会话保存在地图中,其中键是 session.getId,值是会话本身。

与其他一些 websocket 实现不同,Spring websocket 引用在每条消息上似乎并不相等。我通过他们的 ID 将会话保存在地图中,并且在每条消息上我检查传递的 websocket 与我已经放在我的地图上的 websocket 是否相等,它是错误的。

通过在我的 WebsocketSession 后面添加 volatile 关键字,在我坚持我的会话的地方,我解决了这个问题。我很高兴知道这是否也是一种不好的做法。但我的想法是,当从多个线程写入 websocket 会话时,这些线程会丢失 websocket 的状态,因为它尚未更新,这就是抛出此异常的原因。

通过添加 volatile,我们确保 websocket 状态在另一个线程使用它之前已经更新,因此写入 websocket 的工作会按预期同步。

我创建了一个名为 SessionData 的 class,它包含 websocketSession 和我需要的有关会话的所有其他数据。

public class SessionData {
    private volatile WebSocketSession websocketSession;
    //...other 
    // getters and setters ...
}

并且我使用 SessionData 作为会话 ID 是键的映射的值

然后当从 SessionData 获取 websocketSession 并从不同的线程写入它时,volatile 帮助我更新了 websocketSession。


更新 (2020)

这里的一个关键注意事项是每次要向会话发送消息时都应使用 sessionData.getWebsocketSession.sendMessage(...)。你不应该直接使用会话,这意味着像 this 这样的代码是 不好的做法:

WebSocketSession websocketSession = sessionData.getWebSocketSession();
websocketSession.sendMessage(...);

您永远不会知道这两行代码之间对 websocket 会话应用了哪些更改(在您的情况下可能超过 2 行)。

像这样的代码更好:

sessionData.getWebSocketSession().sendMessage(...);

也不要直接发布到在 Spring websocket MessageHandler 中传递给您的会话中。否则您可能会再次遇到该错误。

这就是为什么在连接打开时将 WebSocketSession 中的 sessionId 映射到 SessionData 的良好做法。您可以使用此存储库获取 volatile session 使用会话 ID 而不是直接使用会话。

ConcurrentWebSocketSessionDecorator 在多线程中就像一个魅力,它是为它而设计的。 您的地图实现可能有问题。

示例代码:

private final Map<String, SessionData> sessions = new ConcurrentHashMap<>();

@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception 
{
    // Use the following will crash :
    //sessions.put(session.getId(), new SessionData(session));

    // Use ConcurrentWebSocketSessionDecorator is safe :
    sessions.put(session.getId(), new SessionData(new ConcurrentWebSocketSessionDecorator (session, 2000, 4096)));
    super.afterConnectionEstablished(session);
}

@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception
{
    sessions.remove(session.getId());
    super.afterConnectionClosed(session, status); 
}

public void send(WebSocketSession session, String msg) throws MessagingException {
    try {
        session.sendMessage (new TextMessage(msg));
    } catch (IOException ex) {
        throw new MessagingException(ex.getMessage());
    }
}

轻松测试多线程中的行为:

    public void sendMT(WebSocketSession session, String msg) throws MessagingException{
    for (int i=0; i<3; i++){
        new Thread(){
          @Override
          public void run(){
              send (session, msg);
        }.start();  
    }
}