如果下游任务在 Airflow 中失败(使用 Sub Dags),你如何重新 运行 上游任务

How can you re-run upstream task if a downstream task fails in Airflow (using Sub Dags)

我有一个 airflow dag 可以提取数据并执行验证。如果验证失败,则需要重新运行提取。如果验证成功,则继续。

我听说有人说 sub dags 可以解决这个问题,但我看不到这方面的任何例子。我试过使用 sub dag,但遇到了与在一个 DAG 中尝试这样做相同的问题。

如果其中一个任务失败,如何让 Sub DAG 中的所有任务重新运行?

我有以下 DAG/sub dag 详细信息:

maindag.py

default_args = {
  'owner': 'airflow',
  'depends_on_past': False,
  'start_date': start_date,
  'retries': 3,
  'retry_delay': timedelta(minutes=5),
  'sla': timedelta(hours=sla_hours)
}

main_dag = DAG(
  dag_id,
  default_args=default_args,
  schedule_interval='30 14 * * *',
  max_active_runs=1,
  concurrency=1)

task1 = BashOperator(...)

task2 = SubDagOperator(
  task_id=sub_dag_task_id,
  subdag=sub_dag(dag_id, sub_dag_task_id, start_date, main_dag.schedule_interval),
  dag=main_dag)

task3 = BashOperator(...)

subdag.py

def sub_dag(parent_dag_name, task_id, start_date, schedule_interval):
  dag = DAG(
    '%s.%s' % (parent_dag_name, task_id),
    schedule_interval=schedule_interval,
    start_date=start_date,
    )

  task1 = BashOperator(...)

  task2 = BashOperator(...)

  task3 = BashOperator(...)

  task1 >> task2 >> task3

  return dag

在 sub dag 中,如果任务 3 失败,我希望任务 1 再次 运行,即使它已成功。为什么这么难做??!

我通过在主 dag 中创建重试回调方法找到了解决方案:

(原文出处:https://gist.github.com/nathairtras/6ce0b0294be8c27d672e2ad52e8f2117 )

from airflow.models import DagBag

def callback_subdag_clear(context):
    """Clears a subdag's tasks on retry."""
    dag_id = "{}.{}".format(
        context['dag'].dag_id,
        context['ti'].task_id
)
    execution_date = context['execution_date']
    sdag = DagBag().get_dag(dag_id)
    sdag.clear(
        start_date=execution_date,
        end_date=execution_date,
        only_failed=False,
        only_running=False,
        confirm_prompt=False,
        include_subdags=False)

然后对于运行 subdagoperator 的任务,它具有:

on_retry_callback=callback_subdag_clear,

现在清除每个任务的任务实例历史记录,re-runs 子 dag 中的每个任务,直到主 dag 中的重试次数。

有一个更简单的选择。 Full snippet

而不是

dag_id = "{}.{}".format(
    context['dag'].dag_id,
    context['ti'].task_id
)
sdag = DagBag().get_dag(dag_id)

你可以做到

task = context['task']
sdag = task.subdag

为什么?

因为(很可能)您的任务与具有 subdag 属性的 SubDagOperator 相关。

我在使用 时遇到问题。当我试图在 sdag 变量上调用 clear 时,我会得到一个异常,因为它是 None.

我在填充 DagBag 时将问题深入到 Dags 的不正确解析,我无法弄清楚。相反,我通过查看上下文中传递的内容并注意到它具有对具有 subdag 属性的任务的引用找到了一种解决方法,只要它来自 SubDag 运算符