如何使用 Python 在 Airflow 中的另一个 DAG 成功时触发 DAG?
How to Trigger a DAG on the success of a another DAG in Airflow using Python?
我有一个 python DAG Parent Job
和一个 DAG Child Job
。 Child Job
中的任务应在成功完成每天 运行 的 Parent Job
任务时触发。如何添加外部作业触发器?
我的代码
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.postgres_operator import PostgresOperator
from utils import FAILURE_EMAILS

# Midnight of the previous day, used as a static start_date for the DAG.
yesterday = datetime.combine(datetime.today() - timedelta(1), datetime.min.time())

# Defaults applied to every task in this DAG.
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': yesterday,
    'email': FAILURE_EMAILS,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# NOTE: Airflow validates dag_id against ^[A-Za-z0-9_.-]+$, so the original
# 'Child Job' (with a space) is rejected — use an underscore instead.
dag = DAG('Child_Job', default_args=default_args, schedule_interval='@daily')

# Sample query against Redshift through the configured connection.
execute_notebook = PostgresOperator(
    task_id='data_sql',
    postgres_conn_id='REDSHIFT_CONN',
    sql="SELECT * FROM athena_rs.shipments limit 5",
    dag=dag,
)
我相信您正在寻找 SubDagOperator，用来在一个更大的 DAG 中运行子 DAG。
请注意，像下面的示例一样创建许多子 DAG 很快就会变得混乱，因此我建议将每个子 DAG 拆分到单独的文件中，再导入到一个主文件中。
SubDagOperator 简单易用：你需要给它一个 task_id、一个 subdag（子 DAG）和一个 dag（父 DAG）
# Illustrative snippet: the "<----" arrows in the original were not valid
# Python syntax, so they are rewritten as comments here.
subdag_2 = SubDagOperator(
    task_id="just_some_id",
    subdag=child_subdag,  # this must be a DAG (the child)
    dag=parent_dag,       # this must be a DAG (the parent)
)
它看起来像这样:
from airflow import DAG
from airflow.example_dags.subdags.subdag import subdag
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.subdag_operator import SubDagOperator
from airflow.utils.dates import days_ago
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
def subdag(parent_dag_name, child_dag_name, args):
    """Build and return the child DAG run by the SubDagOperator.

    The dag_id must be '<parent>.<child>' for Airflow to associate the
    subdag with its parent DAG.
    """
    child = DAG(
        dag_id='{}.{}'.format(parent_dag_name, child_dag_name),
        default_args=args,
        schedule_interval="@daily",
    )
    # Five placeholder tasks; each attaches itself to `child` via dag=.
    for task_number in range(1, 6):
        DummyOperator(
            task_id='{}-task-{}'.format(child_dag_name, task_number),
            default_args=args,
            dag=child,
        )
    return child
DAG_NAME = 'example_subdag_operator'

args = {
    'owner': 'airflow',
    'start_date': days_ago(2),
}

# Parent DAG: start -> some-other-task -> end -> subdag_task
dag = DAG(
    dag_id=DAG_NAME,
    default_args=args,
    schedule_interval="@once",
    tags=['example'],
)

start = DummyOperator(
    task_id='start-of-main-job',
    dag=dag,
)

some_other_task = DummyOperator(
    task_id='some-other-task',
    dag=dag,
)

end = DummyOperator(
    task_id='end-of-main-job',
    dag=dag,
)

# Named subdag_task (not `subdag`) so it does not shadow/clobber the
# subdag() factory function used on the next line.
subdag_task = SubDagOperator(
    task_id='run-this-dag-after-previous-steps',
    subdag=subdag(DAG_NAME, 'run-this-dag-after-previous-steps', args),
    dag=dag,
)

start >> some_other_task >> end >> subdag_task
答案已经在上面的回答中。下面是演示代码：
Parent DAG：
from datetime import datetime
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator

# Shared operator defaults for the parent DAG.
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2020, 4, 29),
}

# Parent_dag runs once per day: leave_work -> cook_dinner.
dag = DAG('Parent_dag', default_args=default_args, schedule_interval='@daily')

leave_work = DummyOperator(task_id='leave_work', dag=dag)
cook_dinner = DummyOperator(task_id='cook_dinner', dag=dag)

leave_work >> cook_dinner
Child DAG：
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.sensors import ExternalTaskSensor

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2020, 4, 29),
}

dag = DAG('Child_dag', default_args=default_args, schedule_interval='@daily')

