如何在 Airflow 中为多个环境创建三个任务?
How can we create three tasks in Airflow for multiple environments?
我有三个任务:1. AddEMRStep;2. Sensor;3. SQLstep。我想为两个环境分别创建这组任务。
# Original single-environment layout from the question: two PythonOperators
# chained inside `dag`. `dag`, `push_to_xcom`, `run_this_func`, `PythonOperator`
# and `timedelta` are defined/imported elsewhere (not shown in the question).
with dag:
    # First task: runs push_to_xcom, retried up to 10 times, 1 second apart.
    run_this_task = PythonOperator(
        task_id = "run_this",
        python_callable=push_to_xcom,
        # NOTE(review): `provide_context` is an Airflow 1.x-era argument; confirm
        # the Airflow version in use still accepts it.
        provide_context=True,
        retries=10,
        retry_delay=timedelta(seconds=1)
    )
    # Second task: runs run_this_func.
    run_this_task2 = PythonOperator(
        task_id = "run_this2",
        python_callable=run_this_func,
        provide_context=True
    )
    # run_this2 is scheduled downstream of run_this.
    run_this_task >> run_this_task2
现在我需要为多个环境分别创建这些任务(在同一个 DAG 中)。
我打算这样定义环境列表:
# Target environments; one copy of the task chain is wanted per entry.
envs = [
    "stg",
    "prod",
]
如何使用 for 循环生成如下结构?
# The hand-expanded layout the question wants a loop to produce: the same two
# tasks duplicated per environment, with the environment name as a task_id
# suffix. `start` is not defined in this snippet — presumably a start task
# created elsewhere (TODO confirm); `dag`, `push_to_xcom` and `run_this_func`
# are likewise defined outside the snippet.
with dag:
    # --- stg ---
    run_this_task_stg = PythonOperator(
        task_id = "run_this_task_stg",
        python_callable=push_to_xcom,
        # NOTE(review): `provide_context` is an Airflow 1.x-era argument;
        # confirm the Airflow version in use still accepts it.
        provide_context=True,
        retries=10,
        retry_delay=timedelta(seconds=1)
    )
    run_this_task2_stg = PythonOperator(
        task_id = "run_this_task2_stg",
        python_callable=run_this_func,
        provide_context=True
    )
    # --- prod ---
    run_this_task_prod = PythonOperator(
        task_id = "run_this_task_prod",
        python_callable=push_to_xcom,
        provide_context=True,
        retries=10,
        retry_delay=timedelta(seconds=1)
    )
    run_this_task2_prod = PythonOperator(
        task_id = "run_this_task2_prod",
        python_callable=run_this_func,
        provide_context=True
    )
    # Both environment chains hang off the shared `start` task.
    start >> run_this_task_stg >> run_this_task2_stg
    start >> run_this_task_prod >> run_this_task2_prod
当然可以!可以试试这样写:
from datetime import datetime, timedelta
from airflow.models import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import PythonOperator
def push_to_xcom():
    """Placeholder callable for each environment's first task; returns None."""
def run_this_func():
    """Placeholder callable for each environment's second task; returns None."""
# One DAG that fans out from a shared `start` task into an independent
# two-step chain per environment.
dag = DAG(
    dag_id="loops",
    start_date=datetime(2022, 1, 1),
    schedule_interval=None,
)

envs = ["stg", "prod"]

with dag:
    start = DummyOperator(task_id="start")

    for env in envs:
        # Suffixing each task_id with the environment keeps ids unique within
        # the DAG. `provide_context` is not passed — NOTE(review): Airflow 2's
        # PythonOperator supplies the context itself; confirm for your version.
        run_this_task = PythonOperator(
            task_id=f"run_this_task_{env}",
            python_callable=push_to_xcom,
            retries=10,
            retry_delay=timedelta(seconds=1),
        )
        run_this_task2 = PythonOperator(
            task_id=f"run_this_task2_{env}",
            python_callable=run_this_func,
        )

        # start -> run_this_task_<env> -> run_this_task2_<env>
        start >> run_this_task >> run_this_task2
图表视图
或者使用 TaskFlow API:
from datetime import datetime, timedelta
from airflow.models import DAG
from airflow.decorators import task
from airflow.operators.dummy import DummyOperator
# TaskFlow variant: same fan-out as the PythonOperator version, with the
# task callables declared via @task inside the loop.
dag = DAG(
    dag_id="loops",
    start_date=datetime(2022, 1, 1),
    schedule_interval=None,
)

envs = ["stg", "prod"]

with dag:
    start = DummyOperator(task_id="start")

    for env in envs:
        # The decorated functions are redefined on every iteration so that
        # each environment gets its own explicit task_id.
        @task(task_id=f"run_this_task_{env}", retries=10, retry_delay=timedelta(seconds=1))
        def push_to_xcom():
            ...

        @task(task_id=f"run_this_task2_{env}")
        def run_this_func():
            ...

        # NOTE: if a task body ever needs the environment name, pass it in as
        # an argument (e.g. `push_to_xcom(env)`). A body that reads `env`
        # directly looks it up when the task runs, after the loop has
        # finished, and would see the last value ("prod").
        start >> push_to_xcom() >> run_this_func()
