AirFlow:如何在一行中设置大量外部依赖项?

AirFlow: How to set large number of external dependencies in one line?

我正在阅读 尝试实现对其他 DAG 任务的依赖。本例中依赖写为:

ExternalTaskSensor(
    task_id='wait_for_the_first_task_to_be_completed',
    external_dag_id='a',
    external_task_id='first_task',
    dag=dag) >> \

在我的数据仓库中,一个table可能依赖于数百个任务。使用这种格式,它将生成 2*number of dependencies 行代码。这个实在不行table,还有更好的选择吗?

例如,在Azkaban中,我可以这样写多重依赖:

dependencies = dag1.task1, dag2.task4, dag2.task5, DAG3.task2, etc...

感谢任何帮助。

您可以在循环中创建传感器并在其中设置依赖项。我认为它更简洁,但我不确定随着依赖项数量的增加,这是否满足您对代码量的要求。

示例:

dependencies = [('dag1', 'task1'), ('dag2', 'task4'), ('dag2', 'task5'), ('dag3', 'task2')]

other_task = PythonOperator(...)

for dag_id, task_id in dependencies:
    sensor = ExternalTaskSensor(
        task_id='wait_for_{0}.{1}'.format(dag_id, task_id),
        external_dag_id=dag_id,
        external_task_id=task_id,
        dag=dag)
    sensor >> other_task