AMQP Rabbitmq Nodejs - 每次订阅者出现故障时都会创建额外的发布者

AMQP Rabbitmq Nodejs - Additional publisher created each time when subscriber goes down

目前我有以下情况:


publisher.js:

var queue_connection = new amqp.createConnection( { host: config.rabbitmq.host, port: config.rabbitmq.port }  );

queue_connection.on( 'ready', function () {

    var exchange = queue_connection.exchange( 'http_worker', { type: 'topic' } );
        exchange.on( 'open', function(){
            setInterval(function(){
                exchange.publish( 'AZ', 'This is test message' );
                logger.info( 'AZ message pushed' );
            }, 1000);
    })
});

subscriber.js:

var connection = amqp.createConnection({ host: config.rabbitmq.host, port: config.rabbitmq.port });

connection.on( 'ready', function () {

    connection.queue( 'AZ', function ( q ) {

        q.bind( 'http_worker', 'AZ' );

        q.subscribe(function ( message ) {
            console.log(unescape( message.data ))
        });
    });

});

上面的例子工作得很好。当我启动 publisher.js 和 subscriber.js 时,订阅者将毫无问题地接收消息。 但是如果我关闭订阅者,然后再次启动它(没有关闭和启动 publisher.js),订阅者将开始一次接收 3 条相同的消息。


已编辑:

基于:

The default for both queues and exchanges is to automatically delete them when there are no consumers (for queues) or queues (for exchanges) bound to them, which is happening in your situation.

添加 "autoDelete: false" 解决了一个问题。

队列和交换器的默认设置是在没有消费者(对于队列)或队列(对于交换器)绑定到它们时自动删除它们,这发生在您的情况下。

添加错误事件处理程序时可以看到:

queue_connection.on('error', console.log);

发生这种情况时,RabbitMQ 关闭连接,amqp 驱动程序重新连接,它重新发出 ready 事件,然后您重新创建交换。

但是,之前的交换实例(来自第一个连接)仍然是 运行 并继续发布消息(这显然是有效的,尽管我怀疑消息被丢弃了)。

因此,在第一次重新连接后,您有 两个 发布者发送消息。断开订阅者,收到错误,重新连接,然后启动第三个发布者。等等

您可能需要跟踪连接状态,因此请确保在重置连接时清理所有交换或队列实例。