How to restart a DAG when it fails on Airflow 1.8?

With:

default_args = {
    ...
    'retries': 1,
    'retry_delay': timedelta(seconds=1),
    ...
}

I can get a failed task retried several times, but once the task has finally failed, how do I get the DAG to start again?

Automatically, of course...

You can use the on_failure_callback feature to call a Python/Bash script that restarts the DAG. Airflow does not currently provide a built-in way to automatically restart a DAG when a task fails.
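A minimal sketch of that idea, assuming the callback is attached through default_args and that starting a fresh run of the same DAG is acceptable (restart_dag is an illustrative name; airflow trigger_dag is a real Airflow 1.8 CLI command, but it creates a new manual DagRun rather than resuming the failed one):

import subprocess


def restart_dag(context):
    # on_failure_callback receives the task context, which includes the DAG.
    dag_id = context['dag'].dag_id
    # Kick off a brand-new manual run of the same DAG via the CLI.
    subprocess.check_call(['airflow', 'trigger_dag', dag_id])


default_args = {
    # ... your existing arguments ...
    'on_failure_callback': restart_dag,  # fires only after the final retry fails
}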

You can run a second "Fail Check" DAG that uses the provide_session utility to query for any task instances whose task_id matches the one you care about and whose state is failed. You then also need to selectively clear the downstream tasks and set the state of the relevant DagRun back to running:

from datetime import datetime, timedelta
from sqlalchemy import and_

from airflow import DAG
from airflow.models import DagBag, DagRun, TaskInstance
from airflow.utils.db import provide_session
from airflow.utils.state import State

from airflow.operators.python_operator import PythonOperator


default_args = {'start_date': datetime(2018, 6, 11),
                'retries': 2,
                'retry_delay': timedelta(minutes=2),
                'email': [],
                'email_on_failure': True}


dag = DAG('__RESET__FAILED_TASKS',
          default_args=default_args,
          schedule_interval='@daily',
          catchup=False
          )


@provide_session
def check_py(session=None, **kwargs):

    relevant_task_id = 'relevant_task_id'

    # Find every failed instance of the task we care about.
    failed_tis = (session
                  .query(TaskInstance)
                  .filter(and_(TaskInstance.task_id == relevant_task_id,
                               TaskInstance.state == State.FAILED))
                  .all())

    if not failed_tis:
        raise KeyError('No failed Task Instances of {} exist.'.format(relevant_task_id))

    for ti in failed_tis:
        # Look up the failed task's downstream task ids from the DAG definition.
        task = DagBag().get_dag(ti.dag_id).get_task(ti.task_id)
        downstream_ids = [t.task_id for t in task.get_flat_relatives(upstream=False)]

        # OPTIONAL: Clear downstream tasks in the same Dag Run so the
        # scheduler re-runs them once the failed task has been rescheduled.
        if downstream_ids:
            (session
             .query(TaskInstance)
             .filter(and_(TaskInstance.task_id.in_(downstream_ids),
                          TaskInstance.dag_id == ti.dag_id,
                          TaskInstance.execution_date == ti.execution_date))
             .delete(synchronize_session='fetch'))

        # Clear the failed task itself; the scheduler will schedule it again.
        ti.state = None

        # Set the Dag Run state back to "running".
        dag_run = (session
                   .query(DagRun)
                   .filter(and_(DagRun.dag_id == ti.dag_id,
                                DagRun.execution_date == ti.execution_date))
                   .first())
        if dag_run is not None:
            dag_run.set_state(State.RUNNING)

with dag:

    run_check = PythonOperator(task_id='run_check',
                               python_callable=check_py,
                               provide_context=True)

    run_check

The canonical solution to this in Airflow is to create a SubDagOperator that wraps all the other tasks in the DAG, and apply the retries to it.
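A minimal sketch of that pattern, assuming a daily schedule; wrapper_dag, build_subdag, and the placeholder tasks are illustrative names (a sub-DAG's dag_id must follow the <parent_dag_id>.<task_id> convention):

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.subdag_operator import SubDagOperator


def build_subdag(parent_dag_id, child_id, args):
    # The sub-DAG's id must be '<parent_dag_id>.<child_id>'.
    subdag = DAG(dag_id='{}.{}'.format(parent_dag_id, child_id),
                 default_args=args,
                 schedule_interval='@daily')
    with subdag:
        step_one = BashOperator(task_id='step_one', bash_command='echo one')
        step_two = BashOperator(task_id='step_two', bash_command='echo two')
        step_one >> step_two
    return subdag


args = {'start_date': datetime(2018, 6, 11)}

with DAG('wrapper_dag', default_args=args, schedule_interval='@daily') as dag:
    # Retrying this one task re-runs the wrapped sub-DAG's backfill,
    # which picks up whatever failed inside it.
    SubDagOperator(task_id='pipeline',
                   subdag=build_subdag(dag.dag_id, 'pipeline', args),
                   retries=2,
                   retry_delay=timedelta(minutes=5))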