Airflow: Dynamically generate tasks with TaskFlow API

Previously I used the following code snippet to dynamically generate tasks:

dummy_start_task = PythonOperator(
    task_id="dummy_start",
    default_args=default_args,
    python_callable=dummy_start,
    dag=dag
)

make_images_tasks = list()
for n in range(WORKERS):
    globals()[f"make_images_{n}_task"] = PythonOperator(
        task_id=f'make_images_{n}',
        default_args=default_args,
        python_callable=make_images,
        op_kwargs={"n": n},
        dag=dag
    )
    make_images_tasks.append(globals()[f"make_images_{n}_task"])

dummy_collector_task = PythonOperator(
    task_id="dummy_collector",
    default_args=default_args,
    python_callable=dummy_collector,
    dag=dag
)

dummy_start_task >> make_images_tasks >> dummy_collector_task

# in collector_task I would use:
# items = task_instance.xcom_pull(task_ids=[f"make_images_{n}" for n in range(int(WORKERS))])
# to get the XComs from these dynamically generated tasks

How can I achieve this with the TaskFlow API? (Generate multiple tasks, then fetch their XComs in a downstream collector task.)

Here is an example:

from datetime import datetime

from airflow import DAG
from airflow.decorators import task

with DAG(dag_id="example_taskflow", start_date=datetime(2022, 1, 1), schedule_interval=None) as dag:

    @task
    def dummy_start_task():
        pass

    tasks = []
    for n in range(3):

        @task(task_id=f"make_images_{n}")
        def images_task(i):
            return i

        tasks.append(images_task(n))

    @task
    def dummy_collector_task(tasks):
        print(tasks)

    dummy_start_task_ = dummy_start_task()
    dummy_start_task_ >> tasks
    dummy_collector_task(tasks)

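This produces a DAG in which dummy_start_task fans out to make_images_0, make_images_1, and make_images_2, which then all feed into dummy_collector_task.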

The make_images_* tasks take 0, 1, and 2 as input (and also use that value in the task's ID) and return it. The dummy_collector_task receives all outputs from the make_images_* tasks and prints [0, 1, 2].