Airflow Branch Operator and Task Group Invalid Task IDs

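I have a simple DAG that uses a branch operator to check whether y is False. If it is, the DAG should move on to the say_goodbye task group. If y is True, it should skip that group and go straight to finish_dag_step. Here is the DAG: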

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import BranchPythonOperator
from airflow.utils.task_group import TaskGroup

def which_step() -> str:
    y = False
    if not y:
        return 'say_goodbye'
    else:
        return 'finish_dag_step'

with DAG(
    'my_test_dag',
    start_date = datetime(2022, 5, 14),
    schedule_interval = '0 0 * * *',
    catchup = True) as dag:

    say_hello = BashOperator(
        task_id = 'say_hello',
        retries = 3,
        bash_command = 'echo "hello world"'
    )

    run_which_step = BranchPythonOperator(
        task_id = 'run_which_step',
        python_callable = which_step,
        retries = 3,
        retry_exponential_backoff = True,
        retry_delay = timedelta(seconds = 5)
    )

    with TaskGroup('say_goodbye') as say_goodbye:
        for i in range(0, 2):
            step = BashOperator(
                task_id = 'step_' + str(i),
                retries = 3,
                bash_command = 'echo "goodbye world"'
            )

    finish_dag_step = BashOperator(
        task_id = 'finish_dag_step',
        retries = 3,
        bash_command = 'echo "dag is finished"'
    )

    say_hello >> run_which_step
    run_which_step >> say_goodbye >> finish_dag_step
    run_which_step >> finish_dag_step

When the DAG reaches run_which_step, I get the following error:

I don't understand what is causing this. What is going on?

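You can't branch to a TaskGroup itself: say_goodbye is the ID of the group, not of a task, so the branch operator rejects it as an invalid task ID. Instead, you have to refer to the tasks inside the group by their task_id, which is the TaskGroup's name and the task's ID joined by a dot (task_group.task_id).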

Your branch function should return something like

def branch():
    if condition:
        return [f'task_group.task_{i}' for i in range(0,2)]
    return 'default'
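Applied to the DAG in the question, that pattern might look like the sketch below. The parameters y and n_steps are hypothetical additions, there only so the function can be tried outside Airflow. The defaults mirror the hard-coded y = False and the two steps in the question, so BranchPythonOperator can still call it with no arguments.

```python
from typing import List, Union


def which_step(y: bool = False, n_steps: int = 2) -> Union[str, List[str]]:
    """Return the task ID(s) that the branch should follow."""
    if not y:
        # Tasks created inside TaskGroup('say_goodbye') get the group ID
        # as a prefix, so 'step_0' becomes 'say_goodbye.step_0'.
        return [f"say_goodbye.step_{i}" for i in range(n_steps)]
    return "finish_dag_step"


print(which_step())
print(which_step(y=True))
```

The list has to stay in sync with the loop that creates the steps, which is the main drawback of this approach.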

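But rather than returning a list of task IDs this way, probably the easiest approach is to put a DummyOperator at the top of your TaskGroup as its entry point, make the group's tasks downstream of that DummyOperator, and have the branch return only the DummyOperator's task ID.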