如何安排 DAG 并行处理 运行 一些任务,然后在它们完成后安排一个任务?
How to arrange a DAG to run some tasks in parallel and then one task when they have completed?
我有几个任务可以同时运行。当他们完成后,我需要 运行 最后一项任务。我试过像这样使用任务分组来做到这一点:
import airflow
from airflow.utils.task_group import TaskGroup
with airflow.DAG(
'my_dag',
catchup=False,
default_args=default_args,
schedule_interval=datetime.timedelta(days=1),
) as dag:
with TaskGroup(group_id='task_group_1') as tg1:
task1 = MyOperator(
task_id='task1',
dag=dag,
)
task2 = MyOperator(
task_id='task2',
dag=dag,
)
[task1, task2]
final_task = MyOtherOperator(
task_id="final_task",
dag=dag
)
tg1 >> final_task
但是这里发生的是 final_task 在任务组中的每个任务之后 运行 多次所以:
任务 1 -> final_task
任务 2 -> final_task
我想要的是任务组并行 运行,当它完成最后一个任务时 运行 一次,所以:
[任务 1、任务 2] -> final_task
我认为使用任务组可以帮助我完成这个要求,但它没有按预期工作。谁能帮忙?谢谢。
编辑:这是 Airflow 文档示例的结果。它导致 task3 在 group.task1 和 group1.task2 之后为 运行。在两个分组任务完成后,我只需要 运行 一次。
最后编辑:
事实证明我误解了树视图 - 图表视图确认了分组操作,尽管我仍然在最终任务中遇到一些其他错误。感谢您帮助我了解有关 DAG 的更多信息。
尝试从任务组中删除 [task1, task2]
,使其如下所示:
import airflow
from airflow.utils.task_group import TaskGroup
with airflow.DAG(
'my_dag',
catchup=False,
default_args=default_args,
schedule_interval=datetime.timedelta(days=1),
) as dag:
with TaskGroup(group_id='task_group_1') as tg1:
task1 = MyOperator(
task_id='task1',
dag=dag,
)
task2 = MyOperator(
task_id='task2',
dag=dag,
)
final_task = MyOtherOperator(
task_id="final_task",
dag=dag
)
tg1 >> final_task
我认为您不需要 return 任务组中的任何内容。只需将 TaskGroup 引用为依赖项即可。
这是来自 apache 的示例 airflow documentation:
with TaskGroup("group1") as group1:
task1 = EmptyOperator(task_id="task1")
task2 = EmptyOperator(task_id="task2")
task3 = EmptyOperator(task_id="task3")
group1 >> task3
此外,您不需要使用任务组来实现此功能。你可以简单地这样做:
import airflow
from airflow.utils.task_group import TaskGroup
with airflow.DAG(
'my_dag',
catchup=False,
default_args=default_args,
schedule_interval=datetime.timedelta(days=1),
) as dag:
task1 = MyOperator(
task_id='task1',
dag=dag,
)
task2 = MyOperator(
task_id='task2',
dag=dag,
)
final_task = MyOtherOperator(
task_id="final_task",
dag=dag
)
task1 >> final_task
task2 >> final_task
我有几个任务可以同时运行。当他们完成后,我需要 运行 最后一项任务。我试过像这样使用任务分组来做到这一点:
import airflow
from airflow.utils.task_group import TaskGroup
with airflow.DAG(
'my_dag',
catchup=False,
default_args=default_args,
schedule_interval=datetime.timedelta(days=1),
) as dag:
with TaskGroup(group_id='task_group_1') as tg1:
task1 = MyOperator(
task_id='task1',
dag=dag,
)
task2 = MyOperator(
task_id='task2',
dag=dag,
)
[task1, task2]
final_task = MyOtherOperator(
task_id="final_task",
dag=dag
)
tg1 >> final_task
但是这里发生的是 final_task 在任务组中的每个任务之后 运行 多次所以:
任务 1 -> final_task 任务 2 -> final_task
我想要的是任务组并行 运行,当它完成最后一个任务时 运行 一次,所以:
[任务 1、任务 2] -> final_task
我认为使用任务组可以帮助我完成这个要求,但它没有按预期工作。谁能帮忙?谢谢。
编辑:这是 Airflow 文档示例的结果。它导致 task3 在 group.task1 和 group1.task2 之后为 运行。在两个分组任务完成后,我只需要 运行 一次。
最后编辑: 事实证明我误解了树视图 - 图表视图确认了分组操作,尽管我仍然在最终任务中遇到一些其他错误。感谢您帮助我了解有关 DAG 的更多信息。
尝试从任务组中删除 [task1, task2]
,使其如下所示:
import airflow
from airflow.utils.task_group import TaskGroup
with airflow.DAG(
'my_dag',
catchup=False,
default_args=default_args,
schedule_interval=datetime.timedelta(days=1),
) as dag:
with TaskGroup(group_id='task_group_1') as tg1:
task1 = MyOperator(
task_id='task1',
dag=dag,
)
task2 = MyOperator(
task_id='task2',
dag=dag,
)
final_task = MyOtherOperator(
task_id="final_task",
dag=dag
)
tg1 >> final_task
我认为您不需要 return 任务组中的任何内容。只需将 TaskGroup 引用为依赖项即可。
这是来自 apache 的示例 airflow documentation:
with TaskGroup("group1") as group1:
task1 = EmptyOperator(task_id="task1")
task2 = EmptyOperator(task_id="task2")
task3 = EmptyOperator(task_id="task3")
group1 >> task3
此外,您不需要使用任务组来实现此功能。你可以简单地这样做:
import airflow
from airflow.utils.task_group import TaskGroup
with airflow.DAG(
'my_dag',
catchup=False,
default_args=default_args,
schedule_interval=datetime.timedelta(days=1),
) as dag:
task1 = MyOperator(
task_id='task1',
dag=dag,
)
task2 = MyOperator(
task_id='task2',
dag=dag,
)
final_task = MyOtherOperator(
task_id="final_task",
dag=dag
)
task1 >> final_task
task2 >> final_task