使用 spring-websocket 和 RabbitMQ 代理订阅已删除的队列(队列 NOT_FOUND)
Subscribing to a removed queue with spring-websocket and RabbitMQ broker (Queue NOT_FOUND)
我在 Tomcat8 上有一个 spring-websocket (4.1.6) 应用程序,它使用 STOMP RabbitMQ (3.4.4) 消息代理进行消息传递。当客户端 (Chrome 47) 启动应用程序时,它会订阅创建持久队列的端点。当此客户端从端点取消订阅时,队列将在 30 秒后由 RabbitMQ 清理,如自定义 RabbitMQ 策略中所定义。当我尝试重新连接到具有已清理队列的端点时,我在 RabbitMQ 日志中收到以下异常:"NOT_FOUND - no queue 'position-updates-user9zm_szz9' in vhost '/'\n"。我不想使用自动删除队列,因为我有一些重新连接逻辑以防 websocket 连接中断。
可以通过将以下代码添加到 spring-websocket-portfolio github 示例来重现此问题。
在容器div中index.html添加:
<button class="btn" onclick="appModel.subscribe()">SUBSCRIBE</button>
<button class="btn" onclick="appModel.unsubscribe()">UNSUBSCRIBE</button>
在portfolio.js中替换:
stompClient.subscribe("/user/queue/position-updates", function(message) {
与:
positionUpdates = stompClient.subscribe("/user/queue/position-updates", function(message) {
并添加以下内容:
self.unsubscribe = function() {
positionUpdates.unsubscribe();
}
self.subscribe = function() {
positionUpdates = stompClient.subscribe("/user/queue/position-updates", function(message) {
self.pushNotification("Position update " + message.body);
self.portfolio().updatePosition(JSON.parse(message.body));
});
}
现在您可以通过以下方式重现问题:
- 启动应用程序
- 点击退订
- 删除 RabbitMQ 控制台中的位置更新队列
- 点击订阅
通过 chrome 开发工具和 RabbitMQ 日志在 websocket 框架中找到错误消息。
reconnect logic in case the websocket connection dies.
和
no queue 'position-updates-user9zm_szz9' in vhost
完全不同的故事。
我建议您在删除队列的情况下实施 "re-subscribe" 逻辑。
实际上这就是 STOMP 的工作原理:它为订阅创建 auto-deleted
(生成)队列,是的,它在取消订阅时被删除。
在 RabbitMQ STOMP 适配器中查看更多信息Manual。
从另一端考虑订阅现有的 AMQP 队列:
To address existing queues created outside the STOMP adapter, destinations of the form /amq/queue/<name>
can be used.
问题是如果队列被 RabbitMQ 策略删除,Stomp 将不会重新创建队列。我通过在触发 SessionSubscribeEvent 时自己创建队列来解决这个问题。
public void onApplicationEvent(AbstractSubProtocolEvent event) {
if (event instanceof SessionSubscribeEvent) {
MultiValueMap nativeHeaders = (MultiValueMap)event.getMessage().getHeaders().get("nativeHeaders");
List destination = (List)nativeHeaders.get("destination");
String queueName = ((String)destination.get(0)).substring("/queue/".length());
try {
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(queueName, true, false, false, null);
} catch (IOException e) {
e.printStackTrace();
}
}
}
我在 Tomcat8 上有一个 spring-websocket (4.1.6) 应用程序,它使用 STOMP RabbitMQ (3.4.4) 消息代理进行消息传递。当客户端 (Chrome 47) 启动应用程序时,它会订阅创建持久队列的端点。当此客户端从端点取消订阅时,队列将在 30 秒后由 RabbitMQ 清理,如自定义 RabbitMQ 策略中所定义。当我尝试重新连接到具有已清理队列的端点时,我在 RabbitMQ 日志中收到以下异常:"NOT_FOUND - no queue 'position-updates-user9zm_szz9' in vhost '/'\n"。我不想使用自动删除队列,因为我有一些重新连接逻辑以防 websocket 连接中断。
可以通过将以下代码添加到 spring-websocket-portfolio github 示例来重现此问题。
在容器div中index.html添加:
<button class="btn" onclick="appModel.subscribe()">SUBSCRIBE</button>
<button class="btn" onclick="appModel.unsubscribe()">UNSUBSCRIBE</button>
在portfolio.js中替换:
stompClient.subscribe("/user/queue/position-updates", function(message) {
与:
positionUpdates = stompClient.subscribe("/user/queue/position-updates", function(message) {
并添加以下内容:
self.unsubscribe = function() {
positionUpdates.unsubscribe();
}
self.subscribe = function() {
positionUpdates = stompClient.subscribe("/user/queue/position-updates", function(message) {
self.pushNotification("Position update " + message.body);
self.portfolio().updatePosition(JSON.parse(message.body));
});
}
现在您可以通过以下方式重现问题:
- 启动应用程序
- 点击退订
- 删除 RabbitMQ 控制台中的位置更新队列
- 点击订阅
通过 chrome 开发工具和 RabbitMQ 日志在 websocket 框架中找到错误消息。
reconnect logic in case the websocket connection dies.
和
no queue 'position-updates-user9zm_szz9' in vhost
完全不同的故事。
我建议您在删除队列的情况下实施 "re-subscribe" 逻辑。
实际上这就是 STOMP 的工作原理:它为订阅创建 auto-deleted
(生成)队列,是的,它在取消订阅时被删除。
在 RabbitMQ STOMP 适配器中查看更多信息Manual。
从另一端考虑订阅现有的 AMQP 队列:
To address existing queues created outside the STOMP adapter, destinations of the form
/amq/queue/<name>
can be used.
问题是如果队列被 RabbitMQ 策略删除,Stomp 将不会重新创建队列。我通过在触发 SessionSubscribeEvent 时自己创建队列来解决这个问题。
public void onApplicationEvent(AbstractSubProtocolEvent event) {
if (event instanceof SessionSubscribeEvent) {
MultiValueMap nativeHeaders = (MultiValueMap)event.getMessage().getHeaders().get("nativeHeaders");
List destination = (List)nativeHeaders.get("destination");
String queueName = ((String)destination.get(0)).substring("/queue/".length());
try {
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(queueName, true, false, false, null);
} catch (IOException e) {
e.printStackTrace();
}
}
}