气流触发规则任务依赖
Airflow Trigger Rule Task dependencies
在下面显示的 DAG 中,无论任务 b
和 c
是成功还是失败,我都想执行任务 d
,但是对于任务 e
如果任务 b
, c
& d
是成功的那么只有它应该被触发。 DAG Image
写在下面的代码但它不起作用:
with DAG(dag_id='test_dag', default_args=args, schedule_interval=None) as dag:
t_test = PythonOperator(task_id='test', python_callable=test)
t_a = PythonOperator(task_id='a', python_callable=a)
t_b = PythonOperator(task_id='b', python_callable=b)
t_c = PythonOperator(task_id='c', python_callable=c)
t_d = PythonOperator(task_id='d', python_callable=d, trigger_rule=TriggerRule.ALL_DONE)
t_e = PythonOperator(task_id='e', python_callable=e, trigger_rule='all_success')
t_test >> t_a >> [t_b, t_c] >> t_d >> t_e
您需要在单独的语句中对工作流树的分支进行编码。
# execute task d no matter whether tasks b & c succeeded or failed
t_test >> t_a >> [t_b, t_c] >> t_d
# execute task e if tasks b, c & d succeeded
t_e << [t_b, t_c, t_d]
在下面显示的 DAG 中,无论任务 b
和 c
是成功还是失败,我都想执行任务 d
,但是对于任务 e
如果任务 b
, c
& d
是成功的那么只有它应该被触发。 DAG Image
写在下面的代码但它不起作用:
with DAG(dag_id='test_dag', default_args=args, schedule_interval=None) as dag:
t_test = PythonOperator(task_id='test', python_callable=test)
t_a = PythonOperator(task_id='a', python_callable=a)
t_b = PythonOperator(task_id='b', python_callable=b)
t_c = PythonOperator(task_id='c', python_callable=c)
t_d = PythonOperator(task_id='d', python_callable=d, trigger_rule=TriggerRule.ALL_DONE)
t_e = PythonOperator(task_id='e', python_callable=e, trigger_rule='all_success')
t_test >> t_a >> [t_b, t_c] >> t_d >> t_e
您需要在单独的语句中对工作流树的分支进行编码。
# execute task d no matter whether tasks b & c succeeded or failed
t_test >> t_a >> [t_b, t_c] >> t_d
# execute task e if tasks b, c & d succeeded
t_e << [t_b, t_c, t_d]