运行即使主要任务失败也有和弦回调

Run a chord callback even if the main tasks fail

是否可以运行一个chord回调,即使主任务失败了?

我创建了一个和弦,我添加了一堆任务并注册了一个回调。我的问题是,如果其中一项任务失败,则不会触发回调,但我希望以任何一种方式触发回调。

我尝试用 si() 注册回调 (immutability)

callback = tasks.run_delete_rule.si([timestamp])
header = [tasks.run_update_rule.s(i, timestamp) for i in item_ids]
result = chord(header)(callback)

我也尝试将参数 ignore_result=True 添加到两个任务装饰器,但没有成功。

来自 github 问题 #1881 如果回调设置了 link_error 选项,它采用任务名称列表,那么当和弦的任务失败时 link_error 任务将要执行。

@task(name='super_task.good')
def good():
    return True

@task(name='super_task.raise_exception')
def raise_exception():
    raise ValueError('error')

@task(name='super_task.callback')
def callback(*args, **kwargs):
    logger.info('callback')
    logger.info(args)
    logger.info(kwargs)
    return 'finished'

@task(name='super_task.error_callback')
def error_callback(*args, **kwargs):
    logger.info('error_callback')
    logger.info(args)
    logger.info(kwargs)
    return 'error'

>>> c = chord(
        [raise_exception.s(), good.s(), raise_exception.s()], 
        callback.s().set(link_error=['super_task.error_callback'])
    )
>>> result = c()

这将执行和弦,在你的 celery 日志中,你会看到 raise_exception 任务失败,error_callback 的执行将在其参数中接收 task_id callback 个。

此时 result 的值将包含 callbackAsyncResult 实例,并且因为在和弦中,错误会传播到执行 result.get() 的回调中引发任务的异常,result.traceback 给你回溯。

如果要单曲回调,将和弦回调名称传给link_error

即可
callback.s().set(link_error='super_task.callback')

注意

另一个选项是设置 CELERY_CHORD_PROPAGATES = False,它将恢复到 celery 3.1 之前的行为并始终执行回调。

但这不是推荐的方法,因为您可以在 github 问题中找到 #1349

Celery 3.1 defines how chord errors are handled, the previous behavior was never documented and more of an accident since it was never the intention to work that way.

We couldn't change the behavior in a bugfix release so a setting had to be used instead, but it was never the intention that someone would deliberately disable the new behavior.

The new behavior is there to protect against this sort of issue happening, and the backward compatible setting may be removed. I suggest you find some other way to handle errors here (and I wouldn't mind a proposal if you can invent a nice api for it)

您只需更改 link_error 的调用方式即可。而不是字符串引用,传递带有你想要的参数的签名。

在上面的示例中,您可以按以下方式传递参数

c = chord(
    [raise_exception.s(), good.s(), raise_exception.s()], 
    callback.s().set(link_error=[error_callback.s(<arguments_here>)])
)

请记住,第一个参数将是 task_id,其他参数将是签名中定义的参数。