如何在 Airflow 中为多个环境创建三个任务

How can we create three tasks in airflow for multiple environments

我有三个任务:1. AddEMRStep 2. Sensor 3. SQLstep。我只想为两个环境创建它们。

# Asker's current DAG body: two PythonOperator tasks run one after the other.
# `dag`, `push_to_xcom`, `run_this_func` and `timedelta` are not defined in
# this snippet; they must come from the surrounding file.
with dag:
    # First task: invokes push_to_xcom, with retries=10 and a 1-second retry delay.
    # NOTE(review): provide_context=True is the Airflow 1.x way of passing the
    # task context to the callable; it is deprecated in Airflow 2 — confirm version.
    run_this_task = PythonOperator(
        task_id = "run_this",
        python_callable=push_to_xcom,
        provide_context=True,
        retries=10,
        retry_delay=timedelta(seconds=1)
    )

    # Second task: invokes run_this_func; no retry settings are passed here.
    run_this_task2 = PythonOperator(
        task_id = "run_this2",
        python_callable=run_this_func,
        provide_context=True
    )

    # run_this_task2 is downstream of run_this_task.
    run_this_task >> run_this_task2

现在我需要为多个环境创建这些 dags

我正在尝试做这样的事情

envs = ["stg","prod"]

如何使用 for 循环生成下面这样的代码:

# Desired result written out by hand: the same two-task chain repeated once
# per environment ("stg" and "prod"), with the environment name as the suffix
# of every variable name and task_id.
with dag:
    # --- stg ---
    # Calls push_to_xcom, with retries=10 and a 1-second retry delay.
    run_this_task_stg = PythonOperator(
        task_id = "run_this_task_stg",
        python_callable=push_to_xcom,
        provide_context=True,
        retries=10,
        retry_delay=timedelta(seconds=1)
    )

    # Calls run_this_func; no retry settings are passed here.
    run_this_task2_stg = PythonOperator(
        task_id = "run_this_task2_stg",
        python_callable=run_this_func,
        provide_context=True
    )

    # --- prod ---
    # Same configuration as the stg pair; only the names differ.
    run_this_task_prod = PythonOperator(
        task_id = "run_this_task_prod",
        python_callable=push_to_xcom,
        provide_context=True,
        retries=10,
        retry_delay=timedelta(seconds=1)
    )

    run_this_task2_prod = PythonOperator(
        task_id = "run_this_task2_prod",
        python_callable=run_this_func,
        provide_context=True
    )

    # Both per-environment chains branch off `start`.
    # NOTE(review): `start` is not defined anywhere in this snippet —
    # presumably an upstream task created elsewhere; confirm.
    start >> run_this_task_stg >> run_this_task2_stg 
    start >> run_this_task_prod >> run_this_task2_prod

当然!尝试这样的事情:

from datetime import datetime, timedelta

from airflow.models import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import PythonOperator


def push_to_xcom():
    """Placeholder for the callable run by the ``run_this_task_<env>`` tasks.

    The real implementation is elided in this example (the body is ``...``).
    """
    ...

def run_this_func():
    """Placeholder for the callable run by the ``run_this_task2_<env>`` tasks.

    The real implementation is elided in this example (the body is ``...``).
    """
    ...

# DAG "loops": schedule_interval is None, so no schedule is attached to it.
dag = DAG(
    dag_id="loops",
    start_date=datetime(2022, 1, 1),
    schedule_interval=None,
)
envs = ["stg", "prod"]

with dag:
    # Common entry point that every per-environment chain hangs off.
    start = DummyOperator(task_id="start")

    for env in envs:
        # The environment name is the task_id suffix, so each iteration
        # registers a distinct pair of tasks in the same DAG.
        run_this_task = PythonOperator(
            task_id=f"run_this_task_{env}",
            python_callable=push_to_xcom,
            retries=10,
            retry_delay=timedelta(seconds=1),
        )
        run_this_task2 = PythonOperator(
            task_id=f"run_this_task2_{env}",
            python_callable=run_this_func,
        )

        # start -> run_this_task_<env> -> run_this_task2_<env>
        start.set_downstream(run_this_task)
        run_this_task.set_downstream(run_this_task2)

图表视图

或者使用 TaskFlow API:

from datetime import datetime, timedelta

from airflow.models import DAG
from airflow.decorators import task
from airflow.operators.dummy import DummyOperator


# DAG "loops": schedule_interval is None, so no schedule is attached to it.
dag = DAG(
    dag_id="loops",
    start_date=datetime(2022, 1, 1),
    schedule_interval=None,
)
envs = ["stg", "prod"]

with dag:
    # Common entry point that every per-environment chain hangs off.
    start = DummyOperator(task_id="start")

    for env in envs:
        # The functions are re-declared on every iteration so that each
        # decoration gets its own environment-specific task_id.

        @task(
            task_id=f"run_this_task_{env}",
            retries=10,
            retry_delay=timedelta(seconds=1),
        )
        def push_to_xcom():
            # Body elided in this example.
            ...

        @task(task_id=f"run_this_task2_{env}")
        def run_this_func():
            # Body elided in this example.
            ...

        # Calling a decorated function inside the DAG context creates the task.
        first_step = push_to_xcom()
        second_step = run_this_func()

        # start -> run_this_task_<env> -> run_this_task2_<env>
        start >> first_step >> second_step