Celery+RabbitMQ 上任务进度未更新最新状态
Task progress is not updated latest status on Celery+RabbitMQ
我在 Celery + RabbitMQ 结果后端上使用 custom states 实现了长任务的进度反馈。
但是来电者无法像我预期的那样检索到最新的进度状态。在下面的代码中,result.info['step']
always return 0
,然后任务将以 "result=42".
完成
# tasks.py -- celery worker
from celery import Celery
app = Celery('tasks', backend='amqp', broker='amqp://guest@localhost//')
@app.task
def long_task():
for i in range(0, 10):
timer.sleep(10) # some work
self.update_state(state='PROGRESS', meta={'step': i})
return 42
# caller.py
from tasks import long_task
result = long_task.delay()
while not (result.successful() or result.failed()):
try:
result.get(timeout=1)
except celery.exceptions.TimeoutError:
if result.state == 'PROGRESS':
print("progress={}".format(result.info['step']))
print("result={}".format(result.get()))
Python 3.4.1/芹菜 3.1.17/RabbitMQ 3.4.4
我认为这是一个微妙的时间问题,结合 RabbitMQ result backend 将任务结果作为消息发送并且只能检索一次的事实。
预先简短回答:在您真正需要最终结果之前避免调用 result.get()
:
while not result.ready():
if result.state == "PROGRESS":
print("progress={}".format(result.info['step']))
time.sleep(1)
print("result={}".format(result.get()))
# +additional cleanup: see comments below
较长的答案是,这里实际上有两种方法(和一种 属性)与 AMQP 后端对话:
-
调用 AMQPBackend.wait_for()
,它会消耗任务队列中的所有结果,直到出现状态为 celery.states.READY_STATES
的结果。
AsyncResult.successful()
、AsyncResult.failed()
、AsyncResult.info
调用AMQPBackend.get_task_meta()
, which consumes all results from the queue for the task, then caches and returns the latest one. If no messages were retrieved, the backend returns a cached result or a PENDING
result. Note: the latest message is requeued by the backend, and if it's the final result,它将被AsyncResult
实例缓存1。
调用result.get()
将消耗所有状态更新,result.info
没有机会提供最新的进度报告;相反,它很可能是一个陈旧的缓存,其中一个对 AsyncResult.get_task_meta()
的调用在某个时候设法获取了。
因此,根据时间的不同,step
可能会在下一个最坏的情况下停留在 0,其中最坏的情况是 PROGRESS
状态永远不会到达调用者。
1因为最终结果在通过调用 get_task_meta()
获取时被重新排队和缓存,您需要手动清空队列,如在下方评论。
我在 Celery + RabbitMQ 结果后端上使用 custom states 实现了长任务的进度反馈。
但是来电者无法像我预期的那样检索到最新的进度状态。在下面的代码中,result.info['step']
always return 0
,然后任务将以 "result=42".
# tasks.py -- celery worker
from celery import Celery
app = Celery('tasks', backend='amqp', broker='amqp://guest@localhost//')
@app.task
def long_task():
for i in range(0, 10):
timer.sleep(10) # some work
self.update_state(state='PROGRESS', meta={'step': i})
return 42
# caller.py
from tasks import long_task
result = long_task.delay()
while not (result.successful() or result.failed()):
try:
result.get(timeout=1)
except celery.exceptions.TimeoutError:
if result.state == 'PROGRESS':
print("progress={}".format(result.info['step']))
print("result={}".format(result.get()))
Python 3.4.1/芹菜 3.1.17/RabbitMQ 3.4.4
我认为这是一个微妙的时间问题,结合 RabbitMQ result backend 将任务结果作为消息发送并且只能检索一次的事实。
预先简短回答:在您真正需要最终结果之前避免调用 result.get()
:
while not result.ready():
if result.state == "PROGRESS":
print("progress={}".format(result.info['step']))
time.sleep(1)
print("result={}".format(result.get()))
# +additional cleanup: see comments below
较长的答案是,这里实际上有两种方法(和一种 属性)与 AMQP 后端对话:
-
调用
AMQPBackend.wait_for()
,它会消耗任务队列中的所有结果,直到出现状态为celery.states.READY_STATES
的结果。 AsyncResult.successful()
、AsyncResult.failed()
、AsyncResult.info
调用
AMQPBackend.get_task_meta()
, which consumes all results from the queue for the task, then caches and returns the latest one. If no messages were retrieved, the backend returns a cached result or aPENDING
result. Note: the latest message is requeued by the backend, and if it's the final result,它将被AsyncResult
实例缓存1。
调用result.get()
将消耗所有状态更新,result.info
没有机会提供最新的进度报告;相反,它很可能是一个陈旧的缓存,其中一个对 AsyncResult.get_task_meta()
的调用在某个时候设法获取了。
因此,根据时间的不同,step
可能会在下一个最坏的情况下停留在 0,其中最坏的情况是 PROGRESS
状态永远不会到达调用者。
1因为最终结果在通过调用 get_task_meta()
获取时被重新排队和缓存,您需要手动清空队列,如在下方评论。