Apache Airflow 从一个分支移动到另一个分支

Apache Airflow move from branch to another branch

我在 Apache Airflow 中将一个分支移动到另一个分支时遇到了一些问题 我有一个依赖于三个 Branch operators

的 DAG
all_empty_branch_task >> generate_round_task >> load_tasks
all_empty_branch_task >> resolving_branch_task
resolving_branch_task >> [
        export_final_annotation_task, annotation_branch_task, cleansing_branch_task]

我确认 resolving_branch_task(check-resolving-branch) python 函数 return 是 annotation_branch_task(check-annotation-branch) task_id 这也是一个 python 分支,但在 resolving_branch_task 结束执行后,它只是跳过了所有内容。 我不确定它有什么问题。 值得注意的是,当我 return 正常 task_id,而不是分支时,它会成功执行任务。 谁能帮忙,我将不胜感激。

BranchPythonOperator 任务将跳过其 python_callable 未返回的整个“分支”中的所有任务。这意味着当“check-resolving-branch”没有选择“export-final-annotation-task”时,它将被跳过,它的下游任务包括“check-annotation-branch”任务和所有 DAG 中的其他任务。

为此,您可以在 Airflow 中使用 Trigger Rules。默认情况下,所有任务的触发规则都是“all_success”。在此用例中,您可以将“check-annotation-branch”任务的触发规则设置为“all_done”,这将允许此任务在所有上游任务完成(即成功、失败、跳过)后执行。

这里有一个示例,可以让您了解在 DAG 中实现什么:

from datetime import datetime

from airflow import DAG
from airflow.decorators import task
from airflow.operators.python import BranchPythonOperator
from airflow.utils.trigger_rule import TriggerRule

with DAG(
    dag_id="branch_test",
    start_date=datetime(2021, 9, 10),
    schedule_interval=None,
) as dag:

    @task
    def func1():
        ...

    @task
    def func2():
        ...

    @task
    def func3():
        ...

    @task
    def func4():
        ...

    branch_1 = BranchPythonOperator(task_id="branch_1", python_callable=lambda: "branch_2")
    branch_2 = BranchPythonOperator(
        task_id="branch_2", python_callable=lambda: "func3", trigger_rule=TriggerRule.ALL_DONE
    )

    func1() >> branch_1 >> func2() >> branch_2
    branch_1 >> branch_2 >> [func3(), func4()]