Airflow Python Branch Operator 在 1.10.15 中不工作
Airflow Python Branch Operator not working in 1.10.15
我的气流 test_dag 看起来像:
dag = DAG(
dag_id='test_dag',
default_args=some_args,
catchup=False,
schedule_interval='0 10 * * *'
)
dummy_step_one = PythonOperator(
task_id='dummy_step_one',
provide_context=True,
python_callable=dummy_step_one,
dag=dag,
retries=5
)
branch_test = BranchPythonOperator(
task_id='branch_test',
provide_context=True,
python_callable=branch_test,
dag=dag,
retries=3,
retry_delay=timedelta(seconds=5)
)
dummy_step_two = PythonOperator(
task_id='dummy_step_two',
provide_context=True,
python_callable=dummy_step_two,
dag=dag,
retries=5
)
dummy_step_three = PythonOperator(
task_id='dummy_step_three',
provide_context=True,
python_callable=dummy_step_three,
dag=dag,
retries=5
)
dummy_step_four = PythonOperator(
task_id='dummy_step_four',
provide_context=True,
python_callable=dummy_step_four,
dag=dag,
retries=5
)
dummy_step_one >> branch_test
branch_test >> dummy_step_two >> dummy_step_three >> dummy_step_four
branch_test >> dummy_step_four
我正在从另一个文件导入这些 python 个可调用文件,它们看起来像:
def dummy_step_one(**context: dict) -> str:
return print('hello world')
def dummy_step_two(**context: dict) -> str:
return print('hello world')
def dummy_step_three(**context: dict) -> str:
return print('hello world')
def dummy_step_four(**context: dict) -> str:
return print('hello world')
def branch_test(**context: dict) -> str:
return 'dummy_step_four'
所以理论上,这应该运行step_one,然后运行branch_test,然后跳过step_two和step_three导致branch_test 返回 dummy_step_four
,最后 运行 step_four。这个图表视图看起来正确:
但是,当这个 运行 秒时,step_four 被跳过,如上面的图表视图和下面的树视图所示。为什么??忽略第一个成功的 运行:
这是 branch_test 步骤中日志中打印的内容:
重要说明: 我使用的是 Apache Airflow 1.10.12,这是 运行ning 成功的,但我们最近升级到 1.10.15 以准备升级到 2.0——所以我面临的问题可能是相关的,但我找不到任何在线文档详细说明 1.10 中 python 分支运算符的错误。 15.
我认为您的代码在管道逻辑中存在错误。
BranchPythonOperator
预计returntask_id之后。
在你的情况下你有:
def branch_test(**context: dict) -> str:
return 'dummy_step_four'
这意味着它将始终遵循 dummy_step_four
并始终跳过 dummy_step_two
,但是您还设置了:
dummy_step_two >> dummy_step_three >> dummy_step_four
这意味着 dummy_step_four
个上游任务是:
dummy_step_three
状态 SKIPPED
branch_test
状态 SUCCESS
任务的默认 trigger rule 是 all_success
这意味着对于 dummy_step_four
条件不满足,因为它的父项之一被跳过,因此 dummy_step_four
也会被跳过。
要解决此问题,您需要更改任务 dummy_step_four
中的触发规则。
对于 Airflow< 2.2.0
使用:
trigger_rule="none_failed_or_skipped"
(是的,这是正确的触发规则,名称具有误导性,这就是为什么在 PR 中重命名的原因。)
对于 Airflow >= 2.2.0
使用:
trigger_rule="none_failed_min_one_success"
所以运营商代码需要是:
dummy_step_four = PythonOperator(
task_id='dummy_step_four',
provide_context=True,
python_callable=dummy_step_four,
dag=dag,
retries=5,
trigger_rule="none_failed_min_one_success",
)
示例:
注意:对于您的 senario,all_done
触发规则也将起作用(如果您愿意)。我没有在示例中使用它,因为 official docs 也针对这个确切的场景使用 none_failed_min_one_success
。
我的气流 test_dag 看起来像:
dag = DAG(
dag_id='test_dag',
default_args=some_args,
catchup=False,
schedule_interval='0 10 * * *'
)
dummy_step_one = PythonOperator(
task_id='dummy_step_one',
provide_context=True,
python_callable=dummy_step_one,
dag=dag,
retries=5
)
branch_test = BranchPythonOperator(
task_id='branch_test',
provide_context=True,
python_callable=branch_test,
dag=dag,
retries=3,
retry_delay=timedelta(seconds=5)
)
dummy_step_two = PythonOperator(
task_id='dummy_step_two',
provide_context=True,
python_callable=dummy_step_two,
dag=dag,
retries=5
)
dummy_step_three = PythonOperator(
task_id='dummy_step_three',
provide_context=True,
python_callable=dummy_step_three,
dag=dag,
retries=5
)
dummy_step_four = PythonOperator(
task_id='dummy_step_four',
provide_context=True,
python_callable=dummy_step_four,
dag=dag,
retries=5
)
dummy_step_one >> branch_test
branch_test >> dummy_step_two >> dummy_step_three >> dummy_step_four
branch_test >> dummy_step_four
我正在从另一个文件导入这些 python 个可调用文件,它们看起来像:
def dummy_step_one(**context: dict) -> str:
return print('hello world')
def dummy_step_two(**context: dict) -> str:
return print('hello world')
def dummy_step_three(**context: dict) -> str:
return print('hello world')
def dummy_step_four(**context: dict) -> str:
return print('hello world')
def branch_test(**context: dict) -> str:
return 'dummy_step_four'
所以理论上,这应该运行step_one,然后运行branch_test,然后跳过step_two和step_three导致branch_test 返回 dummy_step_four
,最后 运行 step_four。这个图表视图看起来正确:
但是,当这个 运行 秒时,step_four 被跳过,如上面的图表视图和下面的树视图所示。为什么??忽略第一个成功的 运行:
这是 branch_test 步骤中日志中打印的内容:
重要说明: 我使用的是 Apache Airflow 1.10.12,这是 运行ning 成功的,但我们最近升级到 1.10.15 以准备升级到 2.0——所以我面临的问题可能是相关的,但我找不到任何在线文档详细说明 1.10 中 python 分支运算符的错误。 15.
我认为您的代码在管道逻辑中存在错误。
BranchPythonOperator
预计returntask_id之后。
在你的情况下你有:
def branch_test(**context: dict) -> str:
return 'dummy_step_four'
这意味着它将始终遵循 dummy_step_four
并始终跳过 dummy_step_two
,但是您还设置了:
dummy_step_two >> dummy_step_three >> dummy_step_four
这意味着 dummy_step_four
个上游任务是:
dummy_step_three
状态 SKIPPED
branch_test
状态 SUCCESS
任务的默认 trigger rule 是 all_success
这意味着对于 dummy_step_four
条件不满足,因为它的父项之一被跳过,因此 dummy_step_four
也会被跳过。
要解决此问题,您需要更改任务 dummy_step_four
中的触发规则。
对于 Airflow< 2.2.0
使用:
trigger_rule="none_failed_or_skipped"
(是的,这是正确的触发规则,名称具有误导性,这就是为什么在 PR 中重命名的原因。)
对于 Airflow >= 2.2.0
使用:
trigger_rule="none_failed_min_one_success"
所以运营商代码需要是:
dummy_step_four = PythonOperator(
task_id='dummy_step_four',
provide_context=True,
python_callable=dummy_step_four,
dag=dag,
retries=5,
trigger_rule="none_failed_min_one_success",
)
示例:
注意:对于您的 senario,all_done
触发规则也将起作用(如果您愿意)。我没有在示例中使用它,因为 official docs 也针对这个确切的场景使用 none_failed_min_one_success
。