RabbitMQ pika async consumer heartbeat issue after consumer cancellation
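Using RabbitMQ and pika (Python), I am running a job queue system that feeds tasks to nodes (asynchronous consumers). Each message defining a task is acknowledged only once that task has completed.
Sometimes I need to perform updates on these nodes, so I created an exit mode in which a node waits for its tasks to finish and then exits gracefully. I can then do my maintenance work.
So that a node in this exit mode does not fetch more messages from RabbitMQ, I make it call the basic_cancel method before it waits for its jobs to finish.
The effect of this method is described in the pika documentation: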
This method cancels a consumer. This does not affect already
delivered messages, but it does mean the server will not send any more
messages for that consumer. The client may receive an arbitrary number
of messages in between sending the cancel method and receiving the
cancel-ok reply. It may also be sent from the server to the client in
the event of the consumer being unexpectedly cancelled (i.e. cancelled
for any reason other than the server receiving the corresponding
basic.cancel from the client). This allows clients to be notified of
the loss of consumers due to events such as queue deletion.
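So, if you take "already delivered messages" to mean messages that have been received but not necessarily acknowledged, the tasks that exit mode waits for should not be requeued, even though the consumer node running them has cancelled itself out of the queuing system.
The code of my async consumer class's stop function (taken from the pika example) is similar to this: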
    def stop(self):
        """Cleanly shutdown the connection to RabbitMQ by stopping the consumer
        with RabbitMQ. When RabbitMQ confirms the cancellation, on_cancelok
        will be invoked by pika, which will then close the channel and
        connection. The IOLoop is started again because this method is invoked
        when CTRL-C is pressed raising a KeyboardInterrupt exception. This
        exception stops the IOLoop which needs to be running for pika to
        communicate with RabbitMQ. All of the commands issued prior to starting
        the IOLoop will be buffered but not processed.
        """
        LOGGER.info('Stopping')
        self._closing = True
        self.stop_consuming()
        LOGGER.info('Waiting for all running jobs to complete')
        for index, thread in enumerate(self.threads):
            if thread.is_alive():
                thread.join()
                # also tried with a while loop that waits 10s as long as the
                # thread is still alive
                LOGGER.info('Thread {} has finished'.format(index))
        # also tried moving the call to stop consuming up to this point
        if self._connection is not None:
            self._connection.ioloop.start()
            LOGGER.info('Closing connection')
            self.close_connection()
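My problem is that after the consumer is cancelled, the async consumer no longer seems to send heartbeats, even when I perform the cancellation after the loop that waits for the tasks (threads) to finish.
I have read about the process_data_events function for BlockingConnection, but I could not find such a function here. Is the ioloop of the SelectConnection class the equivalent for an async consumer?
Since my node in exit mode no longer sends heartbeats, the tasks it is currently executing are requeued by RabbitMQ once the heartbeat timeout is reached. I would like to keep this heartbeat setting unchanged, since it is not a problem anyway when I am not in exit mode (my heartbeat here is about 100 seconds, and my tasks can take up to 2 hours to complete).
Looking at the RabbitMQ logs, the heartbeat is indeed the cause: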
=ERROR REPORT==== 12-Apr-2017::19:24:23 ===
closing AMQP connection (.....) :
missed heartbeats from client, timeout: 100s
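The only workaround I can think of is to acknowledge, while in exit mode, the messages corresponding to the tasks that are still running, and hope those tasks do not fail...
Is there any method on the channel or connection that I can use to send heartbeats manually while waiting?
Could the problem be that the time.sleep() or thread.join() methods (from the Python threading package) block completely and prevent other threads from doing what they need to? I use them in other applications and they do not seem to behave that way there.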
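On the blocking question: thread.join() and time.sleep() block only the thread that calls them, but if that thread is also the one driving the connection's I/O, nothing is serviced while it waits. Below is a minimal sketch of waiting without blocking for long. The names `join_while_servicing` and `service` are hypothetical; `service` stands in for whatever keeps the connection alive (for a BlockingConnection that would be something like process_data_events, whereas a SelectConnection relies on its ioloop running instead).

```python
import threading
import time


def join_while_servicing(threads, service, poll_interval=0.05):
    """Wait for every thread, calling service() between short join attempts.

    `service` is a hypothetical hook, not a pika API: it represents the work
    the waiting thread must keep doing (e.g. processing connection I/O).
    """
    pending = [t for t in threads if t.is_alive()]
    while pending:
        for thread in pending:
            # join() with a timeout returns after at most poll_interval
            # seconds, so the calling thread is never blocked for long.
            thread.join(timeout=poll_interval)
        service()
        pending = [t for t in pending if t.is_alive()]


if __name__ == "__main__":
    ticks = []
    worker = threading.Thread(target=time.sleep, args=(0.3,))
    worker.start()
    join_while_servicing([worker], lambda: ticks.append(time.monotonic()))
    print("serviced {} times while waiting".format(len(ticks)))
```

The point of the design is only that the waiting thread regains control at regular intervals; it does not by itself send heartbeats.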
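Since this problem only appears in exit mode, I guess something in the stop function causes the consumer to stop sending heartbeats. But I also tried (without success) calling the stop_consuming method only after the wait-on-running-tasks loop, and I cannot see what the root of this problem is.
Thanks a lot for your help!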
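It turns out that the stop_consuming function calls basic_cancel asynchronously, with a callback that ends up calling channel.close(). That made my application stop its RabbitMQ interactions, and RabbitMQ requeued the unacknowledged messages. I actually realized this because the threads that later tried to acknowledge the remaining tasks raised an error: the channel was now set to None, so there was no ack method anymore.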
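One possible way to avoid closing the channel too early is to defer the close until the cancellation is confirmed and every in-flight job has been acknowledged. This is a minimal sketch of that idea, not part of pika: `DrainGate` and its methods are hypothetical names. In a consumer modelled on the pika example, the assumption is that on_cancelok would call `cancel_confirmed()` instead of closing the channel directly, and each job would call `job_acked()` after its ack.

```python
import threading


class DrainGate:
    """Run on_drained once cancellation is confirmed and no job is in flight.

    Hypothetical helper for illustration; it does not talk to RabbitMQ.
    """

    def __init__(self, on_drained):
        self._on_drained = on_drained
        self._lock = threading.Lock()
        self._in_flight = 0
        self._cancel_confirmed = False
        self._fired = False

    def job_started(self):
        # Call when a delivered message is handed to a worker thread.
        with self._lock:
            self._in_flight += 1

    def job_acked(self):
        # Call after the message for a finished job has been acknowledged.
        with self._lock:
            self._in_flight -= 1
            fire = self._should_fire()
        if fire:
            self._on_drained()

    def cancel_confirmed(self):
        # Call from the cancel-ok callback instead of closing the channel.
        with self._lock:
            self._cancel_confirmed = True
            fire = self._should_fire()
        if fire:
            self._on_drained()

    def _should_fire(self):
        # Caller must hold the lock. Returns True at most once.
        if self._fired or not self._cancel_confirmed or self._in_flight > 0:
            return False
        self._fired = True
        return True


if __name__ == "__main__":
    events = []
    gate = DrainGate(lambda: events.append("close channel"))
    gate.job_started()
    gate.cancel_confirmed()  # cancel-ok arrives while a job is still running
    events.append("job acked")
    gate.job_acked()
    print(events)  # ['job acked', 'close channel']
```

Note that `on_drained` runs on whichever thread triggers it. Since pika channels are not meant to be used from several threads, the callback would probably need to hand the actual close (and the acks) over to the thread running the ioloop.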
Hope this helps!