如果下游任务在 Airflow 中失败(使用 Sub Dags),你如何重新 运行 上游任务
How can you re-run upstream task if a downstream task fails in Airflow (using Sub Dags)
我有一个 airflow dag 可以提取数据并执行验证。如果验证失败,则需要重新运行提取。如果验证成功,则继续。
我听说有人说 sub dags 可以解决这个问题,但我看不到这方面的任何例子。我试过使用 sub dag,但遇到了与在一个 DAG 中尝试这样做相同的问题。
如果其中一个任务失败,如何让 Sub DAG 中的所有任务重新运行?
我有以下 DAG/sub dag 详细信息:
maindag.py
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': start_date,
'retries': 3,
'retry_delay': timedelta(minutes=5),
'sla': timedelta(hours=sla_hours)
}
main_dag = DAG(
dag_id,
default_args=default_args,
schedule_interval='30 14 * * *',
max_active_runs=1,
concurrency=1)
task1 = BashOperator(...)
task2 = SubDagOperator(
task_id=sub_dag_task_id,
subdag=sub_dag(dag_id, sub_dag_task_id, start_date, main_dag.schedule_interval),
dag=main_dag)
task3 = BashOperator(...)
subdag.py
def sub_dag(parent_dag_name, task_id, start_date, schedule_interval):
dag = DAG(
'%s.%s' % (parent_dag_name, task_id),
schedule_interval=schedule_interval,
start_date=start_date,
)
task1 = BashOperator(...)
task2 = BashOperator(...)
task3 = BashOperator(...)
task1 >> task2 >> task3
return dag
在 sub dag 中,如果任务 3 失败,我希望任务 1 再次 运行,即使它已成功。为什么这么难做??!
我通过在主 dag 中创建重试回调方法找到了解决方案:
(原文出处:https://gist.github.com/nathairtras/6ce0b0294be8c27d672e2ad52e8f2117
)
from airflow.models import DagBag
def callback_subdag_clear(context):
"""Clears a subdag's tasks on retry."""
dag_id = "{}.{}".format(
context['dag'].dag_id,
context['ti'].task_id
)
execution_date = context['execution_date']
sdag = DagBag().get_dag(dag_id)
sdag.clear(
start_date=execution_date,
end_date=execution_date,
only_failed=False,
only_running=False,
confirm_prompt=False,
include_subdags=False)
然后对于运行 subdagoperator 的任务,它具有:
on_retry_callback=callback_subdag_clear,
现在清除每个任务的任务实例历史记录,re-runs 子 dag 中的每个任务,直到主 dag 中的重试次数。
有一个更简单的选择。 Full snippet
而不是
dag_id = "{}.{}".format(
context['dag'].dag_id,
context['ti'].task_id
)
sdag = DagBag().get_dag(dag_id)
你可以做到
task = context['task']
sdag = task.subdag
为什么?
因为(很可能)您的任务与具有 subdag 属性的 SubDagOperator 相关。
我在使用 时遇到问题。当我试图在 sdag 变量上调用 clear 时,我会得到一个异常,因为它是 None.
我在填充 DagBag 时将问题深入到 Dags 的不正确解析,我无法弄清楚。相反,我通过查看上下文中传递的内容并注意到它具有对具有 subdag 属性的任务的引用找到了一种解决方法,只要它来自 SubDag 运算符
我有一个 airflow dag 可以提取数据并执行验证。如果验证失败,则需要重新运行提取。如果验证成功,则继续。
我听说有人说 sub dags 可以解决这个问题,但我看不到这方面的任何例子。我试过使用 sub dag,但遇到了与在一个 DAG 中尝试这样做相同的问题。
如果其中一个任务失败,如何让 Sub DAG 中的所有任务重新运行?
我有以下 DAG/sub dag 详细信息:
maindag.py
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': start_date,
'retries': 3,
'retry_delay': timedelta(minutes=5),
'sla': timedelta(hours=sla_hours)
}
main_dag = DAG(
dag_id,
default_args=default_args,
schedule_interval='30 14 * * *',
max_active_runs=1,
concurrency=1)
task1 = BashOperator(...)
task2 = SubDagOperator(
task_id=sub_dag_task_id,
subdag=sub_dag(dag_id, sub_dag_task_id, start_date, main_dag.schedule_interval),
dag=main_dag)
task3 = BashOperator(...)
subdag.py
def sub_dag(parent_dag_name, task_id, start_date, schedule_interval):
dag = DAG(
'%s.%s' % (parent_dag_name, task_id),
schedule_interval=schedule_interval,
start_date=start_date,
)
task1 = BashOperator(...)
task2 = BashOperator(...)
task3 = BashOperator(...)
task1 >> task2 >> task3
return dag
在 sub dag 中,如果任务 3 失败,我希望任务 1 再次 运行,即使它已成功。为什么这么难做??!
我通过在主 dag 中创建重试回调方法找到了解决方案:
(原文出处:https://gist.github.com/nathairtras/6ce0b0294be8c27d672e2ad52e8f2117 )
from airflow.models import DagBag
def callback_subdag_clear(context):
"""Clears a subdag's tasks on retry."""
dag_id = "{}.{}".format(
context['dag'].dag_id,
context['ti'].task_id
)
execution_date = context['execution_date']
sdag = DagBag().get_dag(dag_id)
sdag.clear(
start_date=execution_date,
end_date=execution_date,
only_failed=False,
only_running=False,
confirm_prompt=False,
include_subdags=False)
然后对于运行 subdagoperator 的任务,它具有:
on_retry_callback=callback_subdag_clear,
现在清除每个任务的任务实例历史记录,re-runs 子 dag 中的每个任务,直到主 dag 中的重试次数。
有一个更简单的选择。 Full snippet
而不是
dag_id = "{}.{}".format(
context['dag'].dag_id,
context['ti'].task_id
)
sdag = DagBag().get_dag(dag_id)
你可以做到
task = context['task']
sdag = task.subdag
为什么?
因为(很可能)您的任务与具有 subdag 属性的 SubDagOperator 相关。
我在使用
我在填充 DagBag 时将问题深入到 Dags 的不正确解析,我无法弄清楚。相反,我通过查看上下文中传递的内容并注意到它具有对具有 subdag 属性的任务的引用找到了一种解决方法,只要它来自 SubDag 运算符