BranchPythonOperator 到 TaskGroup 的问题
Problem with BranchPythonOperator to TaskGroup
我得到了以下DAG
import logging
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import BranchPythonOperator
from airflow.utils.dates import days_ago
from airflow.utils.task_group import TaskGroup
def select_next_branch():
if some_condition:
next_task_ = 'tasks.inner'
else:
next_task_ = 'end'
logging.info(f'next_task: {next_task_}')
return next_task_
with DAG(dag_id='poc_branch_tasks',
description='Branching with task group POC',
schedule_interval=None,
start_date=days_ago(1),
tags=['poc', 'branch', 'task_group']) as dag:
start = DummyOperator(task_id='start')
branch = BranchPythonOperator(task_id='branch',
python_callable=select_next_branch)
with TaskGroup(group_id='tasks') as task_group:
inner_one = DummyOperator(task_id='inner')
end = DummyOperator(task_id='end')
start >> branch
branch >> end
branch >> task_group >> end
当 some_condition
满足时,流程正确地从 branch
到 task_group.inner
,否则应该从 branch
到 end
,但不是execute end
这个被跳过了。我做错了什么?
提前致谢。
尝试为 end
任务添加 trigger_rule='one_success'
。默认的 trigger_rule
是 all_success.
all_success (default): All upstream tasks have succeeded
但是,您的 end
任务依赖于分支操作员和 inner
任务。当inner
任务被跳过时,end
无法触发,因为其中一个上游任务没有“成功”。
如果分支运算符或 inner
任务成功,触发规则 one_success 将尝试执行此 end
任务。
end = DummyOperator(task_id='end', trigger_rule='one_success')
我得到了以下DAG
import logging
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import BranchPythonOperator
from airflow.utils.dates import days_ago
from airflow.utils.task_group import TaskGroup
def select_next_branch():
if some_condition:
next_task_ = 'tasks.inner'
else:
next_task_ = 'end'
logging.info(f'next_task: {next_task_}')
return next_task_
with DAG(dag_id='poc_branch_tasks',
description='Branching with task group POC',
schedule_interval=None,
start_date=days_ago(1),
tags=['poc', 'branch', 'task_group']) as dag:
start = DummyOperator(task_id='start')
branch = BranchPythonOperator(task_id='branch',
python_callable=select_next_branch)
with TaskGroup(group_id='tasks') as task_group:
inner_one = DummyOperator(task_id='inner')
end = DummyOperator(task_id='end')
start >> branch
branch >> end
branch >> task_group >> end
当 some_condition
满足时,流程正确地从 branch
到 task_group.inner
,否则应该从 branch
到 end
,但不是execute end
这个被跳过了。我做错了什么?
提前致谢。
尝试为 end
任务添加 trigger_rule='one_success'
。默认的 trigger_rule
是 all_success.
all_success (default): All upstream tasks have succeeded
但是,您的 end
任务依赖于分支操作员和 inner
任务。当inner
任务被跳过时,end
无法触发,因为其中一个上游任务没有“成功”。
如果分支运算符或 inner
任务成功,触发规则 one_success 将尝试执行此 end
任务。
end = DummyOperator(task_id='end', trigger_rule='one_success')