在 Celery 的链中动态更改任务成功回调和错误回调
Dynamically change task success-callback and error-callback within a chain in Celery
我有一个任务图,其中每个任务可能有 on_success
和 on_error
后续任务。成功路径是 gua运行teed 是非循环的(例如,如果所有任务 运行 都很好,那么没有 on_success
处理程序 link 到已经 运行 的任务)和 gua 运行希望完成一些最终任务(例如,没有 on_success
后续处理程序)。但是 on_error
个任务可能 link 到任何任务,创建新的流(它们本身是正非循环的和有限的)。示例图:
+ final_1
task_1
+ - $task_2
entrypoint
- + final_2
task_2
- $task_1
其中 $task_1
表示执行流程直接进入另一个 b运行ch 和 运行s task_1
。
该示例图可以描述如下:
graph = {
'entrypoint': 'entry_1',
'tasks': {
'entry_1': {
'callback': 'text_generator',
'on_success': 'task_1',
'on_error': 'task_2',
},
'task_1': {
'callback': 'text_modifier',
'on_success': 'final_1',
'on_error': 'task_2',
},
'task_2': {
'callback': 'text_modifier',
'on_success': 'final_2',
'on_error': 'task_1',
},
'final_1': {
'callback': 'echo_result',
'on_success': None,
'on_error': None,
},
'final_2': {
'callback': 'store_result',
'on_success': None,
'on_error': None,
},
}
}
此外,管道获取任意输入以传递所有任务
pipeline_input = {
'foo': 'bar'
}
从技术上讲,如果 task_1
和 task_2
总是失败,它可能会进入无限循环,但这是设计好的情况。
我目前的解决方案是 运行 单个芹菜任务并处理我的“任务”中的错误(然后只是 运行 同步,阻止芹菜任务)。
@celery.task
def task_runner():
task = graph.tasks[graph.entrypoint]
task_input = some_input
task_output = None
while task:
try:
# call current task
task_output = task.callback(task_input)
# define successor and define input as output of current task
task = graph.tasks[task.on_success]
task_input = task_output
except Exception:
# define error-successor and define input as output of current task
task = graph.tasks[task.on_error]
task_output = task_input
代码是我的一些简化版本,因此可能包含明显的错误,但它是为了传达基本思想。
基本上,在执行过程中的每个时间点,我都有一个简单的成功流程——entrypoint -> task_1 -> final_1
(没有分叉也没有循环)。一旦遇到错误,如果已定义,我就切换到不同的成功流程 — entrypoint -> task_1 (error!) -> task_2 -> final_2
.
我现在想要的是 运行 这些任务作为 Celery 任务,而不仅仅是单个 Celery 任务中的同步方法。但是我想不出一种方法来定义任何任务失败的新链式流程(即 try/except 块)。
我可以为 entrypoint -> task_1 -> final_1
构建 chain
个有效任务并执行 apply_async():
@celery.task
def task_chainer():
task = graph.tasks[graph.entrypoint]
task_input = some_input
task_output = None
success_chain = []
while task:
try:
success_chain.append(task.callback.s(task_input))
# define successor and define input as output of current task
task = graph.tasks[task.on_success]
except Exception:
# define error-successor and define input as output of current task
task = graph.tasks[task.on_error]
success_chain.apply_async()
但是 try/except 不起作用,因为它现在都是异步的,我必须使用错误回调。由于这些链在错误情况下保持循环势能,我无法预先定义所有链,只能应用一次。
所以,我的问题是:一旦某个任务失败,如何重新定义任务链?或者应该以其他方式完成?也许 chain.unchain_tasks() 在这里有用吗?
我制定了一个解决方案,在当前 运行 任务完成后(无论成功还是失败)只对单个任务进行排队,看起来类似于:
@celery.task
def task_chainer():
# Enqueueing entrypoint task
run_task.apply_async(kwargs={'graph': graph, 'task_id': graph.get('entrypoint'), 'task_input': pipeline_input}, ignore_result=True)
@celery.task
def run_task(graph, task_id: int, task_input):
current_task = graph.tasks[task_id]
followup_task_id = None
try:
task_callable = _get_callable(current_task.callable)
output = task_callable(task_input)
followup_task_id = graph.tasks[current_task.on_success] if current_task.on_success else None
except Exception as error:
logging.exception(error)
output = task_input
followup_task_id = graph.tasks[current_task.on_error] if current_task.on_error else None
if followup_task_id:
run_task.apply_async(graph, followup_task_id, output)
return
# If no followup defined, then we're done with our graph and may want to update the overall graph status as DONE
我有一个任务图,其中每个任务可能有 on_success
和 on_error
后续任务。成功路径是 gua运行teed 是非循环的(例如,如果所有任务 运行 都很好,那么没有 on_success
处理程序 link 到已经 运行 的任务)和 gua 运行希望完成一些最终任务(例如,没有 on_success
后续处理程序)。但是 on_error
个任务可能 link 到任何任务,创建新的流(它们本身是正非循环的和有限的)。示例图:
+ final_1
task_1
+ - $task_2
entrypoint
- + final_2
task_2
- $task_1
其中 $task_1
表示执行流程直接进入另一个 b运行ch 和 运行s task_1
。
该示例图可以描述如下:
graph = {
'entrypoint': 'entry_1',
'tasks': {
'entry_1': {
'callback': 'text_generator',
'on_success': 'task_1',
'on_error': 'task_2',
},
'task_1': {
'callback': 'text_modifier',
'on_success': 'final_1',
'on_error': 'task_2',
},
'task_2': {
'callback': 'text_modifier',
'on_success': 'final_2',
'on_error': 'task_1',
},
'final_1': {
'callback': 'echo_result',
'on_success': None,
'on_error': None,
},
'final_2': {
'callback': 'store_result',
'on_success': None,
'on_error': None,
},
}
}
此外,管道获取任意输入以传递所有任务
pipeline_input = {
'foo': 'bar'
}
从技术上讲,如果 task_1
和 task_2
总是失败,它可能会进入无限循环,但这是设计好的情况。
我目前的解决方案是 运行 单个芹菜任务并处理我的“任务”中的错误(然后只是 运行 同步,阻止芹菜任务)。
@celery.task
def task_runner():
task = graph.tasks[graph.entrypoint]
task_input = some_input
task_output = None
while task:
try:
# call current task
task_output = task.callback(task_input)
# define successor and define input as output of current task
task = graph.tasks[task.on_success]
task_input = task_output
except Exception:
# define error-successor and define input as output of current task
task = graph.tasks[task.on_error]
task_output = task_input
代码是我的一些简化版本,因此可能包含明显的错误,但它是为了传达基本思想。
基本上,在执行过程中的每个时间点,我都有一个简单的成功流程——entrypoint -> task_1 -> final_1
(没有分叉也没有循环)。一旦遇到错误,如果已定义,我就切换到不同的成功流程 — entrypoint -> task_1 (error!) -> task_2 -> final_2
.
我现在想要的是 运行 这些任务作为 Celery 任务,而不仅仅是单个 Celery 任务中的同步方法。但是我想不出一种方法来定义任何任务失败的新链式流程(即 try/except 块)。
我可以为 entrypoint -> task_1 -> final_1
构建 chain
个有效任务并执行 apply_async():
@celery.task
def task_chainer():
task = graph.tasks[graph.entrypoint]
task_input = some_input
task_output = None
success_chain = []
while task:
try:
success_chain.append(task.callback.s(task_input))
# define successor and define input as output of current task
task = graph.tasks[task.on_success]
except Exception:
# define error-successor and define input as output of current task
task = graph.tasks[task.on_error]
success_chain.apply_async()
但是 try/except 不起作用,因为它现在都是异步的,我必须使用错误回调。由于这些链在错误情况下保持循环势能,我无法预先定义所有链,只能应用一次。
所以,我的问题是:一旦某个任务失败,如何重新定义任务链?或者应该以其他方式完成?也许 chain.unchain_tasks() 在这里有用吗?
我制定了一个解决方案,在当前 运行 任务完成后(无论成功还是失败)只对单个任务进行排队,看起来类似于:
@celery.task
def task_chainer():
# Enqueueing entrypoint task
run_task.apply_async(kwargs={'graph': graph, 'task_id': graph.get('entrypoint'), 'task_input': pipeline_input}, ignore_result=True)
@celery.task
def run_task(graph, task_id: int, task_input):
current_task = graph.tasks[task_id]
followup_task_id = None
try:
task_callable = _get_callable(current_task.callable)
output = task_callable(task_input)
followup_task_id = graph.tasks[current_task.on_success] if current_task.on_success else None
except Exception as error:
logging.exception(error)
output = task_input
followup_task_id = graph.tasks[current_task.on_error] if current_task.on_error else None
if followup_task_id:
run_task.apply_async(graph, followup_task_id, output)
return
# If no followup defined, then we're done with our graph and may want to update the overall graph status as DONE