在 Celery 的链中动态更改任务成功回调和错误回调

Dynamically change task success-callback and error-callback within a chain in Celery

我有一个任务图,其中每个任务可能有 on_successon_error 后续任务。成功路径是 gua运行teed 是非循环的(例如,如果所有任务 运行 都很好,那么没有 on_success 处理程序 link 到已经 运行 的任务)和 gua 运行希望完成一些最终任务(例如,没有 on_success 后续处理程序)。但是 on_error 个任务可能 link 到任何任务,创建新的流(它们本身是正非循环的和有限的)。示例图:

                 + final_1
           task_1
          +      - $task_2
entrypoint
          -      + final_2
           task_2
                 - $task_1

其中 $task_1 表示执行流程直接进入另一个 b运行ch 和 运行s task_1

该示例图可以描述如下:

graph = {
    'entrypoint': 'entry_1',
    'tasks': {
        'entry_1': {
            'callback': 'text_generator',
            'on_success': 'task_1',
            'on_error': 'task_2',
        },
        'task_1': {
            'callback': 'text_modifier',
            'on_success': 'final_1',
            'on_error': 'task_2',
        },
        'task_2': {
            'callback': 'text_modifier',
            'on_success': 'final_2',
            'on_error': 'task_1',
        },
        'final_1': {
            'callback': 'echo_result',
            'on_success': None,
            'on_error': None,
        },
        'final_2': {
            'callback': 'store_result',
            'on_success': None,
            'on_error': None,
        },
    }
}

此外,管道获取任意输入以传递所有任务

pipeline_input = {
    'foo': 'bar'
}

从技术上讲,如果 task_1task_2 总是失败,它可能会进入无限循环,但这是设计好的情况。

我目前的解决方案是 运行 单个芹菜任务并处理我的“任务”中的错误(然后只是 运行 同步,阻止芹菜任务)。

@celery.task
def task_runner():
    task = graph.tasks[graph.entrypoint]
    task_input = some_input
    task_output = None
    while task:
        try:
            # call current task
            task_output = task.callback(task_input)

            # define successor and define input as output of current task
            task = graph.tasks[task.on_success]
            task_input = task_output
        except Exception:
            # define error-successor and define input as output of current task
            task = graph.tasks[task.on_error]
            task_output = task_input

代码是我的一些简化版本,因此可能包含明显的错误,但它是为了传达基本思想。

基本上,在执行过程中的每个时间点,我都有一个简单的成功流程——entrypoint -> task_1 -> final_1(没有分叉也没有循环)。一旦遇到错误,如果已定义,我就切换到不同的成功流程 — entrypoint -> task_1 (error!) -> task_2 -> final_2.

我现在想要的是 运行 这些任务作为 Celery 任务,而不仅仅是单个 Celery 任务中的同步方法。但是我想不出一种方法来定义任何任务失败的新链式流程(即 try/except 块)。

我可以为 entrypoint -> task_1 -> final_1 构建 chain 个有效任务并执行 apply_async():

@celery.task
def task_chainer():
    task = graph.tasks[graph.entrypoint]
    task_input = some_input
    task_output = None
    success_chain = []
    while task:
        try:
            success_chain.append(task.callback.s(task_input))

            # define successor and define input as output of current task
            task = graph.tasks[task.on_success]
        except Exception:
            # define error-successor and define input as output of current task
            task = graph.tasks[task.on_error]
    success_chain.apply_async()

但是 try/except 不起作用,因为它现在都是异步的,我必须使用错误回调。由于这些链在错误情况下保持循环势能,我无法预先定义所有链,只能应用一次。

所以,我的问题是:一旦某个任务失败,如何重新定义任务链?或者应该以其他方式完成?也许 chain.unchain_tasks() 在这里有用吗?

我制定了一个解决方案,在当前 运行 任务完成后(无论成功还是失败)只对单个任务进行排队,看起来类似于:

@celery.task
def task_chainer():
    # Enqueueing entrypoint task
    run_task.apply_async(kwargs={'graph': graph, 'task_id': graph.get('entrypoint'), 'task_input': pipeline_input}, ignore_result=True)


@celery.task
def run_task(graph, task_id: int, task_input):
    current_task = graph.tasks[task_id]

    followup_task_id = None
    try:
        task_callable = _get_callable(current_task.callable)
        output = task_callable(task_input)

        followup_task_id = graph.tasks[current_task.on_success] if current_task.on_success else None
    except Exception as error:
        logging.exception(error)
        output = task_input

        followup_task_id = graph.tasks[current_task.on_error] if current_task.on_error else None

    if followup_task_id:
        run_task.apply_async(graph, followup_task_id, output)
        return

    # If no followup defined, then we're done with our graph and may want to update the overall graph status as DONE