Celery 链 - 如果任何任务失败,则执行 x,否则执行 y
Celery chain - if any tasks fail, do x, else y
我刚刚在我的 Django 项目中使用 Celery 链。我有以下功能:
def orchestrate_tasks_for_account(account_id):
# Get the account, set status to 'SYNC' until the chain is complete
account = Account.objects.get(id=account_id)
account.status = "SYNC"
account.save()
chain = task1.s(account_id) | task2.s() | task3.s()
chain()
# if any of the tasks in the chain failed, set account.status = 'ERROR'
# else set the account.status = 'OK'
链按预期工作,但我不确定如何从链中获取反馈并根据结果更新帐户
换句话说,如果链中的任何任务失败,我想将帐户状态设置为 'ERROR',否则我想将帐户状态设置为 'OK'
我对 Celery 文档中关于如何处理带有 if/else 的错误感到困惑,就像我在上面最后两行中评论的那样。
有人有这方面的经验吗?
好的 - 这是我的想法
我在此解决方案中利用了 waiting 库
from celery import chain
from waiting import wait
def orchestrate_tasks_for_account(account_id):
account = Account.objects.get(id=account_id)
account.status = "SYNC"
account.save()
job = chain(
task1.s(account_id),
task2.s(),
task3.s()
)
result = job.apply_async()
wait(
lambda: result.ready(), # when async job is completed...
timeout_seconds=1800, # wait 1800 seconds (30 minutes)
waiting_for="task orchestration to complete"
)
if result.successful():
account.status = 'OK'
else:
account.status = 'ERROR'
account.save()
我乐于接受改进建议!
我刚刚在我的 Django 项目中使用 Celery 链。我有以下功能:
def orchestrate_tasks_for_account(account_id):
# Get the account, set status to 'SYNC' until the chain is complete
account = Account.objects.get(id=account_id)
account.status = "SYNC"
account.save()
chain = task1.s(account_id) | task2.s() | task3.s()
chain()
# if any of the tasks in the chain failed, set account.status = 'ERROR'
# else set the account.status = 'OK'
链按预期工作,但我不确定如何从链中获取反馈并根据结果更新帐户
换句话说,如果链中的任何任务失败,我想将帐户状态设置为 'ERROR',否则我想将帐户状态设置为 'OK'
我对 Celery 文档中关于如何处理带有 if/else 的错误感到困惑,就像我在上面最后两行中评论的那样。
有人有这方面的经验吗?
好的 - 这是我的想法
我在此解决方案中利用了 waiting 库
from celery import chain
from waiting import wait
def orchestrate_tasks_for_account(account_id):
account = Account.objects.get(id=account_id)
account.status = "SYNC"
account.save()
job = chain(
task1.s(account_id),
task2.s(),
task3.s()
)
result = job.apply_async()
wait(
lambda: result.ready(), # when async job is completed...
timeout_seconds=1800, # wait 1800 seconds (30 minutes)
waiting_for="task orchestration to complete"
)
if result.successful():
account.status = 'OK'
else:
account.status = 'ERROR'
account.save()
我乐于接受改进建议!