Spring websocket 从多个线程发送消息
Spring websocket send message from multiple threads
我正在为我的一个基于 spring 的项目使用 Spring WebSocket 服务器实现。我遇到一个错误 The remote endpoint was in state [TEXT_PARTIAL_WRITING] which is invalid state
。我发现问题是同时从不同的线程写入 websocket。
我是如何临时修复它的:考虑我已经实现了下面的方法
void sendMessageToSession(WebsocketSession session,String message);
向 websocket 会话发送 TextMessage。我无法使整个方法同步,因为多个线程可以为不同的 websocketSession 和消息调用它。我也不能将会话放在同步块中(试过但没有用)
虽然,我这样解决了我的问题
synchronized(session.getId()){
//sending message;
}
而且我不再遇到那个问题。但是在同步块中使用字符串似乎不是好的做法。
那么我还有什么其他解决方案?发送异步消息的最佳方式是什么?
PS:连接建立后我已经使用了ConcurrentWebSocketSessionDecorator
,我正在使用更新的websocket。没有帮助。
session = new ConcurrentWebSocketSessionDecorator(session, (int) StaticConfig.MAXIMUM_WS_ASYNC_SEND_TIMEOUT, StaticConfig.MAXIMUM_WS_BINARY_BUFFER_SIZE * 2);
注意
我将我的 websocet 会话保存在地图中,其中键是 session.getId,值是会话本身。
与其他一些 websocket 实现不同,Spring websocket 引用在每条消息上似乎并不相等。我通过他们的 ID 将会话保存在地图中,并且在每条消息上我检查传递的 websocket 与我已经放在我的地图上的 websocket 是否相等,它是错误的。
通过在我的 WebsocketSession
后面添加 volatile
关键字,在我坚持我的会话的地方,我解决了这个问题。我很高兴知道这是否也是一种不好的做法。但我的想法是,当从多个线程写入 websocket 会话时,这些线程会丢失 websocket 的状态,因为它尚未更新,这就是抛出此异常的原因。
通过添加 volatile,我们确保 websocket 状态在另一个线程使用它之前已经更新,因此写入 websocket 的工作会按预期同步。
我创建了一个名为 SessionData
的 class,它包含 websocketSession 和我需要的有关会话的所有其他数据。
public class SessionData {
private volatile WebSocketSession websocketSession;
//...other
// getters and setters ...
}
并且我使用 SessionData 作为会话 ID 是键的映射的值
然后当从 SessionData 获取 websocketSession 并从不同的线程写入它时,volatile 帮助我更新了 websocketSession。
更新 (2020)
这里的一个关键注意事项是每次要向会话发送消息时都应使用 sessionData.getWebsocketSession.sendMessage(...)
。你不应该直接使用会话,这意味着像 this 这样的代码是 不好的做法:
WebSocketSession websocketSession = sessionData.getWebSocketSession();
websocketSession.sendMessage(...);
您永远不会知道这两行代码之间对 websocket 会话应用了哪些更改(在您的情况下可能超过 2 行)。
像这样的代码更好:
sessionData.getWebSocketSession().sendMessage(...);
也不要直接发布到在 Spring websocket MessageHandler
中传递给您的会话中。否则您可能会再次遇到该错误。
这就是为什么在连接打开时将 WebSocketSession
中的 sessionId
映射到 SessionData
的良好做法。您可以使用此存储库获取 volatile session
使用会话 ID 而不是直接使用会话。
ConcurrentWebSocketSessionDecorator 在多线程中就像一个魅力,它是为它而设计的。
您的地图实现可能有问题。
示例代码:
private final Map<String, SessionData> sessions = new ConcurrentHashMap<>();
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception
{
// Use the following will crash :
//sessions.put(session.getId(), new SessionData(session));
// Use ConcurrentWebSocketSessionDecorator is safe :
sessions.put(session.getId(), new SessionData(new ConcurrentWebSocketSessionDecorator (session, 2000, 4096)));
super.afterConnectionEstablished(session);
}
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception
{
sessions.remove(session.getId());
super.afterConnectionClosed(session, status);
}
public void send(WebSocketSession session, String msg) throws MessagingException {
try {
session.sendMessage (new TextMessage(msg));
} catch (IOException ex) {
throw new MessagingException(ex.getMessage());
}
}
轻松测试多线程中的行为:
public void sendMT(WebSocketSession session, String msg) throws MessagingException{
for (int i=0; i<3; i++){
new Thread(){
@Override
public void run(){
send (session, msg);
}.start();
}
}
我正在为我的一个基于 spring 的项目使用 Spring WebSocket 服务器实现。我遇到一个错误 The remote endpoint was in state [TEXT_PARTIAL_WRITING] which is invalid state
。我发现问题是同时从不同的线程写入 websocket。
我是如何临时修复它的:考虑我已经实现了下面的方法
void sendMessageToSession(WebsocketSession session,String message);
向 websocket 会话发送 TextMessage。我无法使整个方法同步,因为多个线程可以为不同的 websocketSession 和消息调用它。我也不能将会话放在同步块中(试过但没有用)
虽然,我这样解决了我的问题
synchronized(session.getId()){
//sending message;
}
而且我不再遇到那个问题。但是在同步块中使用字符串似乎不是好的做法。 那么我还有什么其他解决方案?发送异步消息的最佳方式是什么?
PS:连接建立后我已经使用了ConcurrentWebSocketSessionDecorator
,我正在使用更新的websocket。没有帮助。
session = new ConcurrentWebSocketSessionDecorator(session, (int) StaticConfig.MAXIMUM_WS_ASYNC_SEND_TIMEOUT, StaticConfig.MAXIMUM_WS_BINARY_BUFFER_SIZE * 2);
注意 我将我的 websocet 会话保存在地图中,其中键是 session.getId,值是会话本身。
与其他一些 websocket 实现不同,Spring websocket 引用在每条消息上似乎并不相等。我通过他们的 ID 将会话保存在地图中,并且在每条消息上我检查传递的 websocket 与我已经放在我的地图上的 websocket 是否相等,它是错误的。
通过在我的 WebsocketSession
后面添加 volatile
关键字,在我坚持我的会话的地方,我解决了这个问题。我很高兴知道这是否也是一种不好的做法。但我的想法是,当从多个线程写入 websocket 会话时,这些线程会丢失 websocket 的状态,因为它尚未更新,这就是抛出此异常的原因。
通过添加 volatile,我们确保 websocket 状态在另一个线程使用它之前已经更新,因此写入 websocket 的工作会按预期同步。
我创建了一个名为 SessionData
的 class,它包含 websocketSession 和我需要的有关会话的所有其他数据。
public class SessionData {
private volatile WebSocketSession websocketSession;
//...other
// getters and setters ...
}
并且我使用 SessionData 作为会话 ID 是键的映射的值
然后当从 SessionData 获取 websocketSession 并从不同的线程写入它时,volatile 帮助我更新了 websocketSession。
更新 (2020)
这里的一个关键注意事项是每次要向会话发送消息时都应使用 sessionData.getWebsocketSession.sendMessage(...)
。你不应该直接使用会话,这意味着像 this 这样的代码是 不好的做法:
WebSocketSession websocketSession = sessionData.getWebSocketSession();
websocketSession.sendMessage(...);
您永远不会知道这两行代码之间对 websocket 会话应用了哪些更改(在您的情况下可能超过 2 行)。
像这样的代码更好:
sessionData.getWebSocketSession().sendMessage(...);
也不要直接发布到在 Spring websocket MessageHandler
中传递给您的会话中。否则您可能会再次遇到该错误。
这就是为什么在连接打开时将 WebSocketSession
中的 sessionId
映射到 SessionData
的良好做法。您可以使用此存储库获取 volatile session
使用会话 ID 而不是直接使用会话。
ConcurrentWebSocketSessionDecorator 在多线程中就像一个魅力,它是为它而设计的。 您的地图实现可能有问题。
示例代码:
private final Map<String, SessionData> sessions = new ConcurrentHashMap<>();
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception
{
// Use the following will crash :
//sessions.put(session.getId(), new SessionData(session));
// Use ConcurrentWebSocketSessionDecorator is safe :
sessions.put(session.getId(), new SessionData(new ConcurrentWebSocketSessionDecorator (session, 2000, 4096)));
super.afterConnectionEstablished(session);
}
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception
{
sessions.remove(session.getId());
super.afterConnectionClosed(session, status);
}
public void send(WebSocketSession session, String msg) throws MessagingException {
try {
session.sendMessage (new TextMessage(msg));
} catch (IOException ex) {
throw new MessagingException(ex.getMessage());
}
}
轻松测试多线程中的行为:
public void sendMT(WebSocketSession session, String msg) throws MessagingException{
for (int i=0; i<3; i++){
new Thread(){
@Override
public void run(){
send (session, msg);
}.start();
}
}