永无止境的消息循环:在 python rabbitmq 消费者中重新传递相同的消息
Never ending message loop: Same message redelivered in python rabbitmq consumer
我正在使用此处发布的示例消费者:
http://pika.readthedocs.org/en/latest/examples/asynchronous_consumer_example.html
我使用 ExampleConsumer 的原因是当工作任务开始花费更长的时间(超过 10 分钟)时,我与 rabbitmq 的连接失败。在 运行 长任务完成并且进程失败后,连接被关闭。它以前处理 1000 条消息需要一分钟左右的时间。
ExampleConsumer 似乎重新连接正常,但是,在确认消息中,由于连接已断开,消息实际上并未得到确认。它似乎 return 通常来自下面的确认消息方法。然后它会尝试重新连接,然后重新发送刚刚完成的消息。
def acknowledge_message(self, delivery_tag):
"""Acknowledge the message delivery from RabbitMQ by sending a
Basic.Ack RPC method for the delivery tag.
:param int delivery_tag: The delivery tag from the Basic.Deliver frame
"""
LOGGER.info('Acknowledging message %s', delivery_tag)
self._channel.basic_ack(delivery_tag)
您可能需要 add a heartbeat to your message consumer 来保持连接。
如果 rabbitmq 认为消费者在消息处于 "unacknowledged" 模式(仍在处理中)时死亡,它将把消息放回队列中。心跳可能有助于保持连接有效,防止这种情况发生。
RabbitMQ 代理实施默认心跳超时,具体取决于 RabbitMQ 版本,约为 10 分钟或 1 分钟;从 RabbitMQ v3.5.5 开始,较新版本中的默认值较短。应用程序可以通过连接参数传递一个明确的更长的心跳超时偏好。 Pika 的 SelectConnection 没有后台线程,所以当工作任务耗时超过心跳超时时,SelectConnection 无法在 broker 期望的时间限制内为心跳提供服务,broker 会断开连接。您可以尝试通过不同的方法解决此问题:
- 通过 pika.connection.ConnectionParameters 设置更长的心跳超时首选项(可能是最简单的)。 ConnectionParameters.heartbeat_interval=0 应该完全禁用心跳(和心跳超时)。
- 运行 与任务处理逻辑不同的线程上的连接
- 切换到协作式多任务连接类型之一,例如 Pika 中基于 Tornado 或 Twisted 框架的适配器,或 Haigha 中基于 gevent 的适配器。此更改将要求任务处理逻辑对协作式多任务处理友好。
如果您使用的是 pika 异步消费者示例,则只需将此更改添加到 init 方法中:
self._url = 'amqp://{}:{}@{}:{}/%2F{}'.format(
self.USERNAME, self.PASSWORD, self.ADDRESS, self.PORT, self.QUERY)
with self.QUERY 一个可以被参数化以设置不同参数的字符串,例如 heartbeat 如下:
self.QUERY ='?heartbeat_interval=600'
connect 方法将处理心跳事务。
def connect(self):
"""This method connects to RabbitMQ, returning the connection handle.
When the connection is established, the on_connection_open method
will be invoked by pika.
:rtype: pika.SelectConnection
"""
LOGGER.info('Connecting to %s', self._url)
return pika.SelectConnection(parameters=pika.URLParameters(self._url),
on_open_callback=self.on_connection_open,
on_open_error_callback=self.on_connection_error,
stop_ioloop_on_close=False,
)
这是告诉 RabbitMQ 哪个心跳与您的消费者相关联的好方法。请注意,RabbitMQ 将强制它至少为 60 秒。因此你不能将它设置得更低。
有关这些连接参数的更多信息:
https://pika.readthedocs.io/en/latest/modules/parameters.html
我正在使用此处发布的示例消费者:
http://pika.readthedocs.org/en/latest/examples/asynchronous_consumer_example.html
我使用 ExampleConsumer 的原因是当工作任务开始花费更长的时间(超过 10 分钟)时,我与 rabbitmq 的连接失败。在 运行 长任务完成并且进程失败后,连接被关闭。它以前处理 1000 条消息需要一分钟左右的时间。
ExampleConsumer 似乎重新连接正常,但是,在确认消息中,由于连接已断开,消息实际上并未得到确认。它似乎 return 通常来自下面的确认消息方法。然后它会尝试重新连接,然后重新发送刚刚完成的消息。
def acknowledge_message(self, delivery_tag):
"""Acknowledge the message delivery from RabbitMQ by sending a
Basic.Ack RPC method for the delivery tag.
:param int delivery_tag: The delivery tag from the Basic.Deliver frame
"""
LOGGER.info('Acknowledging message %s', delivery_tag)
self._channel.basic_ack(delivery_tag)
您可能需要 add a heartbeat to your message consumer 来保持连接。
如果 rabbitmq 认为消费者在消息处于 "unacknowledged" 模式(仍在处理中)时死亡,它将把消息放回队列中。心跳可能有助于保持连接有效,防止这种情况发生。
RabbitMQ 代理实施默认心跳超时,具体取决于 RabbitMQ 版本,约为 10 分钟或 1 分钟;从 RabbitMQ v3.5.5 开始,较新版本中的默认值较短。应用程序可以通过连接参数传递一个明确的更长的心跳超时偏好。 Pika 的 SelectConnection 没有后台线程,所以当工作任务耗时超过心跳超时时,SelectConnection 无法在 broker 期望的时间限制内为心跳提供服务,broker 会断开连接。您可以尝试通过不同的方法解决此问题:
- 通过 pika.connection.ConnectionParameters 设置更长的心跳超时首选项(可能是最简单的)。 ConnectionParameters.heartbeat_interval=0 应该完全禁用心跳(和心跳超时)。
- 运行 与任务处理逻辑不同的线程上的连接
- 切换到协作式多任务连接类型之一,例如 Pika 中基于 Tornado 或 Twisted 框架的适配器,或 Haigha 中基于 gevent 的适配器。此更改将要求任务处理逻辑对协作式多任务处理友好。
如果您使用的是 pika 异步消费者示例,则只需将此更改添加到 init 方法中:
self._url = 'amqp://{}:{}@{}:{}/%2F{}'.format(
self.USERNAME, self.PASSWORD, self.ADDRESS, self.PORT, self.QUERY)
with self.QUERY 一个可以被参数化以设置不同参数的字符串,例如 heartbeat 如下:
self.QUERY ='?heartbeat_interval=600'
connect 方法将处理心跳事务。
def connect(self):
"""This method connects to RabbitMQ, returning the connection handle.
When the connection is established, the on_connection_open method
will be invoked by pika.
:rtype: pika.SelectConnection
"""
LOGGER.info('Connecting to %s', self._url)
return pika.SelectConnection(parameters=pika.URLParameters(self._url),
on_open_callback=self.on_connection_open,
on_open_error_callback=self.on_connection_error,
stop_ioloop_on_close=False,
)
这是告诉 RabbitMQ 哪个心跳与您的消费者相关联的好方法。请注意,RabbitMQ 将强制它至少为 60 秒。因此你不能将它设置得更低。
有关这些连接参数的更多信息: https://pika.readthedocs.io/en/latest/modules/parameters.html