Dependencies between tasks generated by for loop AirFlow

How can I create task dependencies when all of my operators are generated in a for loop?

Example:

from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

tables = {
    'test_1': {
        'from_conn': 'c.from_conn',
        'to_conn': 'c.to_conn'
    },
    'test_2': {
        'from_conn': 'c.from_conn',
        'to_conn': 'c.to_conn'
    },
    'test_3': {
        'from_conn': 'c.from_conn',
        'to_conn': 'c.to_conn'
    },
}

default_args = {
    "depends_on_past": False,
    "start_date": datetime(2021, 4, 13),
}

dag = DAG(
    'etl',
    default_args=default_args,
    schedule_interval='30 3 * * 0',
    catchup=False,
    concurrency=1,
    max_active_runs=1,  # a DAG argument, so pass it to DAG() rather than default_args
)

def run():
    print('Running ETL')


for table, args in tables.items():
    task_name = table

    # One PythonOperator per table; the table name is used as the task_id
    task_name = PythonOperator(
        task_id=table,
        dag=dag,
        python_callable=run,
        provide_context=False,
    )

How do I declare the execution order of these tasks, e.g. test_1 >> test_2 >> test_3, without getting an error?

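Here is a simple example. It uses the TaskFlow API instead of PythonOperator, but you can apply the same idea: collect the tasks created in the loop into a list, then chain them.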

from datetime import datetime

from airflow.decorators import dag, task
from airflow.models.baseoperator import chain


@dag(start_date=datetime(2022, 1, 1), schedule_interval=None)
def tasks_in_loop():
    tasks = []  # collect every task created in the loop
    for i in range(3):
        @task(task_id=f"task_{i}")  # each iteration needs a unique task_id
        def run():
            ...

        t = run()
        tasks.append(t)

    # chain(t0, t1, t2) sets t0 >> t1 >> t2
    chain(*tasks)


dag = tasks_in_loop()
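Applied to the PythonOperator loop from the question, the same idea can also be written by keeping a reference to the task created in the previous iteration and linking it with `>>`. The sketch below is Airflow-free so it runs anywhere: `FakeTask` is a hypothetical stand-in that only records what `>>` would link. In a real DAG you would assign `PythonOperator(task_id=table, dag=dag, python_callable=run)` where the sketch creates a `FakeTask`.

```python
# Airflow-free sketch of wiring tasks that are created in a loop.
# FakeTask is a hypothetical stand-in for an Airflow operator: it only
# mimics `a >> b` by recording b's task_id as downstream of a.


class FakeTask:
    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = []

    def __rshift__(self, other):
        # In Airflow, `a >> b` means b runs after a; here we just record it.
        self.downstream.append(other.task_id)
        return other  # returning `other` allows a >> b >> c


tables = {
    "test_1": {"from_conn": "c.from_conn", "to_conn": "c.to_conn"},
    "test_2": {"from_conn": "c.from_conn", "to_conn": "c.to_conn"},
    "test_3": {"from_conn": "c.from_conn", "to_conn": "c.to_conn"},
}

tasks = {}
previous = None
for table, args in tables.items():
    # Real DAG (assumption): task = PythonOperator(task_id=table, dag=dag, python_callable=run)
    task = FakeTask(task_id=table)
    tasks[table] = task
    if previous is not None:
        previous >> task  # link to the task created in the previous iteration
    previous = task

for task_id, task in tasks.items():
    print(task_id, "->", task.downstream)
```

Because dicts preserve insertion order (Python 3.7+), iterating `tables.items()` visits test_1, test_2, test_3 in that order, so the loop produces test_1 >> test_2 >> test_3. Collecting the operators into a list and calling `chain(*tasks)` gives the same ordering.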

For reference, see the documentation on the chain() function.
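As a rough mental model of `chain()` for the case used in the answer (a flat sequence of single tasks), it links each task to the next one, left to right. The following is a hypothetical, Airflow-free sketch, not Airflow's implementation: `FakeTask` and `chain_sketch` are illustrative names, and the real `chain()` also accepts lists of tasks, which this sketch does not handle.

```python
# Hypothetical sketch of chain(*tasks) for a flat sequence of single tasks.
# NOT Airflow's implementation; it only models pairwise left-to-right linking.


class FakeTask:
    def __init__(self, task_id):
        self.task_id = task_id
        self.upstream = set()

    def __rshift__(self, other):
        # Record that `other` depends on `self`, as `self >> other` would.
        other.upstream.add(self.task_id)
        return other


def chain_sketch(*tasks):
    # zip yields (t0, t1), (t1, t2), ... so each task depends on its predecessor
    for upstream, downstream in zip(tasks, tasks[1:]):
        upstream >> downstream


tasks = [FakeTask(f"task_{i}") for i in range(3)]
chain_sketch(*tasks)

for t in tasks:
    print(t.task_id, "upstream:", sorted(t.upstream))
```

The first task ends up with no upstream dependency and every later task depends only on the one before it, which is the same shape as writing task_0 >> task_1 >> task_2 by hand.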