循环中的 Airflow DAG 任务依赖性
Airflow DAG Task Dependency in a Loop
我有一个 DAG,需要重新编译不同品牌的客户列表。该脚本使用两个参数 brand 和 listtype 调用。
我需要品牌同时 运行,但列表类型取决于前面的列表类型,但我不知道如何在循环中执行此操作。你能帮帮我吗?
BrandsToRun = ['A', 'B', 'C']
ListTypes = ['1', '2', '3']
# Defining the DAG
################################################################################
with DAG(
'MusterMaster',
default_args = default_args,
description = 'x',
# schedule_interval = None
schedule_interval = '30 4 * * *',
catchup = False
) as MusterMaster:
for Brand in BrandsToRun:
for ListType in ListTypes:
ListLoad = BashOperator(
task_id='Load_'+str(Brand)+'_'+str(ListType),
bash_command = """python3 '/usr/local/bin/MusterMaster.py' {0} {1}""".format(Brand[0], ListType[0]),
pool='logs'
)
ListLoad
我希望任务具有这样的依赖结构,但我无法弄清楚。 Brand应该运行并发,但是ListTypes应该依赖前面的ListType。
集合 A 1 >> 集合 A 2 >> 集合 A 3
集合 B 1 >> 集合 B 2 >> 集合 B 3
集合 C 1 >> 集合 C 2 >> 集合 C 3
我怎样才能最好地做到这一点?
你可以这样做:
for Brand in BrandsToRun:
list = []
for ListType in ListTypes:
list.append(BashOperator(
task_id='Load_'+str(Brand)+'_'+str(ListType),
bash_command = """python3 '/usr/local/bin/MusterMaster.py' {0} {1}""".format(Brand[0], ListType[0]),
pool='logs'))
if len(list) > 1:
list[-2] >> list[-1]
哪个会给你:
我有一个 DAG,需要重新编译不同品牌的客户列表。该脚本使用两个参数 brand 和 listtype 调用。
我需要品牌同时 运行,但列表类型取决于前面的列表类型,但我不知道如何在循环中执行此操作。你能帮帮我吗?
BrandsToRun = ['A', 'B', 'C']
ListTypes = ['1', '2', '3']
# Defining the DAG
################################################################################
with DAG(
'MusterMaster',
default_args = default_args,
description = 'x',
# schedule_interval = None
schedule_interval = '30 4 * * *',
catchup = False
) as MusterMaster:
for Brand in BrandsToRun:
for ListType in ListTypes:
ListLoad = BashOperator(
task_id='Load_'+str(Brand)+'_'+str(ListType),
bash_command = """python3 '/usr/local/bin/MusterMaster.py' {0} {1}""".format(Brand[0], ListType[0]),
pool='logs'
)
ListLoad
我希望任务具有这样的依赖结构,但我无法弄清楚。 Brand应该运行并发,但是ListTypes应该依赖前面的ListType。
集合 A 1 >> 集合 A 2 >> 集合 A 3
集合 B 1 >> 集合 B 2 >> 集合 B 3
集合 C 1 >> 集合 C 2 >> 集合 C 3
我怎样才能最好地做到这一点?
你可以这样做:
for Brand in BrandsToRun:
list = []
for ListType in ListTypes:
list.append(BashOperator(
task_id='Load_'+str(Brand)+'_'+str(ListType),
bash_command = """python3 '/usr/local/bin/MusterMaster.py' {0} {1}""".format(Brand[0], ListType[0]),
pool='logs'))
if len(list) > 1:
list[-2] >> list[-1]
哪个会给你: