How to restart a DAG when it fails on Airflow 1.8?
With:
default_args = {
    ...
    'retries': 1,
    'retry_delay': timedelta(seconds=1),
    ...
}
a failed task gets retried several times, but once the task has failed for good, how do I get the DAG to start again?
Automatically, of course...
You can use the on_failure_callback feature to call a Python/Bash script that restarts the DAG. Airflow currently does not offer a built-in way to automatically restart a DAG when a task fails.
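A minimal sketch of that idea (not from the answer's code): an on_failure_callback that shells out to the Airflow CLI and clears the current DAG run so its tasks get re-run. The flag names are the 1.8-era airflow clear options as I recall them; verify them on your install.

import subprocess

def restart_dag(context):
    # Airflow calls this with the task context when the task fails.
    dag_id = context['dag'].dag_id
    execution_date = context['execution_date'].isoformat()
    # Assumed restart strategy: clear every task instance of this DAG run via
    # the CLI so the scheduler re-runs them (-s/--start_date, -e/--end_date,
    # -c/--no_confirm; double-check the exact flags on your Airflow version).
    subprocess.check_call(['airflow', 'clear', dag_id,
                           '-s', execution_date,
                           '-e', execution_date,
                           '-c'])

default_args = {
    # ... your other defaults ...
    'on_failure_callback': restart_dag,
}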
You can run a second "Fail Check" DAG that uses the provide_session utility to query for any task instances whose task_id matches the one you care about and whose state is failed. You then also need to selectively clear the downstream tasks and set the state of the relevant DagRun back to running.
from datetime import datetime, timedelta

from sqlalchemy import and_

from airflow import DAG
from airflow.models import DagRun, TaskInstance
from airflow.operators.python_operator import PythonOperator
from airflow.utils.db import provide_session
from airflow.utils.state import State

default_args = {'start_date': datetime(2018, 6, 11),
                'retries': 2,
                'retry_delay': timedelta(minutes=2),
                'email': [],
                'email_on_failure': True}

dag = DAG('__RESET__FAILED_TASKS',
          default_args=default_args,
          schedule_interval='@daily',
          catchup=False)


@provide_session
def check_py(session=None, **kwargs):
    relevant_task_id = 'relevant_task_id'

    # Find every failed instance of the task we care about.
    failed_tis = (session
                  .query(TaskInstance)
                  .filter(and_(TaskInstance.task_id == relevant_task_id,
                               TaskInstance.state == State.FAILED))
                  .all())

    if not failed_tis:
        raise KeyError('No failed Task Instances of {} exist.'.format(relevant_task_id))

    for ti in failed_tis:
        # OPTIONAL: clear downstream task instances in the same Dag Run so the
        # scheduler re-runs them too. List your DAG's downstream task_ids here.
        downstream_task_ids = []  # e.g. ['transform', 'load']
        for task_id in downstream_task_ids:
            (session
             .query(TaskInstance)
             .filter(and_(TaskInstance.task_id == task_id,
                          TaskInstance.dag_id == ti.dag_id,
                          TaskInstance.execution_date == ti.execution_date))
             .delete())

        # Set the relevant Dag Run's state back to "running".
        dag_run = (session
                   .query(DagRun)
                   .filter(and_(DagRun.dag_id == ti.dag_id,
                                DagRun.execution_date == ti.execution_date))
                   .first())
        if dag_run is not None:
            dag_run.state = State.RUNNING

        # Clear the failed task instance itself so it gets rescheduled.
        session.delete(ti)


with dag:
    run_check = PythonOperator(task_id='run_check',
                               python_callable=check_py,
                               provide_context=True)
The canonical solution for this in Airflow is to create a SubDagOperator that wraps all of the DAG's other tasks, and apply the retries to it.
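A minimal sketch of that approach, with illustrative DAG and task names that are not from the answer:

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.subdag_operator import SubDagOperator

PARENT_DAG_ID = 'wrapped_pipeline'   # illustrative name
default_args = {'start_date': datetime(2018, 6, 11)}


def build_subdag(parent_dag_id, child_dag_id, args):
    # Factory for the inner DAG that holds all the "real" tasks.
    subdag = DAG(dag_id='{}.{}'.format(parent_dag_id, child_dag_id),
                 default_args=args,
                 schedule_interval='@daily')
    with subdag:
        extract = DummyOperator(task_id='extract')   # placeholder tasks
        load = DummyOperator(task_id='load')
        extract >> load
    return subdag


with DAG(PARENT_DAG_ID, default_args=default_args,
         schedule_interval='@daily', catchup=False) as dag:
    # Retries applied to the SubDagOperator act on the wrapped pipeline as a whole.
    pipeline = SubDagOperator(task_id='pipeline',
                              subdag=build_subdag(PARENT_DAG_ID, 'pipeline', default_args),
                              retries=1,
                              retry_delay=timedelta(minutes=5))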