循环中的 Airflow DAG 任务依赖性

Airflow DAG Task Dependency in a Loop

我有一个 DAG,需要重新编译不同品牌的客户列表。该脚本使用两个参数 brand 和 listtype 调用。

我需要品牌同时 运行,但列表类型取决于前面的列表类型,但我不知道如何在循环中执行此操作。你能帮帮我吗?

BrandsToRun = ['A', 'B', 'C']
ListTypes = ['1', '2', '3']

# Defining the DAG
################################################################################
with DAG(
        'MusterMaster',
        default_args = default_args,
        description = 'x',        
        # schedule_interval = None
        schedule_interval = '30 4 * * *',
        catchup = False
        ) as MusterMaster:

        for Brand in BrandsToRun:
            for ListType in ListTypes:

                ListLoad = BashOperator(
                                        task_id='Load_'+str(Brand)+'_'+str(ListType),
                                        bash_command = """python3 '/usr/local/bin/MusterMaster.py' {0} {1}""".format(Brand[0], ListType[0]),
                                        pool='logs'
                                        )

ListLoad

我希望任务具有这样的依赖结构,但我无法弄清楚。 Brand应该运行并发,但是ListTypes应该依赖前面的ListType。

集合 A 1 >> 集合 A 2 >> 集合 A 3

集合 B 1 >> 集合 B 2 >> 集合 B 3

集合 C 1 >> 集合 C 2 >> 集合 C 3

我怎样才能最好地做到这一点?

你可以这样做:

    for Brand in BrandsToRun:
        list = []
        for ListType in ListTypes:
            list.append(BashOperator(
                task_id='Load_'+str(Brand)+'_'+str(ListType),
                bash_command = """python3 '/usr/local/bin/MusterMaster.py' {0} {1}""".format(Brand[0], ListType[0]),
                pool='logs'))
            if len(list) > 1:
                list[-2] >> list[-1]

哪个会给你: