AMQP Rabbitmq Nodejs - 每次订阅者出现故障时都会创建额外的发布者
AMQP Rabbitmq Nodejs - Additional publisher created each time when subscriber goes down
目前我有以下情况:
publisher.js:
var queue_connection = new amqp.createConnection( { host: config.rabbitmq.host, port: config.rabbitmq.port } );
queue_connection.on( 'ready', function () {
var exchange = queue_connection.exchange( 'http_worker', { type: 'topic' } );
exchange.on( 'open', function(){
setInterval(function(){
exchange.publish( 'AZ', 'This is test message' );
logger.info( 'AZ message pushed' );
}, 1000);
})
});
subscriber.js:
var connection = amqp.createConnection({ host: config.rabbitmq.host, port: config.rabbitmq.port });
connection.on( 'ready', function () {
connection.queue( 'AZ', function ( q ) {
q.bind( 'http_worker', 'AZ' );
q.subscribe(function ( message ) {
console.log(unescape( message.data ))
});
});
});
上面的例子工作得很好。当我启动 publisher.js 和 subscriber.js 时,订阅者将毫无问题地接收消息。
但是如果我关闭订阅者,然后再次启动它(没有关闭和启动 publisher.js),订阅者将开始一次接收 3 条相同的消息。
- 我想这是因为 publisher.js 中的 3 个回调而发生的,但我不明白为什么每次订阅者关闭时都会调用这些回调
- 我怎样才能防止这种情况发生? rabbitmq/amqp 是否有任何选项可以强制发布者等待订阅者再次重新连接。
- 此外,是否有可能实现这一点而不会丢失队列中的消息,在订阅者停机时排队?
- 先启动订阅者再启动发布者(反之亦然)是否有区别。这是如何工作的,谁需要创建队列,订阅者还是发布者?
已编辑:
基于:
The default for both queues and exchanges is to automatically delete them when there are no consumers (for queues) or queues (for exchanges) bound to them, which is happening in your situation.
添加 "autoDelete: false" 解决了一个问题。
队列和交换器的默认设置是在没有消费者(对于队列)或队列(对于交换器)绑定到它们时自动删除它们,这发生在您的情况下。
添加错误事件处理程序时可以看到:
queue_connection.on('error', console.log);
发生这种情况时,RabbitMQ 关闭连接,amqp
驱动程序重新连接,它重新发出 ready
事件,然后您重新创建交换。
但是,之前的交换实例(来自第一个连接)仍然是 运行 并继续发布消息(这显然是有效的,尽管我怀疑消息被丢弃了)。
因此,在第一次重新连接后,您有 两个 发布者发送消息。断开订阅者,收到错误,重新连接,然后启动第三个发布者。等等
您可能需要跟踪连接状态,因此请确保在重置连接时清理所有交换或队列实例。
目前我有以下情况:
publisher.js:
var queue_connection = new amqp.createConnection( { host: config.rabbitmq.host, port: config.rabbitmq.port } );
queue_connection.on( 'ready', function () {
var exchange = queue_connection.exchange( 'http_worker', { type: 'topic' } );
exchange.on( 'open', function(){
setInterval(function(){
exchange.publish( 'AZ', 'This is test message' );
logger.info( 'AZ message pushed' );
}, 1000);
})
});
subscriber.js:
var connection = amqp.createConnection({ host: config.rabbitmq.host, port: config.rabbitmq.port });
connection.on( 'ready', function () {
connection.queue( 'AZ', function ( q ) {
q.bind( 'http_worker', 'AZ' );
q.subscribe(function ( message ) {
console.log(unescape( message.data ))
});
});
});
上面的例子工作得很好。当我启动 publisher.js 和 subscriber.js 时,订阅者将毫无问题地接收消息。 但是如果我关闭订阅者,然后再次启动它(没有关闭和启动 publisher.js),订阅者将开始一次接收 3 条相同的消息。
- 我想这是因为 publisher.js 中的 3 个回调而发生的,但我不明白为什么每次订阅者关闭时都会调用这些回调
- 我怎样才能防止这种情况发生? rabbitmq/amqp 是否有任何选项可以强制发布者等待订阅者再次重新连接。
- 此外,是否有可能实现这一点而不会丢失队列中的消息,在订阅者停机时排队?
- 先启动订阅者再启动发布者(反之亦然)是否有区别。这是如何工作的,谁需要创建队列,订阅者还是发布者?
已编辑:
基于:
The default for both queues and exchanges is to automatically delete them when there are no consumers (for queues) or queues (for exchanges) bound to them, which is happening in your situation.
添加 "autoDelete: false" 解决了一个问题。
队列和交换器的默认设置是在没有消费者(对于队列)或队列(对于交换器)绑定到它们时自动删除它们,这发生在您的情况下。
添加错误事件处理程序时可以看到:
queue_connection.on('error', console.log);
发生这种情况时,RabbitMQ 关闭连接,amqp
驱动程序重新连接,它重新发出 ready
事件,然后您重新创建交换。
但是,之前的交换实例(来自第一个连接)仍然是 运行 并继续发布消息(这显然是有效的,尽管我怀疑消息被丢弃了)。
因此,在第一次重新连接后,您有 两个 发布者发送消息。断开订阅者,收到错误,重新连接,然后启动第三个发布者。等等
您可能需要跟踪连接状态,因此请确保在重置连接时清理所有交换或队列实例。