Airflow parallel tasks with subtasks
I need to run the following graph on Apache Airflow, but I'm having trouble with the parallel steps, because each of them has several sub-steps:
         ---> task_1a --> task_1b ---             ---> task_4a --> task_4b ---
        /                             \          /                             \
Start ------> task_2a --> task_2b ------> Step ------> task_5a --> task_5b ------> End
        \                             /          \                             /
         ---> task_3a --> task_3b ---             ---> task_6a --> task_6b ---
I tried to generate each task_1a >> task_1b block in a separate function, but the code roughly translates to:
start >> \
    [task_1a >> task_1b, task_2a >> task_2b, task_3a >> task_3b] >> \
    step >> \
    [task_4a >> task_4b, task_5a >> task_5b, task_6a >> task_6b] >> \
    end
"Step" is a DummyOperator that makes the two parallel groups run one after the other.
What I get instead is this (for simplicity, I only show the first element of each parallel group):
   task_1a                task_4a
          \                      \
Start ---> task_1b ---> Step ---> task_4b ---> End
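This happens because the expression task_1a >> task_1b evaluates to task_1b, so Start is connected directly to task_1b and task_1a is left orphaned. If I instead run the chaining beforehand and put task_1a in the list, then "Step" is triggered at the same time as task_1b.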
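The behaviour above comes from how `>>` is evaluated: in Airflow, `a >> b` sets `b` downstream of `a` and returns `b`. The minimal sketch below reproduces just that return-value rule with hypothetical toy classes (`Graph` and `Node` are illustrative stand-ins, not Airflow classes), so the lost edge can be inspected without an Airflow installation.

```python
from __future__ import annotations


class Graph:
    """Collects the (upstream, downstream) edges created with >>."""

    def __init__(self) -> None:
        self.edges: set[tuple[str, str]] = set()


class Node:
    """Hypothetical toy task: `a >> b` records a -> b and returns b."""

    def __init__(self, graph: Graph, name: str) -> None:
        self.graph, self.name = graph, name

    def __rshift__(self, other: Node | list[Node]) -> Node | list[Node]:
        # Node >> Node or Node >> [Node, ...]; the right operand is returned.
        targets = other if isinstance(other, list) else [other]
        for t in targets:
            self.graph.edges.add((self.name, t.name))
        return other

    def __rrshift__(self, other: list[Node]) -> Node:
        # [Node, ...] >> Node: lists have no >>, so Python falls back to this.
        for t in other:
            self.graph.edges.add((t.name, self.name))
        return self


g = Graph()
start, step = Node(g, "start"), Node(g, "step")
task_1a, task_1b = Node(g, "task_1a"), Node(g, "task_1b")

# `task_1a >> task_1b` is evaluated first and yields task_1b,
# so the list handed to start and step contains only task_1b.
start >> [task_1a >> task_1b] >> step

print(sorted(g.edges))
# [('start', 'task_1b'), ('task_1a', 'task_1b'), ('task_1b', 'step')]
```

No edge points into task_1a, which matches the orphaned task in the graph above.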
How can I solve this?
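SubDAGs are a deprecated feature, and even so they don't really give you parallelism, because a SubDAG is limited to running its tasks sequentially. You should use TaskGroups instead.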
Here is example code for the structure you are after:
from datetime import datetime

from airflow.decorators import task, task_group
from airflow.models.dag import DAG


@task
def task_start():
    """Dummy Task which is First Task of Dag"""
    return '[Task_start]'


@task
def task_1(value: int) -> str:
    """Dummy Task1"""
    return f'[ Task1 {value} ]'


@task
def task_2(value: str) -> str:
    """Dummy Task2"""
    return f'[ Task2 {value} ]'


@task
def task_3(value: str) -> str:
    """Dummy Task3"""
    return f'[ Task3 {value} ]'


@task
def task_4(value: str) -> str:
    """Dummy Task4"""
    return f'[ Task4 {value} ]'


@task
def task_end() -> None:
    """Dummy Task which is Last Task of Dag"""
    print('[ Task_End ]')


@task
def task_step() -> None:
    """Dummy Task which is Step Task of Dag"""
    print('[ Task_Step ]')


# Creating TaskGroups: each group is a two-task chain
@task_group
def task_group_function(value: int) -> None:
    """TaskGroup for grouping related Tasks"""
    task_2(task_1(value))  # task_1 runs first; its result is passed to task_2


@task_group
def task_group_function2(value: int) -> None:
    """TaskGroup for grouping related Tasks"""
    task_3(task_4(value))  # task_4 runs first; its result is passed to task_3


# Executing Tasks and TaskGroups
with DAG(
    dag_id="example_task_group", start_date=datetime(2021, 1, 1), catchup=False
) as dag:
    start_task = task_start()
    step_task = task_step()
    end_task = task_end()

    # One group per parallel branch on each side of step_task
    # (the diagram in the question has 3 branches per side).
    # `>>` on a task group attaches to its root and leaf tasks,
    # so both tasks of every chain stay connected.
    for i in range(5):
        first_task_group = task_group_function(i)
        second_task_group = task_group_function2(i)
        start_task >> first_task_group >> step_task >> second_task_group >> end_task
[Figure: Graph view of the DAG]
[Figure: Graph view with the task groups expanded]
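The code above works because a task group is wired through its boundary tasks: `upstream >> group` attaches to the group's root tasks and `group >> downstream` to its leaf tasks. The sketch below models that idea with hypothetical toy classes (`Graph`, `Node` and `Group` are illustrative, not Airflow's API) and rebuilds the question's full diagram, three branches on each side of Step, so the resulting edges can be checked without installing Airflow.

```python
from __future__ import annotations


class Graph:
    """Collects (upstream, downstream) edges by task name."""

    def __init__(self) -> None:
        self.edges: set[tuple[str, str]] = set()


class Node:
    """Hypothetical toy task: `a >> b` links a's leaves to b's roots and returns b."""

    def __init__(self, graph: Graph, name: str) -> None:
        self.graph, self.name = graph, name

    def roots(self) -> list[Node]:
        return [self]

    def leaves(self) -> list[Node]:
        return [self]

    def __rshift__(self, other: Node) -> Node:
        for up in self.leaves():
            for down in other.roots():
                self.graph.edges.add((up.name, down.name))
        return other


class Group(Node):
    """Hypothetical toy task group: a linear chain of tasks.

    Its first task is the only root and its last task the only leaf, so
    `upstream >> group >> downstream` wires both ends of the chain.
    """

    def __init__(self, graph: Graph, names: list[str]) -> None:
        super().__init__(graph, "+".join(names))
        self.tasks = [Node(graph, n) for n in names]
        for up, down in zip(self.tasks, self.tasks[1:]):
            up >> down  # internal edge, e.g. task_1a -> task_1b

    def roots(self) -> list[Node]:
        return self.tasks[:1]

    def leaves(self) -> list[Node]:
        return self.tasks[-1:]


g = Graph()
start, step, end = Node(g, "start"), Node(g, "step"), Node(g, "end")
for i in (1, 2, 3):
    start >> Group(g, [f"task_{i}a", f"task_{i}b"]) >> step
for i in (4, 5, 6):
    step >> Group(g, [f"task_{i}a", f"task_{i}b"]) >> end

print(len(g.edges))  # 18: per branch, entry edge, internal edge, exit edge
```

In this model every task except start ends up with an incoming edge, which is the property the list-of-chains version in the question lost.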