RabbitMQ:Ack/Nack 关闭并重新打开的通道上的消息

RabbitMQ: Ack/Nack a message on a channel that is closed and reopened

我从 RabbitMq 服务器收到此错误

Channel closed by server: 406 (PRECONDITION-FAILED) with message "PRECONDITION_FAILED - unknown delivery tag 80"

发生这种情况是因为连接在消费者任务期间丢失,最后,当消息为 acked/nacked 时,我收到此错误,因为我无法在与收到的渠道不同的渠道上确认消息它来自.

这里是 RabbitMq 连接的代码

async connect({ prefetch = 1, queueName }) {
    this.queueName = queueName;
    console.log(`[AMQP][${this.queueName}] | connecting`);
    return queue
        .connect(this.config.rabbitmq.connstring)
        .then(conn => {
            conn.once('error', err => {
                this.channel = null;
                if (err.message !== 'Connection closing') {
                    console.error(
                        `[AMQP][${this.queueName}] (evt:error) | ${err.message}`,
                    );
                }
            });

            conn.once('close', () => {
                this.channel = null;
                console.error(
                    `[AMQP][${this.queueName}] (evt:close) | reconnecting`,
                );
                this.connect({ prefetch, queueName: this.queueName });
            });
            return conn.createChannel();
        })
        .then(ch => {
            console.log(`[AMQP-channel][${this.queueName}] created`);
            ch.on('error', err => {
                console.error(
                    `[AMQP-ch][${this.queueName}] (evt:error) | ${err.message}`,
                );
            });
            ch.on('close', () => {
                console.error(`[AMQP-ch][${this.queueName}] (evt:close)`);
            });
            this.channel = ch;
            return this.channel;
        })
        .then(ch => {
            return this.channel.prefetch(prefetch);
        })
        .then(ch => {
            return this.channel.assertQueue(this.queueName);
        })
        .then(async ch => {
            while (this.buffer.length > 0) {
                const request = this.buffer.pop();
                await request();
            }
            return this.channel;
        })
        .catch(error => {
            console.error(error);
            console.log(`[AMQP][${this.queueName}] reconnecting in 1s`);
            return this._delay(1000).then(() =>
                this.connect({ prefetch, queueName: this.queueName }),
            );
        });
}

async ack(msg) {
    try {
        if (this.channel) {
            console.log(`[AMQP][${this.queueName}] ack`);
            await this.channel.ack(msg);
        } else {
            console.log(`[AMQP][${this.queueName}] ack (buffer)`);
            this.buffer.push(() => {
                this.ack(msg);
            });
        }
    } catch (e) {
        console.error(`[AMQ][${this.queueName}] ack error: ${e.message}`);
    }
}

如您所见,建立连接后创建了一个通道,在我遇到连接问题后,通道设置为 NULL,1 秒后连接重试,重新创建一个新通道。

为了管理离线时期,我使用了一个缓冲区,该缓冲区收集在通道为 NULL 时发送的所有确认消息,并且在重新建立连接后我卸载缓冲区。

所以基本上我必须找到一种方法在连接丢失或通道因天气原因关闭后发送 ACK。

感谢您的帮助

如果连接由于某种原因被丢弃或中断,则无法发送 ACK,因为连接发生在套接字级别,一旦关闭,就无法使用相同的套接字重新创建它。

当连接断开时,消息保持非 ACK 状态,因此另一个侦听器可以处理它,或者断开连接的侦听器再次连接时将再次处理它。

在我看来,您试图解决的问题不是由 RabbitMQ 给出的,而是由基础的套接字实现给出的。

您可以通过避免管理消息缓冲区并利用 RabbitMQ 的特性来解决此问题,RabbitMQ 将在您的侦听器再次连接时立即重新呈现最后一条未处理的消息。

一旦频道关闭(无论是什么原因),您将无法确认消息。代理会自动将相同的消息重新传递给另一个消费者。

这在 RabbitMQ message confirmation 部分有详细记录。

When Consumers Fail or Lose Connection: Automatic Requeueing

When manual acknowledgements are used, any delivery (message) that was not acked is automatically requeued when the channel (or connection) on which the delivery happened is closed. This includes TCP connection loss by clients, consumer application (process) failures, and channel-level protocol exceptions (covered below).

...

Due to this behavior, consumers must be prepared to handle redeliveries and otherwise be implemented with idempotence in mind. Redeliveries will have a special boolean property, redeliver, set to true by RabbitMQ. For first time deliveries it will be set to false. Note that a consumer can receive a message that was previously delivered to another consumer.

正如文档所建议的那样,您需要通过实现消息幂等性设计模式来在消费者端处理此类问题。换句话说,您的架构应该准备好处理由于错误导致的消息重新传递。

或者,您可以禁用消息确认并获得“一次传递”类型的模式。这意味着如果出现错误,您将不得不处理消息丢失。

关于此事的进一步阅读: https://bravenewgeek.com/you-cannot-have-exactly-once-delivery/

Kafka 引入新语义后的跟进: https://bravenewgeek.com/you-cannot-have-exactly-once-delivery-redux/