客户端重新连接时 WebSocket 消息丢失

WebSocket messages are lost while client is reconnecting

我在前端使用 STOMP.js,在后端使用 ActiveMQ 向客户端发送推送通知。客户端先订阅一个主题,代码如下:

function stompConnect() {
    console.log('STOMP: Attempting connection');
    // recreate the stompClient to use a new WebSocket
    var socket = new SockJS('/websocket');
    var stompClient = Stomp.over(socket);

    stompClient.connect({}, function(frame) {
        stompClient.subscribe('/topic/table-updates', function(notification){
            showNotification(JSON.parse(notification.body));
        });
    }, function (error) {
        console.log('STOMP: ' + error);
        setTimeout(stompConnect, 10000);
        console.log('STOMP: Reconnecting in 10 seconds');
    });
}

stompConnect();

有时底层websocket连接丢失,客户端需要重新连接并再次订阅主题(超时10秒)。这导致在客户端重新连接时,来自服务器的某些消息丢失。有什么办法可以防止这种情况吗?

我在后端使用 Spring WebSocket。这是配置:

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer {

    @Value("${stomp.port}")
    private Integer stompPort;

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/websocket").withSockJS();
    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        registry
            .enableStompBrokerRelay("/topic/")
            .setRelayPort(stompPort);
    }

    @Bean(initMethod = "start", destroyMethod = "stop")
    public BrokerService brokerService() throws Exception {
        final BrokerService broker = BrokerFactory.createBroker(
            String.format("broker:(vm://localhost,stomp://localhost:%d)?persistent=false", stompPort));

        broker.addShutdownHook(new SpringContextHook());
        return broker;
    }
} 

当然可以。

您需要进行一些更改。

都在这里描述了:Using Queue Browsers to Implement Durable Topic Subscriptions

不过我会给你一个快速的运行下来。

消息传递结构的一些重要变化:

  • 将您的 websocket 消费者订阅到“/queue/”而不是主题和设置队列浏览。
  • 使用 Reliable Messaging
  • 从您的制作人发送消息

这是一个基本示例:

  1. 用户打开网页并订阅/queue/important-stuff,队列浏览顺序为-1(仅限新消息)。
  2. 对于每条到达的消息,将其序列号存储在某处。而用户在消息中享受他们的重要数据。
  3. 用户的 websocket 连接断开。
  4. 检测到这个再重新连接,一定要参考Jeff的例子here,web sockets只能打开一次
  5. 重新订阅并在from-seq header中传入存储的序列号。
  6. Apollo 代理将发送丢失的消息。

请记住,经纪人可能会收到一些消息,您需要遵循页面上的建议,并且:

  • 将带有过期时间的消息发送到队列,以便在达到过期时间后自动删除它们。
  • 定期运行一个普通的消费者应用程序,它可以游标 队列和删除消息被视为不再需要。