Airflow Python Branch Operator 在 1.10.15 中不工作

Airflow Python Branch Operator not working in 1.10.15

我的气流 test_dag 看起来像:

dag = DAG(
    dag_id='test_dag',
    default_args=some_args,
    catchup=False,
    schedule_interval='0 10 * * *'
)

dummy_step_one = PythonOperator(
    task_id='dummy_step_one',
    provide_context=True,
    python_callable=dummy_step_one,
    dag=dag,
    retries=5
)

branch_test = BranchPythonOperator(
    task_id='branch_test',
    provide_context=True,
    python_callable=branch_test,
    dag=dag,
    retries=3,
    retry_delay=timedelta(seconds=5)
)

dummy_step_two = PythonOperator(
    task_id='dummy_step_two',
    provide_context=True,
    python_callable=dummy_step_two,
    dag=dag,
    retries=5
)

dummy_step_three = PythonOperator(
    task_id='dummy_step_three',
    provide_context=True,
    python_callable=dummy_step_three,
    dag=dag,
    retries=5
)

dummy_step_four = PythonOperator(
    task_id='dummy_step_four',
    provide_context=True,
    python_callable=dummy_step_four,
    dag=dag,
    retries=5
)

dummy_step_one >> branch_test
branch_test >> dummy_step_two >> dummy_step_three >> dummy_step_four
branch_test >> dummy_step_four

我正在从另一个文件导入这些 python 个可调用文件,它们看起来像:

def dummy_step_one(**context: dict) -> str:

    return print('hello world')

def dummy_step_two(**context: dict) -> str:

    return print('hello world')

def dummy_step_three(**context: dict) -> str:

    return print('hello world')

def dummy_step_four(**context: dict) -> str:

    return print('hello world')

def branch_test(**context: dict) -> str:

    return 'dummy_step_four'

所以理论上,这应该运行step_one,然后运行branch_test,然后跳过step_two和step_three导致branch_test 返回 dummy_step_four,最后 运行 step_four。这个图表视图看起来正确:

但是,当这个 运行 秒时,step_four 被跳过,如上面的图表视图和下面的树视图所示。为什么??忽略第一个成功的 运行:

这是 branch_test 步骤中日志中打印的内容:

重要说明: 我使用的是 Apache Airflow 1.10.12,这是 运行ning 成功的,但我们最近升级到 1.10.15 以准备升级到 2.0——所以我面临的问题可能是相关的,但我找不到任何在线文档详细说明 1.10 中 python 分支运算符的错误。 15.

我认为您的代码在管道逻辑中存在错误。

BranchPythonOperator预计returntask_id之后。 在你的情况下你有:

def branch_test(**context: dict) -> str:

    return 'dummy_step_four'

这意味着它将始终遵循 dummy_step_four 并始终跳过 dummy_step_two,但是您还设置了:

dummy_step_two >> dummy_step_three >> dummy_step_four

这意味着 dummy_step_four 个上游任务是:

dummy_step_three 状态 SKIPPED

branch_test 状态 SUCCESS

任务的默认 trigger ruleall_success 这意味着对于 dummy_step_four 条件不满足,因为它的父项之一被跳过,因此 dummy_step_four 也会被跳过。

要解决此问题,您需要更改任务 dummy_step_four 中的触发规则。

对于 Airflow< 2.2.0 使用:

trigger_rule="none_failed_or_skipped"

(是的,这是正确的触发规则,名称具有误导性,这就是为什么在 PR 中重命名的原因。)

对于 Airflow >= 2.2.0 使用:

trigger_rule="none_failed_min_one_success"

所以运营商代码需要是:

dummy_step_four = PythonOperator(
    task_id='dummy_step_four',
    provide_context=True,
    python_callable=dummy_step_four,
    dag=dag,
    retries=5,
    trigger_rule="none_failed_min_one_success",
)

示例:

注意:对于您的 senario,all_done 触发规则也将起作用(如果您愿意)。我没有在示例中使用它,因为 official docs 也针对这个确切的场景使用 none_failed_min_one_success