客户端重新连接时 WebSocket 消息丢失
WebSocket messages are lost while client is reconnecting
我在前端使用 STOMP.js,在后端使用 ActiveMQ 向客户端发送推送通知。客户端先订阅一个主题,代码如下:
function stompConnect() {
console.log('STOMP: Attempting connection');
// recreate the stompClient to use a new WebSocket
var socket = new SockJS('/websocket');
var stompClient = Stomp.over(socket);
stompClient.connect({}, function(frame) {
stompClient.subscribe('/topic/table-updates', function(notification){
showNotification(JSON.parse(notification.body));
});
}, function (error) {
console.log('STOMP: ' + error);
setTimeout(stompConnect, 10000);
console.log('STOMP: Reconnecting in 10 seconds');
});
}
stompConnect();
有时底层websocket连接丢失,客户端需要重新连接并再次订阅主题(超时10秒)。这导致在客户端重新连接时,来自服务器的某些消息丢失。有什么办法可以防止这种情况吗?
我在后端使用 Spring WebSocket。这是配置:
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer {
@Value("${stomp.port}")
private Integer stompPort;
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/websocket").withSockJS();
}
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
registry
.enableStompBrokerRelay("/topic/")
.setRelayPort(stompPort);
}
@Bean(initMethod = "start", destroyMethod = "stop")
public BrokerService brokerService() throws Exception {
final BrokerService broker = BrokerFactory.createBroker(
String.format("broker:(vm://localhost,stomp://localhost:%d)?persistent=false", stompPort));
broker.addShutdownHook(new SpringContextHook());
return broker;
}
}
当然可以。
您需要进行一些更改。
都在这里描述了:Using Queue Browsers to Implement Durable Topic Subscriptions
不过我会给你一个快速的运行下来。
消息传递结构的一些重要变化:
- 将您的 websocket 消费者订阅到“/queue/”而不是主题和设置队列浏览。
- 使用 Reliable Messaging
从您的制作人发送消息
这是一个基本示例:
- 用户打开网页并订阅/queue/important-stuff,队列浏览顺序为-1(仅限新消息)。
- 对于每条到达的消息,将其序列号存储在某处。而用户在消息中享受他们的重要数据。
- 用户的 websocket 连接断开。
- 检测到这个再重新连接,一定要参考Jeff的例子here,web sockets只能打开一次
- 重新订阅并在from-seq header中传入存储的序列号。
- Apollo 代理将发送丢失的消息。
请记住,经纪人可能会收到一些消息,您需要遵循页面上的建议,并且:
- 将带有过期时间的消息发送到队列,以便在达到过期时间后自动删除它们。
- 定期运行一个普通的消费者应用程序,它可以游标
队列和删除消息被视为不再需要。
我在前端使用 STOMP.js,在后端使用 ActiveMQ 向客户端发送推送通知。客户端先订阅一个主题,代码如下:
function stompConnect() {
console.log('STOMP: Attempting connection');
// recreate the stompClient to use a new WebSocket
var socket = new SockJS('/websocket');
var stompClient = Stomp.over(socket);
stompClient.connect({}, function(frame) {
stompClient.subscribe('/topic/table-updates', function(notification){
showNotification(JSON.parse(notification.body));
});
}, function (error) {
console.log('STOMP: ' + error);
setTimeout(stompConnect, 10000);
console.log('STOMP: Reconnecting in 10 seconds');
});
}
stompConnect();
有时底层websocket连接丢失,客户端需要重新连接并再次订阅主题(超时10秒)。这导致在客户端重新连接时,来自服务器的某些消息丢失。有什么办法可以防止这种情况吗?
我在后端使用 Spring WebSocket。这是配置:
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer {
@Value("${stomp.port}")
private Integer stompPort;
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/websocket").withSockJS();
}
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
registry
.enableStompBrokerRelay("/topic/")
.setRelayPort(stompPort);
}
@Bean(initMethod = "start", destroyMethod = "stop")
public BrokerService brokerService() throws Exception {
final BrokerService broker = BrokerFactory.createBroker(
String.format("broker:(vm://localhost,stomp://localhost:%d)?persistent=false", stompPort));
broker.addShutdownHook(new SpringContextHook());
return broker;
}
}
当然可以。
您需要进行一些更改。
都在这里描述了:Using Queue Browsers to Implement Durable Topic Subscriptions
不过我会给你一个快速的运行下来。
消息传递结构的一些重要变化:
- 将您的 websocket 消费者订阅到“/queue/”而不是主题和设置队列浏览。
- 使用 Reliable Messaging 从您的制作人发送消息
这是一个基本示例:
- 用户打开网页并订阅/queue/important-stuff,队列浏览顺序为-1(仅限新消息)。
- 对于每条到达的消息,将其序列号存储在某处。而用户在消息中享受他们的重要数据。
- 用户的 websocket 连接断开。
- 检测到这个再重新连接,一定要参考Jeff的例子here,web sockets只能打开一次
- 重新订阅并在from-seq header中传入存储的序列号。
- Apollo 代理将发送丢失的消息。
请记住,经纪人可能会收到一些消息,您需要遵循页面上的建议,并且:
- 将带有过期时间的消息发送到队列,以便在达到过期时间后自动删除它们。
- 定期运行一个普通的消费者应用程序,它可以游标 队列和删除消息被视为不再需要。