RabbitMQ Stomp over websocket:无法检索排队的消息
RabbitMQ Stomp over websocket : Unable to retrieve queued messages
我正在使用 RabbitMQ Stomp 的持久订阅(文档 here)。根据文档,当客户端使用相同的 ID 重新连接(订阅)时,他应该获得所有 queued up 消息。但是,我无法取回任何内容,即使消息已 queued 在服务器端。下面是我正在使用的代码:
RabbitMQ 版本:3.6.0
客户代码:
var sock;
var stomp;
var messageCount = 0;
var stompConnect = function() {
sock = new SockJS(options.url);
stomp = Stomp.over(sock);
stomp.connect({}, function(frame) {
debug('Connected: ', frame);
console.log(frame);
var id = stomp.subscribe('<url>' + options.source + "." + options.type + "." + options.id, function(d) {
console.log(messageCount);
messageCount = messageCount + 1;
}, {'auto-delete' : false, 'persistent' : true , 'id' : 'unique_id', 'ack' : 'client'});
}, function(err) {
console.log(err);
debug('error', err, err.stack);
setTimeout(stompConnect, 10);
});
};
服务器代码:
public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer {
@Override
public void configureMessageBroker(final MessageBrokerRegistry config) {
config.enableStompBrokerRelay("<endpoint>", "<endpoint>").setRelayHost(host)
.setSystemLogin(username).setSystemPasscode(password).setClientLogin(username)
.setClientPasscode(password);
}
@Override
public void registerStompEndpoints(final StompEndpointRegistry registry) {
registry.addEndpoint("<endpoint>").setAllowedOrigins("*").withSockJS();
}
}
我正在执行的步骤:
- 运行客户端脚本,发送订阅请求
- A queue 在服务器端创建(名称为 stomp-subscription-*),所有消息都被推送到 queue 并且客户端能够流式传输这些消息。
- 杀死脚本,这会导致断开连接。服务器日志显示客户端已断开连接,消息开始变得 queued。
- 运行 脚本再次使用相同的 ID。它以某种方式设法连接到服务器,但是,服务器没有返回任何消息。 queue 上的消息计数保持不变(此外,RabbitMQ 管理控制台不显示该 queue 的任何消费者)。
- 10 秒后,连接断开并在客户端日志中打印以下内容:
Whoops! Lost connection to < url >
- 服务器也显示相同的消息(即客户端断开连接)。如客户端代码所示,它会在 10 秒后尝试建立连接,然后再次重复相同的循环。
我尝试了以下方法:
- 已删除
'ack' : 'client'
header。这导致所有消息都从 queue 中排出,但是 none 到达客户端。我在完成 this SO 回答后添加了这个 header。
- 在递增 messageCount 之前,在函数中添加了
d.ack();
。这会导致服务器端出错,因为它会在 session 关闭(由于断开连接)后尝试确认消息。
此外,在某些情况下,当我重新连接 queued up 消息数少于 100 条时,我能够收到所有消息。但是,一旦超过100,就没有任何反应(不确定是否与问题有关)。
不知道问题出在服务器端还是客户端。有任何输入吗?
最后,我找到了(并修复)了这个问题。我们使用 nginx 作为代理,它的 proxy_buffering
设置为 on
(默认值),请查看文档 here.
它是这样写的:
When buffering is enabled, nginx receives a response from the proxied
server as soon as possible, saving it into the buffers set by the
proxy_buffer_size and proxy_buffers directives.
因此,消息被缓冲(延迟),导致断开连接。我们尝试绕过 nginx,它工作正常,然后我们禁用了代理缓冲,现在它似乎工作正常,即使使用 nginx 代理也是如此。
我正在使用 RabbitMQ Stomp 的持久订阅(文档 here)。根据文档,当客户端使用相同的 ID 重新连接(订阅)时,他应该获得所有 queued up 消息。但是,我无法取回任何内容,即使消息已 queued 在服务器端。下面是我正在使用的代码:
RabbitMQ 版本:3.6.0
客户代码:
var sock;
var stomp;
var messageCount = 0;
var stompConnect = function() {
sock = new SockJS(options.url);
stomp = Stomp.over(sock);
stomp.connect({}, function(frame) {
debug('Connected: ', frame);
console.log(frame);
var id = stomp.subscribe('<url>' + options.source + "." + options.type + "." + options.id, function(d) {
console.log(messageCount);
messageCount = messageCount + 1;
}, {'auto-delete' : false, 'persistent' : true , 'id' : 'unique_id', 'ack' : 'client'});
}, function(err) {
console.log(err);
debug('error', err, err.stack);
setTimeout(stompConnect, 10);
});
};
服务器代码:
public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer {
@Override
public void configureMessageBroker(final MessageBrokerRegistry config) {
config.enableStompBrokerRelay("<endpoint>", "<endpoint>").setRelayHost(host)
.setSystemLogin(username).setSystemPasscode(password).setClientLogin(username)
.setClientPasscode(password);
}
@Override
public void registerStompEndpoints(final StompEndpointRegistry registry) {
registry.addEndpoint("<endpoint>").setAllowedOrigins("*").withSockJS();
}
}
我正在执行的步骤:
- 运行客户端脚本,发送订阅请求
- A queue 在服务器端创建(名称为 stomp-subscription-*),所有消息都被推送到 queue 并且客户端能够流式传输这些消息。
- 杀死脚本,这会导致断开连接。服务器日志显示客户端已断开连接,消息开始变得 queued。
- 运行 脚本再次使用相同的 ID。它以某种方式设法连接到服务器,但是,服务器没有返回任何消息。 queue 上的消息计数保持不变(此外,RabbitMQ 管理控制台不显示该 queue 的任何消费者)。
- 10 秒后,连接断开并在客户端日志中打印以下内容:
Whoops! Lost connection to < url >
- 服务器也显示相同的消息(即客户端断开连接)。如客户端代码所示,它会在 10 秒后尝试建立连接,然后再次重复相同的循环。
我尝试了以下方法:
- 已删除
'ack' : 'client'
header。这导致所有消息都从 queue 中排出,但是 none 到达客户端。我在完成 this SO 回答后添加了这个 header。 - 在递增 messageCount 之前,在函数中添加了
d.ack();
。这会导致服务器端出错,因为它会在 session 关闭(由于断开连接)后尝试确认消息。
此外,在某些情况下,当我重新连接 queued up 消息数少于 100 条时,我能够收到所有消息。但是,一旦超过100,就没有任何反应(不确定是否与问题有关)。
不知道问题出在服务器端还是客户端。有任何输入吗?
最后,我找到了(并修复)了这个问题。我们使用 nginx 作为代理,它的 proxy_buffering
设置为 on
(默认值),请查看文档 here.
它是这样写的:
When buffering is enabled, nginx receives a response from the proxied server as soon as possible, saving it into the buffers set by the proxy_buffer_size and proxy_buffers directives.
因此,消息被缓冲(延迟),导致断开连接。我们尝试绕过 nginx,它工作正常,然后我们禁用了代理缓冲,现在它似乎工作正常,即使使用 nginx 代理也是如此。