Airflow: Dynamically generate tasks with the TaskFlow API
Previously I used the following snippet to dynamically generate tasks:
dummy_start_task = PythonOperator(
    task_id="dummy_start",
    default_args=default_args,
    python_callable=dummy_start,
    dag=dag
)

make_images_tasks = list()
for n in range(WORKERS):
    globals()[f"make_images_{n}_task"] = PythonOperator(
        task_id=f'make_images_{n}',
        default_args=default_args,
        python_callable=make_images,
        op_kwargs={"n": n},
        dag=dag
    )
    make_images_tasks.append(globals()[f"make_images_{n}_task"])

dummy_collector_task = PythonOperator(
    task_id="dummy_collector",
    default_args=default_args,
    python_callable=dummy_collector,
    dag=dag
)

dummy_start_task >> make_images_tasks >> dummy_collector_task
# In the collector task I would use:
# items = task_instance.xcom_pull(task_ids=[f"make_images_{n}" for n in range(int(WORKERS))])
# to get the XComs from these dynamically generated tasks.
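To make the commented lines concrete, here is a minimal sketch of what such a collector callable could look like. It assumes Airflow 2.x, where `PythonOperator` passes context variables such as `task_instance` to the callable when they appear in its signature. The value of `WORKERS` and the `FakeTaskInstance` class are assumptions added purely so the sketch runs without Airflow; they are not part of the original DAG.

```python
WORKERS = 3  # assumed value; the original snippet defines WORKERS elsewhere


def dummy_collector(task_instance, **context):
    """Collect the XComs returned by the make_images_{n} tasks."""
    task_ids = [f"make_images_{n}" for n in range(int(WORKERS))]
    # Pulling with a list of task IDs yields the values pushed by those tasks.
    items = list(task_instance.xcom_pull(task_ids=task_ids))
    print(items)
    return items


class FakeTaskInstance:
    """Hypothetical stand-in for Airflow's TaskInstance, for local use only."""

    def __init__(self, xcoms):
        self._xcoms = xcoms  # maps task_id -> value returned by that task

    def xcom_pull(self, task_ids):
        return [self._xcoms[t] for t in task_ids if t in self._xcoms]


# Simulate three upstream tasks that returned 0, 10 and 20.
fake_ti = FakeTaskInstance({f"make_images_{n}": n * 10 for n in range(WORKERS)})
collected = dummy_collector(task_instance=fake_ti)
```

In a real DAG only `dummy_collector` would be kept, and Airflow would supply the actual task instance.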
How can I achieve the same thing with the TaskFlow API? (Generate multiple tasks, then retrieve their XComs in a downstream collector task.)
Here is an example:
from datetime import datetime

from airflow import DAG
from airflow.decorators import task

with DAG(dag_id="example_taskflow", start_date=datetime(2022, 1, 1), schedule_interval=None) as dag:

    @task
    def dummy_start_task():
        pass

    tasks = []
    for n in range(3):

        # Give each generated task a unique ID and pass n in as an argument.
        @task(task_id=f"make_images_{n}")
        def images_task(i):
            return i

        tasks.append(images_task(n))

    @task
    def dummy_collector_task(tasks):
        print(tasks)

    dummy_start_task_ = dummy_start_task()
    dummy_start_task_ >> tasks

    # Passing the list of task outputs makes the collector run after every
    # make_images_* task and hands it their return values.
    dummy_collector_task(tasks)
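This gives a DAG in which dummy_start_task runs first, the make_images_* tasks fan out from it, and dummy_collector_task runs after all of them.
The make_images_* tasks take 0, 1, and 2 as input (the same value is also used in each task's ID) and return it. dummy_collector_task receives the outputs of all make_images_* tasks and prints [0, 1, 2].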
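One detail worth noting is that the loop passes `n` into each task as an argument instead of having the task body read `n` from the enclosing scope. The sketch below is plain Python with no Airflow involved, and it illustrates the general late-binding behaviour that makes passing the argument the safer choice. `functools.partial` is used here only as a stand-in for "store the argument now, run later"; it is not how Airflow implements `@task`.

```python
from functools import partial

# Variant 1: the function body reads the loop variable directly.
closures = []
for n in range(3):
    def read_loop_variable():
        return n  # looked up when the function is called, not when it is defined
    closures.append(read_loop_variable)

# Variant 2: the current value is passed in as an argument and stored.
def images_task(i):
    return i

deferred = [partial(images_task, n) for n in range(3)]

# Both lists are evaluated only after the loops have finished.
late_bound = [f() for f in closures]
by_argument = [f() for f in deferred]

print(late_bound)   # [2, 2, 2]
print(by_argument)  # [0, 1, 2]
```

Every closure in the first variant sees the final value of `n`, while the second variant keeps one value per call.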