如果特定任务失败,流程是否可以终止?
Can a flow be terminated if a specific task fails?
我的流程很长,如果 特定 任务失败,我想将其状态标记为 Failed
。
我阅读了文档 here,但是他们似乎没有指定这种情况。
我的猜测是我需要在 @task()
装饰器中以某种方式具体说明这一点,但我不知道如何。
非常感谢任何指导。
这可以通过将您的特定任务声明为流程的“参考任务”来实现。 Reference tasks 是最终确定每个流程最终状态的任务 运行。
例如,以下代码片段创建了一个流程,其中包含两个独立、不相关的任务,这些任务随机失败一半。然而,整体流 运行 状态将仅根据 task_one
的状态来确定,因为我们指定该任务作为唯一参考任务:
import random
from prefect import task, Flow
@task
def task_one():
if random.random() > 0.5:
raise ValueError("Random failure")
@task
def task_two():
if random.random() > 0.5:
raise ValueError("Random failure")
flow = Flow("Two Task Flow", tasks=[task_one, task_two])
flow.set_reference_tasks([task_one])
我的流程很长,如果 特定 任务失败,我想将其状态标记为 Failed
。
我阅读了文档 here,但是他们似乎没有指定这种情况。
我的猜测是我需要在 @task()
装饰器中以某种方式具体说明这一点,但我不知道如何。
非常感谢任何指导。
这可以通过将您的特定任务声明为流程的“参考任务”来实现。 Reference tasks 是最终确定每个流程最终状态的任务 运行。
例如,以下代码片段创建了一个流程,其中包含两个独立、不相关的任务,这些任务随机失败一半。然而,整体流 运行 状态将仅根据 task_one
的状态来确定,因为我们指定该任务作为唯一参考任务:
import random
from prefect import task, Flow
@task
def task_one():
if random.random() > 0.5:
raise ValueError("Random failure")
@task
def task_two():
if random.random() > 0.5:
raise ValueError("Random failure")
flow = Flow("Two Task Flow", tasks=[task_one, task_two])
flow.set_reference_tasks([task_one])