BranchPythonOperator 到 TaskGroup 的问题

Problem with BranchPythonOperator to TaskGroup

我得到了以下DAG

import logging

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import BranchPythonOperator
from airflow.utils.dates import days_ago
from airflow.utils.task_group import TaskGroup


def select_next_branch():
    if some_condition:
        next_task_ = 'tasks.inner'
    else:
        next_task_ = 'end'
    logging.info(f'next_task: {next_task_}')
    return next_task_


with DAG(dag_id='poc_branch_tasks',
         description='Branching with task group POC',
         schedule_interval=None,
         start_date=days_ago(1),
         tags=['poc', 'branch', 'task_group']) as dag:

    start = DummyOperator(task_id='start')

    branch = BranchPythonOperator(task_id='branch',
                                  python_callable=select_next_branch)

    with TaskGroup(group_id='tasks') as task_group:
        inner_one = DummyOperator(task_id='inner')

    end = DummyOperator(task_id='end')

    start >> branch
    branch >> end
    branch >> task_group >> end

some_condition 满足时,流程正确地从 branchtask_group.inner,否则应该从 branchend,但不是execute end 这个被跳过了。我做错了什么?

提前致谢。

尝试为 end 任务添加 trigger_rule='one_success'。默认的 trigger_rule 是 all_success.

all_success (default): All upstream tasks have succeeded

但是,您的 end 任务依赖于分支操作员和 inner 任务。当inner任务被跳过时,end无法触发,因为其中一个上游任务没有“成功”。

如果分支运算符或 inner 任务成功,触发规则 one_success 将尝试执行此 end 任务。

end = DummyOperator(task_id='end', trigger_rule='one_success')