Airflow Branch Operator 和任务组无效的任务 ID
Airflow Branch Operator and Task Group Invalid Task IDs
我有一个简单的 dag,它使用分支运算符来检查 y 是否为 False。如果是,则 dag 应该移至 say_goodbye 任务组。如果为真,它会跳过并转到 finish_dag_step。这是 dag:
def which_step() -> str:
y = False
if not y:
return 'say_goodbye'
else:
return 'finish_dag_step'
with DAG(
'my_test_dag',
start_date = datetime(2022, 5, 14),
schedule_interval = '0 0 * * *',
catchup = True) as dag:
say_hello = BashOperator(
task_id = 'say_hello',
retries = 3,
bash_command = 'echo "hello world"'
)
run_which_step = BranchPythonOperator(
task_id = 'run_which_step',
python_callable = which_step,
retries = 3,
retry_exponential_backoff = True,
retry_delay = timedelta(seconds = 5)
)
with TaskGroup('say_goodbye') as say_goodbye:
for i in range(0,2):
step = BashOperator(
task_id = 'step_' + str(i),
retries = 3,
bash_command = 'echo "goodbye world"'
)
step
finish_dag_step = BashOperator(
task_id = 'finish_dag_step',
retries = 3,
bash_command = 'echo "dag is finished"'
)
say_hello >> run_which_step
run_which_step >> say_goodbye >> finish_dag_step
run_which_step >> finish_dag_step
finish_dag_step
当 dag 命中 run_which_step 时出现以下错误:
我不明白是什么原因造成的。怎么回事?
您不能为任务组创建任务依赖项。因此,您必须通过 task_id
来引用任务,这是任务组的名称和任务的 ID 由点 (task_group.task_id
) 连接。
你的分支函数应该return类似于
def branch():
if condition:
return [f'task_group.task_{i}' for i in range(0,2)]
return 'default'
但不是 return 以这种方式创建任务 ID 列表,可能最简单的方法是将 DummyOperator 作为您的 TaskGroup 入口点,并将这些任务置于您的 DummyOperator 下游。
我有一个简单的 dag,它使用分支运算符来检查 y 是否为 False。如果是,则 dag 应该移至 say_goodbye 任务组。如果为真,它会跳过并转到 finish_dag_step。这是 dag:
def which_step() -> str:
y = False
if not y:
return 'say_goodbye'
else:
return 'finish_dag_step'
with DAG(
'my_test_dag',
start_date = datetime(2022, 5, 14),
schedule_interval = '0 0 * * *',
catchup = True) as dag:
say_hello = BashOperator(
task_id = 'say_hello',
retries = 3,
bash_command = 'echo "hello world"'
)
run_which_step = BranchPythonOperator(
task_id = 'run_which_step',
python_callable = which_step,
retries = 3,
retry_exponential_backoff = True,
retry_delay = timedelta(seconds = 5)
)
with TaskGroup('say_goodbye') as say_goodbye:
for i in range(0,2):
step = BashOperator(
task_id = 'step_' + str(i),
retries = 3,
bash_command = 'echo "goodbye world"'
)
step
finish_dag_step = BashOperator(
task_id = 'finish_dag_step',
retries = 3,
bash_command = 'echo "dag is finished"'
)
say_hello >> run_which_step
run_which_step >> say_goodbye >> finish_dag_step
run_which_step >> finish_dag_step
finish_dag_step
当 dag 命中 run_which_step 时出现以下错误:
我不明白是什么原因造成的。怎么回事?
您不能为任务组创建任务依赖项。因此,您必须通过 task_id
来引用任务,这是任务组的名称和任务的 ID 由点 (task_group.task_id
) 连接。
你的分支函数应该return类似于
def branch():
if condition:
return [f'task_group.task_{i}' for i in range(0,2)]
return 'default'
但不是 return 以这种方式创建任务 ID 列表,可能最简单的方法是将 DummyOperator 作为您的 TaskGroup 入口点,并将这些任务置于您的 DummyOperator 下游。