Why do messages in a RabbitMQ queue get lost when the consumer is restarted?

I have set up a RabbitMQ consumer as follows:

from collections import OrderedDict
from concurrent.futures import ThreadPoolExecutor

import pika
import datetime
import logging
import json
from logging import StreamHandler
from time import sleep
from random import randint
from pika import SelectConnection

logging.basicConfig(handlers=[StreamHandler()],
                    level=logging.INFO,
                    format=logging.BASIC_FORMAT)
_logger = logging.getLogger(__name__)


class QueueConsumer(object):
    """The consumer class to manage connections to the AMQP server/queue"""

    def __init__(self, queue, logger, parameters, thread_id=0):
        self.channel = None
        self.connection = None
        self.queue_name = queue
        self.logger = logger
        self.consumer_id = 'Thread: %d' % (thread_id,)
        self.parameters = pika.ConnectionParameters(**parameters)

    def _on_queue_declared(self, frame):
        self.logger.debug('{} ... declaring queue'.format(self.consumer_id))
        self.channel.basic_qos(prefetch_count=1)
        try:
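            # no_ack=True: the broker marks each message as acknowledged the
            # moment it is delivered, before the consumer has processed it.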
            self.channel.basic_consume(self.handle_delivery, queue=self.queue_name, no_ack=True)
            self.logger.info("{} Declared queue...".format(self.consumer_id))
        except Exception as e:
            self.logger.error('{} crashing:--> {}'.format(self.consumer_id, str(e)))

    def _on_channel_open(self, channel):
        self.channel = channel
        try:
            self.channel.queue_declare(queue=self.queue_name,
                                       exclusive=False,
                                       durable=True,
                                       auto_delete=False,
                                       callback=self._on_queue_declared)
            self.logger.info("{} Opened Channel....".format(self.consumer_id))
        except Exception as e:
            self.logger.error('{} {}'.format(self.consumer_id, str(e)))

    def _on_connected(self, connection):
        connection.channel(self._on_channel_open)

    def consume(self):
        try:
            self.connection = SelectConnection(self.parameters,
                                               self._on_connected)
            self.connection.ioloop.start()
        except Exception as e:
            self.logger.error('{} {}'.format(self.consumer_id, str(e)))
            self.connection.close()
            self.connection.ioloop.start()

    def decode(self, body):
        try:
            _body = body.decode('utf-8')
        except AttributeError:
            _body = body

        return _body

    def handle_delivery(self, channel, method, header, body):
        try:
            start_time = datetime.datetime.now()
            _logger.info("Received...")
            _logger.info("Content: %s" % body)
            req = json.loads(self.decode(body))

            # Do something
            sleep(randint(10, 100))

            time_taken = datetime.datetime.now() - start_time
            _logger.info("[{}] Time Taken: {}.{}".format(
                req.get("to_num"), time_taken.seconds, time_taken.microseconds))

        except Exception as err:
            _logger.exception(err)


if __name__ == "__main__":
    workers = 3
    pika_parameters = OrderedDict([('host', '127.0.0.1'), ('port', 5672), ('virtual_host', '/')])
    try:
        pool = ThreadPoolExecutor(max_workers=workers)
        start = 1
        for thread_id in range(start, (workers + start)):
            pool.submit(QueueConsumer('test_queue', _logger, pika_parameters, thread_id).consume)

    except Exception as err:
        _logger.exception(err)

I also have a queue publisher, as follows:

import uuid
import pika
import logging
import json
from logging import StreamHandler
from pika import SelectConnection

logging.basicConfig(handlers=[StreamHandler()],
                    level=logging.DEBUG,
                    format=logging.BASIC_FORMAT)
_logger = logging.getLogger(__name__)


class QueuePublisherClient(object):

    def __init__(self, queue, request):
        self.queue = queue
        self.response = None
        self.channel = None
        self.request = request
        self.corrId = str(uuid.uuid4())
        self.callBackQueue = None
        self.connection = None
        parameters = pika.ConnectionParameters(host="0.0.0.0")
        self.connection = SelectConnection(
            parameters, self.on_response_connected
        )
        self.connection.ioloop.start()

    def on_response(self, ch, method, props, body):
        if self.corrId == props.correlation_id:
            self.response = body
            self.connection.close()
            self.connection.ioloop.start()

    def on_response_connected(self, connection):
        _logger.info("Connected...\t(%s)" % self.queue)
        self.connection = connection
        self.connection.channel(self.on_channel_open)

    def on_connected(self, connection):
        self.connection = connection
        self.connection.channel(self.on_channel_open)

    def on_channel_open(self, channel):
        # _logger.info("Channel Opened...\t(%s)" % self.queue)
        self.channel = channel
        self.channel.queue_declare(queue=self.queue,
                                   durable=True,
                                   exclusive=False,
                                   auto_delete=False,
                                   callback=self.on_queue_declared)

    def on_queue_declared(self, frame):
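        # Default BasicProperties() leaves delivery_mode unset, so these
        # messages are transient even though the queue itself is durable.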
        self.channel.basic_publish(exchange="",
                                   routing_key=self.queue,
                                   properties=pika.BasicProperties(),
                                   body=str(self.request))
        self.connection.close()
        _logger.info("Message Published...\t(%s)" % self.queue)


if __name__ == "__main__":
    data = {
        'text': 'This is a sample text',
        'to_num': '+2547xxxxxxxx'
    }
    count = 10000

    for index in range(count):
        data['index'] = index
        QueuePublisherClient("test_queue", json.dumps(data))

When I publish 10000 messages to the queue while the consumer is not running, rabbitmqctl list_queues shows that test_queue holds 10000 messages. As soon as I start the consumer, rabbitmqctl list_queues reports 0 messages in the queue, even though the consumer is still working through them. The problem is that if I stop the consumer after a few seconds and then restart it, I cannot recover my messages. How can I avoid this?

This merely simulates the real situation, in which the consumer process is restarted by monit and I suffer message loss.

Since you have set prefetch_count to 1 and your queue is durable, the consumer will process messages from the queue one at a time once it starts. To verify this, you can put a one-second sleep in your code and restart the consumer after a few seconds; you will see that only the already-processed messages have been removed from the queue. If you had not set a prefetch count, a single consumer would drain all of the queue's messages as soon as it started. Hope this helps.
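For reference, the prefetch limit this answer refers to is set per channel with basic_qos, exactly as the consumer code above already does. A minimal sketch:

# Allow at most one unacknowledged delivery on this channel at a time.
# Note that the broker only enforces this limit when consuming with
# acknowledgements enabled; with no_ack=True the limit is ignored.
channel.basic_qos(prefetch_count=1)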

First of all, you should use the latest version of Pika.

When you set no_ack=True (auto_ack=True in Pika 1.0), RabbitMQ considers a message acknowledged as soon as it is delivered. This means that every message your consumer holds in memory (or in the TCP stack) when you stop it is lost, because RabbitMQ already considers it acknowledged.
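Incidentally, the signature of basic_consume also changed in Pika 1.0 (one more reason to upgrade): the queue comes first, the callback is a keyword argument, and no_ack was renamed. A sketch of the equivalent subscribe call under the new API, using the queue name and callback from the question:

# Pika 1.0+: no_ack was renamed to auto_ack (still False by default).
channel.basic_consume(queue='test_queue',
                      on_message_callback=handle_delivery,
                      auto_ack=False)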

You should use no_ack=False (the default) and acknowledge each message in handle_delivery once your work is done, as sketched below. Note that if your work takes a long time, you should run it in another thread to avoid blocking Pika's I/O loop.
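A minimal sketch of that change, adapted to the consumer class above (the Pika 0.x callback signature from the question is assumed):

    def _on_queue_declared(self, frame):
        self.channel.basic_qos(prefetch_count=1)
        # Leave no_ack at its default (False): the broker keeps each message
        # until it is explicitly acknowledged, and requeues it if the
        # consumer dies first.
        self.channel.basic_consume(self.handle_delivery, queue=self.queue_name)

    def handle_delivery(self, channel, method, header, body):
        req = json.loads(self.decode(body))
        # ... do the actual work here ...
        # Ack only after the work has succeeded; anything unacked at the
        # time the consumer stops goes back to the queue.
        channel.basic_ack(delivery_tag=method.delivery_tag)

If the work is offloaded to a separate thread so the I/O loop stays responsive, the acknowledgement must still be issued from the connection's thread. Recent Pika versions provide a thread-safe hook for this (connection.add_callback_threadsafe on a BlockingConnection; on a SelectConnection it lives on the ioloop). A sketch:

import functools

def ack_from_worker(connection, channel, delivery_tag):
    # Channels are not thread-safe: hand the ack back to the connection's
    # I/O loop rather than calling basic_ack from the worker thread.
    cb = functools.partial(channel.basic_ack, delivery_tag=delivery_tag)
    connection.ioloop.add_callback_threadsafe(cb)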

See the following documentation: https://www.rabbitmq.com/confirms.html


NOTE: the RabbitMQ team monitors the rabbitmq-users mailing list and only sometimes answers questions on Stack Overflow.