永无止境的消息循环:在 python rabbitmq 消费者中重新传递相同的消息

Never ending message loop: Same message redelivered in python rabbitmq consumer

我正在使用此处发布的示例消费者:

http://pika.readthedocs.org/en/latest/examples/asynchronous_consumer_example.html

我使用 ExampleConsumer 的原因是当工作任务开始花费更长的时间(超过 10 分钟)时,我与 rabbitmq 的连接失败。在 运行 长任务完成并且进程失败后,连接被关闭。它以前处理 1000 条消息需要一分钟左右的时间。

ExampleConsumer 似乎重新连接正常,但是,在确认消息中,由于连接已断开,消息实际上并未得到确认。它似乎 return 通常来自下面的确认消息方法。然后它会尝试重新连接,然后重新发送刚刚完成的消息。

  def acknowledge_message(self, delivery_tag):
        """Acknowledge the message delivery from RabbitMQ by sending a
        Basic.Ack RPC method for the delivery tag.

        :param int delivery_tag: The delivery tag from the Basic.Deliver frame

        """
        LOGGER.info('Acknowledging message %s', delivery_tag)
        self._channel.basic_ack(delivery_tag)

您可能需要 add a heartbeat to your message consumer 来保持连接。

如果 rabbitmq 认为消费者在消息处于 "unacknowledged" 模式(仍在处理中)时死亡,它将把消息放回队列中。心跳可能有助于保持连接有效,防止这种情况发生。

RabbitMQ 代理实施默认心跳超时,具体取决于 RabbitMQ 版本,约为 10 分钟或 1 分钟;从 RabbitMQ v3.5.5 开始,较新版本中的默认值较短。应用程序可以通过连接参数传递一个明确的更长的心跳超时偏好。 Pika 的 SelectConnection 没有后台线程,所以当工作任务耗时超过心跳超时时,SelectConnection 无法在 broker 期望的时间限制内为心跳提供服务,broker 会断开连接。您可以尝试通过不同的方法解决此问题:

  1. 通过 pika.connection.ConnectionParameters 设置更长的心跳超时首选项(可能是最简单的)。 ConnectionParameters.heartbeat_interval=0 应该完全禁用心跳(和心跳超时)。
  2. 运行 与任务处理逻辑不同的线程上的连接
  3. 切换到协作式多任务连接类型之一,例如 Pika 中基于 Tornado 或 Twisted 框架的适配器,或 Haigha 中基于 gevent 的适配器。此更改将要求任务处理逻辑对协作式多任务处理友好。

如果您使用的是 pika 异步消费者示例,则只需将此更改添加到 init 方法中:

    self._url = 'amqp://{}:{}@{}:{}/%2F{}'.format(
                     self.USERNAME, self.PASSWORD, self.ADDRESS, self.PORT, self.QUERY) 

with self.QUERY 一个可以被参数化以设置不同参数的字符串,例如 heartbeat 如下:

self.QUERY ='?heartbeat_interval=600'

connect 方法将处理心跳事务。

def connect(self):
    """This method connects to RabbitMQ, returning the connection handle.
    When the connection is established, the on_connection_open method
    will be invoked by pika.

    :rtype: pika.SelectConnection

    """
    LOGGER.info('Connecting to %s', self._url)
    return pika.SelectConnection(parameters=pika.URLParameters(self._url),
                                 on_open_callback=self.on_connection_open,
                                 on_open_error_callback=self.on_connection_error,
                                 stop_ioloop_on_close=False,
                                 )

这是告诉 RabbitMQ 哪个心跳与您的消费者相关联的好方法。请注意,RabbitMQ 将强制它至少为 60 秒。因此你不能将它设置得更低。

有关这些连接参数的更多信息: https://pika.readthedocs.io/en/latest/modules/parameters.html