如何安排 DAG 并行处理 运行 一些任务,然后在它们完成后安排一个任务?

How to arrange a DAG to run some tasks in parallel and then one task when they have completed?

我有几个任务可以同时运行。当他们完成后,我需要 运行 最后一项任务。我试过像这样使用任务分组来做到这一点:

import airflow
from airflow.utils.task_group import TaskGroup

with airflow.DAG(
        'my_dag',
        catchup=False,
        default_args=default_args,
        schedule_interval=datetime.timedelta(days=1),
    ) as dag:

    with TaskGroup(group_id='task_group_1') as tg1:

    
        task1 = MyOperator(
            task_id='task1',
            dag=dag,
        )

        task2 = MyOperator(
            task_id='task2',
            dag=dag,
        )
        
        [task1, task2]    
    
    final_task = MyOtherOperator(
        task_id="final_task",
        dag=dag
    )

    tg1 >> final_task
   

但是这里发生的是 final_task 在任务组中的每个任务之后 运行 多次所以:

任务 1 -> final_task 任务 2 -> final_task

我想要的是任务组并行 运行,当它完成最后一个任务时 运行 一次,所以:

[任务 1、任务 2] -> final_task

我认为使用任务组可以帮助我完成这个要求,但它没有按预期工作。谁能帮忙?谢谢。

编辑:这是 Airflow 文档示例的结果。它导致 task3 在 group.task1 和 group1.task2 之后为 运行。在两个分组任务完成后,我只需要 运行 一次。

最后编辑: 事实证明我误解了树视图 - 图表视图确认了分组操作,尽管我仍然在最终任务中遇到一些其他错误。感谢您帮助我了解有关 DAG 的更多信息。

尝试从任务组中删除 [task1, task2],使其如下所示:

import airflow
from airflow.utils.task_group import TaskGroup

with airflow.DAG(
        'my_dag',
        catchup=False,
        default_args=default_args,
        schedule_interval=datetime.timedelta(days=1),
    ) as dag:

    with TaskGroup(group_id='task_group_1') as tg1:
        task1 = MyOperator(
            task_id='task1',
            dag=dag,
        )

        task2 = MyOperator(
            task_id='task2',
            dag=dag,
        )
        
    
    final_task = MyOtherOperator(
        task_id="final_task",
        dag=dag
    )

    tg1 >> final_task

我认为您不需要 return 任务组中的任何内容。只需将 TaskGroup 引用为依赖项即可。

这是来自 apache 的示例 airflow documentation:

with TaskGroup("group1") as group1:
    task1 = EmptyOperator(task_id="task1")
    task2 = EmptyOperator(task_id="task2")

task3 = EmptyOperator(task_id="task3")

group1 >> task3

此外,您不需要使用任务组来实现此功能。你可以简单地这样做:

import airflow
from airflow.utils.task_group import TaskGroup

with airflow.DAG(
        'my_dag',
        catchup=False,
        default_args=default_args,
        schedule_interval=datetime.timedelta(days=1),
    ) as dag:

    task1 = MyOperator(
        task_id='task1',
        dag=dag,
    )

    task2 = MyOperator(
        task_id='task2',
        dag=dag,
    )
        
    final_task = MyOtherOperator(
        task_id="final_task",
        dag=dag
    )

    task1 >> final_task
    task2 >> final_task