Airflow parallel tasks with subtasks

I need to run the following graph on Apache Airflow, but I am having trouble with the parallel steps because each of them has multiple substeps.

         ------> task_1a --> task_1b ---               ------> task_4a --> task_4b ---
        /                                \            /                                \
Start ---------> task_2a --> task_2b --------> Step ---------> task_5a --> task_5b --------> End
        \                                /            \                                /
         ------> task_3a --> task_3b ---               ------> task_6a --> task_6b ---

I tried generating each task_1a >> task_1b block in a separate function, but the code ends up being roughly equivalent to:

start >> \
    [task_1a >> task_1b, task_2a >> task_2b, task_3a >> task_3b] >> \
    step >> \
    [task_4a >> task_4b, task_5a >> task_5b, task_6a >> task_6b] >> \
    end

"step" is a DummyOperator that lets the two parallel groups run one after the other.

What I actually get is this (for simplicity, I only show the first element of each parallel group):

   task_1a                 task_4a
          \                       \  
Start ---> task_1b ---> Step ---> task_4b ---> End

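This happens because task_1a >> task_1b evaluates to task_1b, so start connects directly to task_1b and task_1a is left orphaned. If I instead build the task_1a >> task_1b chain beforehand and put task_1a in the list, then "step" is triggered at the same time as task_1b.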

How can I solve this?

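SubDag is a deprecated feature, and even so it does not really allow parallelism, because it is limited to running sequentially. You should use TaskGroup instead.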
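As a rough illustration of why the expression in the question leaves task_1a orphaned, here is a toy model in plain Python. The `Node` class and the `make` helper are hypothetical stand-ins, not Airflow classes. They only imitate the behavior described in the question, where `a >> b` evaluates to `b`. The second half shows one way around it under the same assumption: wire each branch head to the upstream task and each branch tail to the downstream task.

```python
class Node:
    """Toy stand-in for a task (hypothetical, not an Airflow class)."""

    def __init__(self, name):
        self.name = name
        self.downstream = []  # names of the nodes that run after this one

    def __rshift__(self, other):
        # node >> other: wire self -> other (or -> every item of a list),
        # then return the right-hand operand.
        targets = other if isinstance(other, list) else [other]
        for target in targets:
            self.downstream.append(target.name)
        return other

    def __rrshift__(self, others):
        # [a, b] >> node: a list has no >> of its own, so Python falls
        # back to the right-hand operand's reflected method.
        for source in others:
            source.downstream.append(self.name)
        return self


def make(*names):
    """Create one fresh Node per name (illustrative helper)."""
    return [Node(name) for name in names]


NAMES = ("start", "task_1a", "task_1b", "task_2a", "task_2b", "step")

# 1) The wiring from the question: each list item evaluates to its tail,
#    so start is connected to the *_b tasks and the *_a tasks get no parent.
start, a1, b1, a2, b2, step = make(*NAMES)
start >> [a1 >> b1, a2 >> b2] >> step
broken = list(start.downstream)

# 2) Wiring every branch as one chain: start feeds the head of each
#    branch and the tail of each branch feeds step.
start, a1, b1, a2, b2, step = make(*NAMES)
for head, tail in [(a1, b1), (a2, b2)]:
    start >> head >> tail >> step
fixed = list(start.downstream)

print("question's wiring, start ->", broken)
print("per-branch chains, start ->", fixed)
```

In the first case start points at task_1b and task_2b. In the second it points at task_1a and task_2a, and each *_b task points at step, which is the shape of the diagram in the question.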

Here is example code for the structure you are after:

from datetime import datetime

from airflow.decorators import task, task_group
from airflow.models.dag import DAG


@task
def task_start():
    """Dummy Task which is First Task of Dag"""
    return '[Task_start]'


@task
def task_1(value: int) -> str:
    """Dummy Task1"""
    return f'[ Task1 {value} ]'


@task
def task_2(value: str) -> str:
    """Dummy Task2"""
    return f'[ Task2 {value} ]'

@task
def task_3(value: str) -> str:
    """Dummy Task3"""
    return f'[ Task3 {value} ]'

@task
def task_4(value: int) -> str:
    """Dummy Task4"""
    return f'[ Task4 {value} ]'

@task
def task_end() -> None:
    """Dummy Task which is Last Task of Dag"""
    print('[ Task_End  ]')

@task
def task_step() -> None:
    """Dummy Task which is Step Task of Dag"""
    print('[ Task_Step  ]')

# Creating TaskGroups
@task_group
def task_group_function(value: int) -> None:
    """TaskGroup for grouping related Tasks"""
    task_2(task_1(value))


@task_group
def task_group_function2(value: int) -> None:
    """TaskGroup for grouping related Tasks"""
    task_3(task_4(value))


# Executing Tasks and TaskGroups
with DAG(
    dag_id="example_task_group", start_date=datetime(2021, 1, 1), catchup=False
) as dag:
    start_task = task_start()
    step_task = task_step()
    end_task = task_end()

    for i in range(5):
        first_task_group = task_group_function(i)
        second_task_group = task_group_function2(i)
        start_task >> first_task_group >> step_task >> second_task_group >> end_task

Graph view:

With the task groups expanded: