Celery+RabbitMQ 上任务进度未更新最新状态

Task progress is not updated latest status on Celery+RabbitMQ

我在 Celery + RabbitMQ 结果后端上使用 custom states 实现了长任务的进度反馈。

但是来电者无法像我预期的那样检索到最新的进度状态。在下面的代码中,result.info['step'] always return 0,然后任务将以 "result=42".

完成
# tasks.py -- celery worker
from celery import Celery
app = Celery('tasks', backend='amqp', broker='amqp://guest@localhost//')

@app.task
def long_task():
  for i in range(0, 10):
    timer.sleep(10)  # some work
    self.update_state(state='PROGRESS', meta={'step': i})
  return 42


# caller.py
from tasks import long_task
result = long_task.delay()

while not (result.successful() or result.failed()):
  try:
    result.get(timeout=1)
  except celery.exceptions.TimeoutError:
    if result.state == 'PROGRESS':
      print("progress={}".format(result.info['step']))
print("result={}".format(result.get()))

Python 3.4.1/芹菜 3.1.17/RabbitMQ 3.4.4

我认为这是一个微妙的时间问题,结合 RabbitMQ result backend 将任务结果作为消息发送并且只能检索一次的事实。

预先简短回答:在您真正需要最终结果之前避免调用 result.get()

while not result.ready():
  if result.state == "PROGRESS":
    print("progress={}".format(result.info['step']))
  time.sleep(1)
print("result={}".format(result.get()))
# +additional cleanup: see comments below

较长的答案是,这里实际上有两种方法(和一种 属性)与 AMQP 后端对话:

  • AsyncResult.get()

    调用 AMQPBackend.wait_for(),它会消耗任务队列中的所有结果,直到出现状态为 celery.states.READY_STATES 的结果。

  • AsyncResult.successful()AsyncResult.failed()AsyncResult.info

    调用AMQPBackend.get_task_meta(), which consumes all results from the queue for the task, then caches and returns the latest one. If no messages were retrieved, the backend returns a cached result or a PENDING result. Note: the latest message is requeued by the backend, and if it's the final result,它将被AsyncResult实例缓存1

调用result.get()将消耗所有状态更新,result.info没有机会提供最新的进度报告;相反,它很可能是一个陈旧的缓存,其中一个对 AsyncResult.get_task_meta() 的调用在某个时候设法获取了。

因此,根据时间的不同,step 可能会在下一个最坏的情况下停留在 0,其中最坏的情况是 PROGRESS 状态永远不会到达调用者。

1因为最终结果在通过调用 get_task_meta() 获取时被重新排队和缓存,您需要手动清空队列,如在下方评论。