有没有办法在 Spring 中保留在 Web 套接字断开连接时收到的最后一条消息?

Is there a way to persist the last message received on a web socket disconnection in Spring?

我使用 STOMP 和 sock js 在浏览器客户端和 Spring 引导后端之间建立了网络套接字连接。每一秒,一个有效载荷从客户端发送到服务器,其中包含需要持久保存到 Postgres 数据库的数据。可能有数千个客户端同时连接,所以我不想每秒为每个客户端更新数据库。因此,为了减少 CPU 负载,我想监听 Web 套接字 StompCommand.DISCONNECT 事件何时发生,然后保留从客户端收到的最后一条消息。

这可能吗,或者有其他方法可以解决这个问题吗?

在这种情况下 - 这个问题真的很自以为是 - 有很多可能的实现方式。

其中一个实现可以执行以下操作:

当您从已连接的客户端接收到消息时 - 维护当前客户端标识符到最后数据的映射(为了便于理解,在内存中就足够了)。

每次您在 @MessageMapping 中收到一条新消息时 class - 更新地图中的条目,以便它始终包含最后一条消息。

映射的值将是最后一条消息,键可以是 Principal、SessionId 字符串 - 任何您认为有用的。

@Component 
public class LastMessageHolder {
   private Map<Principal, MyData> lastDataPerPrincipal;

   public void updateLastData(Principal principal, MyData data) {
       lastDataPerPrincipal.put(principal, data);
   } 

   public MyData getLastDataForPrincipalAndClear(Principal principal) {
       return lastDataPerPrincipal.remove(principal);
   } 
}

消息接收者将通过stomp通道获取消息并更新最后一个消息持有者

@Component
public class MyMessageReceiver {

   @Autowired
   private LastMessageHolder lastMessageHolder;

    @MessageMapping(...)
    public void onDataReceived(Principal principal, MyData data) {
       // this gets called every second per client
       lastMessageHolder.updateLastData(principal, data);
    }
}

并且当您在通道拦截器中侦听断开连接消息时 - 从正在断开连接的主体中删除数据并将其存储在数据库中:

@Component
public class DbStoreChannelInterceptor implements ChannelInterceptor {
   @Autowired
   private LastMessageHolder lastMessageHolder;
   @Autowired // something that will store your stuff in the db
   private DbDao             dbDao;

    @Override
    public Message<?> preSend(Message<?> message, MessageChannel channel) {
        StompHeaderAccessor accessor =
                      MessageHeaderAccessor.getAccessor(message, 
                                                        StompHeaderAccessor.class);

        if (StompCommand.CONNECT.equals(accessor.getCommand()))  {
           // populate a principal here, from headers, authentication token, 
           whatever

           Principal principal = ...
           accessor.setUser(principal);
        }
        if (StompCommand.DISCONNECT.equals(accessor.getCommand())) {
            Principal principal = accessor.getUser(); 
            MyData data = lastMessageHolder.getDataForPrincipalAndClear(principal);
            dbDao.storeDataInDbForPrincipal(principal, data);
        }
    }
    
}

这是一个基本的想法。

从那以后你可以更进一步,而不是存储来自通道拦截器的数据(在这种情况下,实际的 INSERT 将为每个客户端完成)你可能想把它扔进一些- 内存或分布式队列 - 任何最适合你的,这样消费者将读取一批数据对象并一次存储它们,这样它将大大减少你的 RDBMS 的负载。

另外,我只是提一下,你应该考虑这样一种情况,客户端一直在发送数据,但是服务器由于某种原因宕机了,而客户端仍然有兴趣继续发送数据。这更多是在分布式系统的架构领域,所以它超出了问题的范围。