# Use ExternalTaskSensor to wait for Parent_dag's cook_dinner task; when
# cook_dinner succeeds for the matching execution date, Child_dag proceeds.
# Fixes vs. the original snippet:
#  * dag=dag was missing, leaving the sensor detached from the DAG;
#  * execution_delta removed — both DAGs run '@daily' so their execution
#    dates line up exactly; a 1-hour delta would poll a nonexistent run.
wait_for_dinner = ExternalTaskSensor(
    task_id='wait_for_dinner',
    external_dag_id='Parent_dag',
    external_task_id='cook_dinner',
    start_date=datetime(2020, 4, 29),
    timeout=3600,
    dag=dag,
)

have_dinner = DummyOperator(
    task_id='have_dinner',
    dag=dag,
)

play_with_food = DummyOperator(
    task_id='play_with_food',
    dag=dag,
)

wait_for_dinner >> have_dinner
wait_for_dinner >> play_with_food
图片（DAG 视图）：
Parent_dag
Child_dag
正如 @pankaj 所建议的，我特此添加一段代码，演示使用 TriggerDagRunOperator 的反应式触发（相对于 ExternalTaskSensor 基于轮询的触发）。
from typing import List

from airflow.models.baseoperator import BaseOperator
from airflow.models.dag import DAG
from airflow.operators.dagrun_operator import TriggerDagRunOperator
from airflow.utils.trigger_rule import TriggerRule

# DAG object (start_date elided in this sketch; `...` is a valid
# placeholder, unlike the original `..`).
my_dag: DAG = DAG(dag_id='my_dag',
                  start_date=...)

# A list of 'tail' tasks: tasks that have no downstream tasks.
tail_tasks_of_first_dag: List[BaseOperator] = my_magic_function_that_determines_all_tail_tasks(...)

# The trigger task fires the second DAG once everything upstream succeeded.
# NOTE: TriggerDagRunOperator takes trigger_dag_id — `external_dag_id`
# (used in the original) belongs to ExternalTaskSensor, not this operator.
my_trigger_task: TriggerDagRunOperator = TriggerDagRunOperator(
    dag=my_dag,
    task_id='my_trigger_task',
    trigger_rule=TriggerRule.ALL_SUCCESS,
    trigger_dag_id='id_of_dag_to_be_triggered',
)

# Our trigger task should run when all 'tail' tasks have completed / succeeded.
tail_tasks_of_first_dag >> my_trigger_task
请注意,该片段仅供参考;尚未经过测试
注意事项/参考资料
- Get all Airflow Leaf Nodes/Tasks
我有一个 python DAG Parent Job
和一个 DAG Child Job
。 Child Job
中的任务应在成功完成每天 运行 的 Parent Job
任务时触发。如何添加外部作业触发器?
我的代码
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.postgres_operator import PostgresOperator
from utils import FAILURE_EMAILS

# Midnight of the previous day, used as a static start_date for the DAG.
yesterday = datetime.combine(datetime.today() - timedelta(1), datetime.min.time())

# Defaults applied to every task in this DAG.
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': yesterday,
    'email': FAILURE_EMAILS,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# NOTE: Airflow validates dag_id against ^[A-Za-z0-9_.-]+$, so the original
# 'Child Job' (with a space) is rejected — use an underscore instead.
dag = DAG('Child_Job', default_args=default_args, schedule_interval='@daily')

# Sample query against Redshift through the configured connection.
execute_notebook = PostgresOperator(
    task_id='data_sql',
    postgres_conn_id='REDSHIFT_CONN',
    sql="SELECT * FROM athena_rs.shipments limit 5",
    dag=dag,
)
我相信您正在寻找 SubDagOperator，用来在一个更大的 DAG 中运行子 DAG。请注意，像下面的示例一样创建许多子 DAG 很快就会变得混乱，因此我建议将每个子 DAG 拆分到单独的文件中，再导入到一个主文件中。
SubDagOperator 简单易用：你需要给它一个 task_id、一个 subdag（子 DAG）和一个 dag（父 DAG）
# Illustrative snippet: the "<----" arrows in the original were not valid
# Python syntax, so they are rewritten as comments here.
subdag_2 = SubDagOperator(
    task_id="just_some_id",
    subdag=child_subdag,  # this must be a DAG (the child)
    dag=parent_dag,       # this must be a DAG (the parent)
)
它看起来像这样:
from airflow import DAG
from airflow.example_dags.subdags.subdag import subdag
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.subdag_operator import SubDagOperator
from airflow.utils.dates import days_ago
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
def subdag(parent_dag_name, child_dag_name, args):
    """Build and return the child DAG run by the SubDagOperator.

    The dag_id must be '<parent>.<child>' for Airflow to associate the
    subdag with its parent DAG.
    """
    child = DAG(
        dag_id='{}.{}'.format(parent_dag_name, child_dag_name),
        default_args=args,
        schedule_interval="@daily",
    )
    # Five placeholder tasks; each attaches itself to `child` via dag=.
    for task_number in range(1, 6):
        DummyOperator(
            task_id='{}-task-{}'.format(child_dag_name, task_number),
            default_args=args,
            dag=child,
        )
    return child
