通过for循环在动态生成的任务之后执行单个任务
Execute single task AFTER dynamically-generated tasks via for-loop
假设我有以下 DAG(基本占位符函数),它使用 for 循环动态生成任务(通过遍历列表):
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
default_args = {
'owner': 'ETLUSER',
'depends_on_past': False,
'start_date': datetime(2019, 12, 16, 0, 0, 0),
'email': ['xxx@xxx.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5)
}
dag = DAG('xxx', catchup=False,
default_args=default_args, schedule_interval='0 */4 * * *')
# Some dummy function
def StepOne(x):
print(x)
def StepTwo():
print("Okay, we finished all of Step 1.")
some_list = [1, 2, 3, 4, 5, 6]
for t in some_list:
task_id = f'FirstStep_{t}'
task = PythonOperator(
task_id=task_id,
python_callable=StepOne,
provide_context=False,
op_kwargs={'x': str(t)},
dag=dag
)
task
我想介绍一些简单的额外任务:
task2 = PythonOperator(
task_id="SecondStep",
python_callable=StepTwo,
provide_context=False,
dag=dag
)
仅在 所有 第一个步骤完成后运行。线性地,这将是 task >> task2
我该怎么做?
您可以将任务依赖于数组。
任务A和任务B都完成后执行任务C。
[taskA, taskB] >> taskC
或
任务A完成后并行执行任务B和任务C。
taskA >> [taskB, taskC]
只要upstream或downstream的1边是非数组即可
因此,对于您的示例,
task1 = []
for t in some_list:
task_id = f'FirstStep_{t}'
task1.append(PythonOperator(
task_id=task_id,
python_callable=StepOne,
provide_context=False,
op_kwargs={'x': str(t)},
dag=dag))
task2 = PythonOperator(
task_id="SecondStep",
python_callable=StepTwo,
provide_context=False,
dag=dag)
task1 >> task2
假设我有以下 DAG(基本占位符函数),它使用 for 循环动态生成任务(通过遍历列表):
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
default_args = {
'owner': 'ETLUSER',
'depends_on_past': False,
'start_date': datetime(2019, 12, 16, 0, 0, 0),
'email': ['xxx@xxx.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5)
}
dag = DAG('xxx', catchup=False,
default_args=default_args, schedule_interval='0 */4 * * *')
# Some dummy function
def StepOne(x):
print(x)
def StepTwo():
print("Okay, we finished all of Step 1.")
some_list = [1, 2, 3, 4, 5, 6]
for t in some_list:
task_id = f'FirstStep_{t}'
task = PythonOperator(
task_id=task_id,
python_callable=StepOne,
provide_context=False,
op_kwargs={'x': str(t)},
dag=dag
)
task
我想介绍一些简单的额外任务:
task2 = PythonOperator(
task_id="SecondStep",
python_callable=StepTwo,
provide_context=False,
dag=dag
)
仅在 所有 第一个步骤完成后运行。线性地,这将是 task >> task2
我该怎么做?
您可以将任务依赖于数组。
任务A和任务B都完成后执行任务C。
[taskA, taskB] >> taskC
或
任务A完成后并行执行任务B和任务C。
taskA >> [taskB, taskC]
只要upstream或downstream的1边是非数组即可
因此,对于您的示例,
task1 = []
for t in some_list:
task_id = f'FirstStep_{t}'
task1.append(PythonOperator(
task_id=task_id,
python_callable=StepOne,
provide_context=False,
op_kwargs={'x': str(t)},
dag=dag))
task2 = PythonOperator(
task_id="SecondStep",
python_callable=StepTwo,
provide_context=False,
dag=dag)
task1 >> task2