有没有办法在 Spring 中保留在 Web 套接字断开连接时收到的最后一条消息?
Is there a way to persist the last message received on a web socket disconnection in Spring?
我使用 STOMP 和 sock js 在浏览器客户端和 Spring 引导后端之间建立了网络套接字连接。每一秒,一个有效载荷从客户端发送到服务器,其中包含需要持久保存到 Postgres 数据库的数据。可能有数千个客户端同时连接,所以我不想每秒为每个客户端更新数据库。因此,为了减少 CPU 负载,我想监听 Web 套接字 StompCommand.DISCONNECT
事件何时发生,然后保留从客户端收到的最后一条消息。
这可能吗,或者有其他方法可以解决这个问题吗?
在这种情况下 - 这个问题真的很自以为是 - 有很多可能的实现方式。
其中一个实现可以执行以下操作:
当您从已连接的客户端接收到消息时 - 维护当前客户端标识符到最后数据的映射(为了便于理解,在内存中就足够了)。
每次您在 @MessageMapping
中收到一条新消息时 class - 更新地图中的条目,以便它始终包含最后一条消息。
映射的值将是最后一条消息,键可以是 Principal、SessionId 字符串 - 任何您认为有用的。
@Component
public class LastMessageHolder {
private Map<Principal, MyData> lastDataPerPrincipal;
public void updateLastData(Principal principal, MyData data) {
lastDataPerPrincipal.put(principal, data);
}
public MyData getLastDataForPrincipalAndClear(Principal principal) {
return lastDataPerPrincipal.remove(principal);
}
}
消息接收者将通过stomp通道获取消息并更新最后一个消息持有者
@Component
public class MyMessageReceiver {
@Autowired
private LastMessageHolder lastMessageHolder;
@MessageMapping(...)
public void onDataReceived(Principal principal, MyData data) {
// this gets called every second per client
lastMessageHolder.updateLastData(principal, data);
}
}
并且当您在通道拦截器中侦听断开连接消息时 - 从正在断开连接的主体中删除数据并将其存储在数据库中:
@Component
public class DbStoreChannelInterceptor implements ChannelInterceptor {
@Autowired
private LastMessageHolder lastMessageHolder;
@Autowired // something that will store your stuff in the db
private DbDao dbDao;
@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
StompHeaderAccessor accessor =
MessageHeaderAccessor.getAccessor(message,
StompHeaderAccessor.class);
if (StompCommand.CONNECT.equals(accessor.getCommand())) {
// populate a principal here, from headers, authentication token,
whatever
Principal principal = ...
accessor.setUser(principal);
}
if (StompCommand.DISCONNECT.equals(accessor.getCommand())) {
Principal principal = accessor.getUser();
MyData data = lastMessageHolder.getDataForPrincipalAndClear(principal);
dbDao.storeDataInDbForPrincipal(principal, data);
}
}
}
这是一个基本的想法。
从那以后你可以更进一步,而不是存储来自通道拦截器的数据(在这种情况下,实际的 INSERT
将为每个客户端完成)你可能想把它扔进一些- 内存或分布式队列 - 任何最适合你的,这样消费者将读取一批数据对象并一次存储它们,这样它将大大减少你的 RDBMS 的负载。
另外,我只是提一下,你应该考虑这样一种情况,客户端一直在发送数据,但是服务器由于某种原因宕机了,而客户端仍然有兴趣继续发送数据。这更多是在分布式系统的架构领域,所以它超出了问题的范围。
我使用 STOMP 和 sock js 在浏览器客户端和 Spring 引导后端之间建立了网络套接字连接。每一秒,一个有效载荷从客户端发送到服务器,其中包含需要持久保存到 Postgres 数据库的数据。可能有数千个客户端同时连接,所以我不想每秒为每个客户端更新数据库。因此,为了减少 CPU 负载,我想监听 Web 套接字 StompCommand.DISCONNECT
事件何时发生,然后保留从客户端收到的最后一条消息。
这可能吗,或者有其他方法可以解决这个问题吗?
在这种情况下 - 这个问题真的很自以为是 - 有很多可能的实现方式。
其中一个实现可以执行以下操作:
当您从已连接的客户端接收到消息时 - 维护当前客户端标识符到最后数据的映射(为了便于理解,在内存中就足够了)。
每次您在 @MessageMapping
中收到一条新消息时 class - 更新地图中的条目,以便它始终包含最后一条消息。
映射的值将是最后一条消息,键可以是 Principal、SessionId 字符串 - 任何您认为有用的。
@Component
public class LastMessageHolder {
private Map<Principal, MyData> lastDataPerPrincipal;
public void updateLastData(Principal principal, MyData data) {
lastDataPerPrincipal.put(principal, data);
}
public MyData getLastDataForPrincipalAndClear(Principal principal) {
return lastDataPerPrincipal.remove(principal);
}
}
消息接收者将通过stomp通道获取消息并更新最后一个消息持有者
@Component
public class MyMessageReceiver {
@Autowired
private LastMessageHolder lastMessageHolder;
@MessageMapping(...)
public void onDataReceived(Principal principal, MyData data) {
// this gets called every second per client
lastMessageHolder.updateLastData(principal, data);
}
}
并且当您在通道拦截器中侦听断开连接消息时 - 从正在断开连接的主体中删除数据并将其存储在数据库中:
@Component
public class DbStoreChannelInterceptor implements ChannelInterceptor {
@Autowired
private LastMessageHolder lastMessageHolder;
@Autowired // something that will store your stuff in the db
private DbDao dbDao;
@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
StompHeaderAccessor accessor =
MessageHeaderAccessor.getAccessor(message,
StompHeaderAccessor.class);
if (StompCommand.CONNECT.equals(accessor.getCommand())) {
// populate a principal here, from headers, authentication token,
whatever
Principal principal = ...
accessor.setUser(principal);
}
if (StompCommand.DISCONNECT.equals(accessor.getCommand())) {
Principal principal = accessor.getUser();
MyData data = lastMessageHolder.getDataForPrincipalAndClear(principal);
dbDao.storeDataInDbForPrincipal(principal, data);
}
}
}
这是一个基本的想法。
从那以后你可以更进一步,而不是存储来自通道拦截器的数据(在这种情况下,实际的 INSERT
将为每个客户端完成)你可能想把它扔进一些- 内存或分布式队列 - 任何最适合你的,这样消费者将读取一批数据对象并一次存储它们,这样它将大大减少你的 RDBMS 的负载。
另外,我只是提一下,你应该考虑这样一种情况,客户端一直在发送数据,但是服务器由于某种原因宕机了,而客户端仍然有兴趣继续发送数据。这更多是在分布式系统的架构领域,所以它超出了问题的范围。