How to create a conditional task in Airflow
I would like to create a conditional task in Airflow, as described in the schema below. The expected scenario is the following:
- Task 1 executes
- If Task 1 succeeds, then execute Task 2a
- Else, if Task 1 fails, then execute Task 2b
- Finally, execute Task 3
All of these tasks are SSHExecuteOperators.
I'm guessing I should be using the ShortCircuitOperator and/or XCom to manage the condition, but I am not clear on how to implement that. Could you describe the solution?
All operators have a trigger_rule argument which defines the rule by which the generated task gets triggered.
The trigger rule possibilities:
ALL_SUCCESS = 'all_success'
ALL_FAILED = 'all_failed'
ALL_DONE = 'all_done'
ONE_SUCCESS = 'one_success'
ONE_FAILED = 'one_failed'
DUMMY = 'dummy'
Here is the solution to your problem:
from airflow.operators.ssh_execute_operator import SSHExecuteOperator
from airflow.utils.trigger_rule import TriggerRule
from airflow.contrib.hooks import SSHHook

sshHook = SSHHook(conn_id=<YOUR CONNECTION ID FROM THE UI>)

task_1 = SSHExecuteOperator(
    task_id='task_1',
    bash_command=<YOUR COMMAND>,
    ssh_hook=sshHook,
    dag=dag)

task_2 = SSHExecuteOperator(
    task_id='conditional_task',
    bash_command=<YOUR COMMAND>,
    ssh_hook=sshHook,
    dag=dag)

# Runs only if conditional_task succeeded (ALL_SUCCESS is also the default rule)
task_2a = SSHExecuteOperator(
    task_id='task_2a',
    bash_command=<YOUR COMMAND>,
    trigger_rule=TriggerRule.ALL_SUCCESS,
    ssh_hook=sshHook,
    dag=dag)

# Runs only if conditional_task failed
task_2b = SSHExecuteOperator(
    task_id='task_2b',
    bash_command=<YOUR COMMAND>,
    trigger_rule=TriggerRule.ALL_FAILED,
    ssh_hook=sshHook,
    dag=dag)

# Runs as soon as either task_2a or task_2b has succeeded
task_3 = SSHExecuteOperator(
    task_id='task_3',
    bash_command=<YOUR COMMAND>,
    trigger_rule=TriggerRule.ONE_SUCCESS,
    ssh_hook=sshHook,
    dag=dag)

task_2.set_upstream(task_1)
task_2a.set_upstream(task_2)
task_2b.set_upstream(task_2)
task_3.set_upstream(task_2a)
task_3.set_upstream(task_2b)
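A note on the design: at most one of task_2a / task_2b will succeed on a given run (the other branch ends up skipped or upstream_failed because of its trigger rule), so task_3 needs trigger_rule=ONE_SUCCESS; with the default ALL_SUCCESS it would never be triggered. If your Airflow version supports the bitshift dependency syntax (1.8 and later), the same wiring can also be written as the sketch below.
task_1 >> task_2
task_2 >> task_2a
task_2 >> task_2b
task_2a >> task_3
task_2b >> task_3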
Airflow has a BranchPythonOperator that can be used to express the branching dependency more directly.
The docs describe its use:
The BranchPythonOperator is much like the PythonOperator except that it expects a python_callable that returns a task_id. The task_id returned is followed, and all of the other paths are skipped. The task_id returned by the Python function has to be referencing a task directly downstream from the BranchPythonOperator task.
...
If you want to skip some tasks, keep in mind that you can’t have an empty path, if so make a dummy task.
Code example:
from airflow.operators.python_operator import BranchPythonOperator
from airflow.operators.dummy_operator import DummyOperator

def dummy_test():
    # Return the task_id of the branch to follow; 'branch_false' will be skipped
    return 'branch_a'

A_task = DummyOperator(task_id='branch_a', dag=dag)
B_task = DummyOperator(task_id='branch_false', dag=dag)

branch_task = BranchPythonOperator(
    task_id='branching',
    python_callable=dummy_test,
    dag=dag,
)

branch_task >> A_task
branch_task >> B_task
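A fuller sketch for the question's shape of DAG, re-joining the two branches afterwards (assumptions: choose_branch() and its condition are hypothetical placeholders, and the Airflow 1.x import paths used above apply):
from airflow.operators.python_operator import BranchPythonOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.trigger_rule import TriggerRule

def choose_branch(**context):
    # Hypothetical condition -- replace with your own logic (e.g. read an XCom)
    if context['execution_date'].day % 2 == 0:
        return 'branch_a'
    return 'branch_false'

branch_task = BranchPythonOperator(
    task_id='branching',
    python_callable=choose_branch,
    provide_context=True,  # needed on Airflow 1.x so **context is passed in
    dag=dag)

A_task = DummyOperator(task_id='branch_a', dag=dag)
B_task = DummyOperator(task_id='branch_false', dag=dag)

# Join task: ONE_SUCCESS lets it run even though the non-chosen branch was skipped
join_task = DummyOperator(
    task_id='join',
    trigger_rule=TriggerRule.ONE_SUCCESS,
    dag=dag)

branch_task >> A_task >> join_task
branch_task >> B_task >> join_task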
Edit:
If you are using an Airflow version >= 1.10.3, you can also return a list of task ids, allowing you to skip multiple downstream paths in a single operator and to avoid using a dummy task before joining.
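A minimal sketch of that (assuming Airflow >= 1.10.3; choose_branches() and the task ids are hypothetical):
from airflow.operators.python_operator import BranchPythonOperator
from airflow.operators.dummy_operator import DummyOperator

def choose_branches():
    # Hypothetical decision -- returning a list follows several downstream paths at once
    return ['branch_a', 'branch_b']

multi_branch_task = BranchPythonOperator(
    task_id='multi_branching',
    python_callable=choose_branches,
    dag=dag)

A_task = DummyOperator(task_id='branch_a', dag=dag)
B_task = DummyOperator(task_id='branch_b', dag=dag)
C_task = DummyOperator(task_id='branch_c', dag=dag)  # not returned, so it will be skipped

multi_branch_task >> A_task
multi_branch_task >> B_task
multi_branch_task >> C_task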