DAG_NAME = 'example_subdag_operator'

args = {
    'owner': 'airflow',
    'start_date': days_ago(2),
}

# Parent DAG: start -> some-other-task -> end -> subdag_task
dag = DAG(
    dag_id=DAG_NAME,
    default_args=args,
    schedule_interval="@once",
    tags=['example'],
)

start = DummyOperator(
    task_id='start-of-main-job',
    dag=dag,
)

some_other_task = DummyOperator(
    task_id='some-other-task',
    dag=dag,
)

end = DummyOperator(
    task_id='end-of-main-job',
    dag=dag,
)

# Named subdag_task (not `subdag`) so it does not shadow/clobber the
# subdag() factory function used on the next line.
subdag_task = SubDagOperator(
    task_id='run-this-dag-after-previous-steps',
    subdag=subdag(DAG_NAME, 'run-this-dag-after-previous-steps', args),
    dag=dag,
)

start >> some_other_task >> end >> subdag_task
答案已经在上面的回答中。下面是演示代码：
Parent DAG：
from datetime import datetime
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator

# Shared operator defaults for the parent DAG.
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2020, 4, 29),
}

# Parent_dag runs once per day: leave_work -> cook_dinner.
dag = DAG('Parent_dag', default_args=default_args, schedule_interval='@daily')

leave_work = DummyOperator(task_id='leave_work', dag=dag)
cook_dinner = DummyOperator(task_id='cook_dinner', dag=dag)

leave_work >> cook_dinner
Child DAG：
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.sensors import ExternalTaskSensor

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2020, 4, 29),
}

dag = DAG('Child_dag', default_args=default_args, schedule_interval='@daily')

# Use ExternalTaskSensor to wait for Parent_dag's cook_dinner task; when
# cook_dinner succeeds for the matching execution date, Child_dag proceeds.
# Fixes vs. the original snippet:
#  * dag=dag was missing, leaving the sensor detached from the DAG;
#  * execution_delta removed — both DAGs run '@daily' so their execution
#    dates line up exactly; a 1-hour delta would poll a nonexistent run.
wait_for_dinner = ExternalTaskSensor(
    task_id='wait_for_dinner',
    external_dag_id='Parent_dag',
    external_task_id='cook_dinner',
    start_date=datetime(2020, 4, 29),
    timeout=3600,
    dag=dag,
)

have_dinner = DummyOperator(
    task_id='have_dinner',
    dag=dag,
)

play_with_food = DummyOperator(
    task_id='play_with_food',
    dag=dag,
)

wait_for_dinner >> have_dinner
wait_for_dinner >> play_with_food
图片（DAG 视图）：
Parent_dag
Child_dag
正如前面所提到的，下面演示使用 TriggerDagRunOperator 的反应式触发（相对于 ExternalTaskSensor 基于轮询的触发）。
from typing import List

from airflow.models.baseoperator import BaseOperator
from airflow.models.dag import DAG
from airflow.operators.dagrun_operator import TriggerDagRunOperator
from airflow.utils.trigger_rule import TriggerRule

# DAG object (start_date elided in this sketch; `...` is a valid
# placeholder, unlike the original `..`).
my_dag: DAG = DAG(dag_id='my_dag',
                  start_date=...)

# A list of 'tail' tasks: tasks that have no downstream tasks.
tail_tasks_of_first_dag: List[BaseOperator] = my_magic_function_that_determines_all_tail_tasks(...)

# The trigger task fires the second DAG once everything upstream succeeded.
# NOTE: TriggerDagRunOperator takes trigger_dag_id — `external_dag_id`
# (used in the original) belongs to ExternalTaskSensor, not this operator.
my_trigger_task: TriggerDagRunOperator = TriggerDagRunOperator(
    dag=my_dag,
    task_id='my_trigger_task',
    trigger_rule=TriggerRule.ALL_SUCCESS,
    trigger_dag_id='id_of_dag_to_be_triggered',
)

# Our trigger task should run when all 'tail' tasks have completed / succeeded.
tail_tasks_of_first_dag >> my_trigger_task
请注意,该片段仅供参考;尚未经过测试
注意事项/参考资料
- Get all Airflow Leaf Nodes/Tasks