我有三个任务:1. AddEMRStep;2. Sensor;3. SQLstep。我想为两个环境分别创建这组任务。
# Original single-environment layout from the question: two PythonOperators
# chained inside `dag`. `dag`, `push_to_xcom`, `run_this_func`, `PythonOperator`
# and `timedelta` are defined/imported elsewhere (not shown in the question).
with dag:
    # First task: runs push_to_xcom, retried up to 10 times, 1 second apart.
    run_this_task = PythonOperator(
        task_id = "run_this",
        python_callable=push_to_xcom,
        # NOTE(review): `provide_context` is an Airflow 1.x-era argument; confirm
        # the Airflow version in use still accepts it.
        provide_context=True,
        retries=10,
        retry_delay=timedelta(seconds=1)
    )
    # Second task: runs run_this_func.
    run_this_task2 = PythonOperator(
        task_id = "run_this2",
        python_callable=run_this_func,
        provide_context=True
    )
    # run_this2 is scheduled downstream of run_this.
    run_this_task >> run_this_task2
现在我需要为多个环境分别创建这些任务(在同一个 DAG 中)。
我打算这样定义环境列表:
# Target environments; one copy of the task chain is wanted per entry.
envs = [
    "stg",
    "prod",
]
如何使用 for 循环生成如下结构?
# The hand-expanded layout the question wants a loop to produce: the same two
# tasks duplicated per environment, with the environment name as a task_id
# suffix. `start` is not defined in this snippet — presumably a start task
# created elsewhere (TODO confirm); `dag`, `push_to_xcom` and `run_this_func`
# are likewise defined outside the snippet.
with dag:
    # --- stg ---
    run_this_task_stg = PythonOperator(
        task_id = "run_this_task_stg",
        python_callable=push_to_xcom,
        # NOTE(review): `provide_context` is an Airflow 1.x-era argument;
        # confirm the Airflow version in use still accepts it.
        provide_context=True,
        retries=10,
        retry_delay=timedelta(seconds=1)
    )
    run_this_task2_stg = PythonOperator(
        task_id = "run_this_task2_stg",
        python_callable=run_this_func,
        provide_context=True
    )
    # --- prod ---
    run_this_task_prod = PythonOperator(
        task_id = "run_this_task_prod",
        python_callable=push_to_xcom,
        provide_context=True,
        retries=10,
        retry_delay=timedelta(seconds=1)
    )
    run_this_task2_prod = PythonOperator(
        task_id = "run_this_task2_prod",
        python_callable=run_this_func,
        provide_context=True
    )
    # Both environment chains hang off the shared `start` task.
    start >> run_this_task_stg >> run_this_task2_stg
    start >> run_this_task_prod >> run_this_task2_prod
当然可以!可以试试这样写:
from datetime import datetime, timedelta
from airflow.models import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import PythonOperator
def push_to_xcom():
    """Placeholder callable for each environment's first task; returns None."""
def run_this_func():
    """Placeholder callable for each environment's second task; returns None."""
# One DAG that fans out from a shared `start` task into an independent
# two-step chain per environment.
dag = DAG(
    dag_id="loops",
    start_date=datetime(2022, 1, 1),
    schedule_interval=None,
)

envs = ["stg", "prod"]

with dag:
    start = DummyOperator(task_id="start")

    for env in envs:
        # Suffixing each task_id with the environment keeps ids unique within
        # the DAG. `provide_context` is not passed — NOTE(review): Airflow 2's
        # PythonOperator supplies the context itself; confirm for your version.
        run_this_task = PythonOperator(
            task_id=f"run_this_task_{env}",
            python_callable=push_to_xcom,
            retries=10,
            retry_delay=timedelta(seconds=1),
        )
        run_this_task2 = PythonOperator(
            task_id=f"run_this_task2_{env}",
            python_callable=run_this_func,
        )

        # start -> run_this_task_<env> -> run_this_task2_<env>
        start >> run_this_task >> run_this_task2
图表视图
或者使用 TaskFlow API:
from datetime import datetime, timedelta
from airflow.models import DAG
from airflow.decorators import task
from airflow.operators.dummy import DummyOperator
# TaskFlow variant: same fan-out as the PythonOperator version, with the
# task callables declared via @task inside the loop.
dag = DAG(
    dag_id="loops",
    start_date=datetime(2022, 1, 1),
    schedule_interval=None,
)

envs = ["stg", "prod"]

with dag:
    start = DummyOperator(task_id="start")

    for env in envs:
        # The decorated functions are redefined on every iteration so that
        # each environment gets its own explicit task_id.
        @task(task_id=f"run_this_task_{env}", retries=10, retry_delay=timedelta(seconds=1))
        def push_to_xcom():
            ...

        @task(task_id=f"run_this_task2_{env}")
        def run_this_func():
            ...

        # NOTE: if a task body ever needs the environment name, pass it in as
        # an argument (e.g. `push_to_xcom(env)`). A body that reads `env`
        # directly looks it up when the task runs, after the loop has
        # finished, and would see the last value ("prod").
        start >> push_to_xcom() >> run_this_func()