Airflow DAG 步骤依赖
Airflow DAG Steps Dependencies
我的 Airflow DAG 如下所示:
with DAG(
dag_id='dag',
default_args={
'owner': 'airflow',
'depends_on_past': False,
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
},
dagrun_timeout=timedelta(hours=2),
start_date=datetime(2021, 9, 28, 11),
schedule_interval='10 * * * *'
) as dag:
create_job_flow = EmrCreateJobFlowOperator(
task_id='create_job_flow',
job_flow_overrides=JOB_FLOW_OVERRIDES,
aws_conn_id='aws_default',
emr_conn_id='emr_default',
)
job_step = EmrAddStepsOperator(
task_id='job_step',
job_flow_id=create_job_flow.output,
aws_conn_id='aws_default',
steps=JOB_SETP,
)
job_step_sensor = EmrStepSensor(
task_id='job_step_sensor',
job_flow_id=create_job_flow.output,
step_id="{{ task_instance.xcom_pull(task_ids='job_step', key='return_value')[0] }}",
aws_conn_id='aws_default',
)
read_file = PythonOperator(
task_id="read_file",
python_callable=get_date_information
)
alter_partitions = PythonOperator(
task_id="alter_partitions",
python_callable=update_partitions
)
remove_cluster = EmrTerminateJobFlowOperator(
task_id='remove_cluster',
job_flow_id=create_job_flow.output,
aws_conn_id='aws_default',
)
create_job_flow.set_downstream(job_step)
job_step.set_downstream(job_step_sensor)
job_step_sensor.set_downstream(read_file)
read_file.set_downstream(alter_partitions)
alter_partitions.set_downstream(remove_cluster)
所以这基本上是创建一个 EMR 集群,在其中启动一个步骤并感知该步骤。然后执行一些Python函数,最后终止集群。 Airflow UI中DAG的视图如下:
这里,create_job_flow也指向remove_cluster(可能是因为job_flow_id引用了create_job_flow)而我只设置了[=24的下游=] 到 remove_cluster。在达到 job_step 之前是否会发生这种情况,它会删除集群,因为在这种情况下,集群将在执行 split_job_step 之前被删除,因此这就是问题所在。 create_job_flow 和 remove_cluster 之间的 link 有什么办法去掉吗?或者它会等待完成 alter_partitions 然后执行 remove_cluster?
“remove_cluster”任务将等待“alter_partitions”任务完成。 “create_job_flow”和“remove_cluster”(以及“create_job_flow”和“job_step_sensor”之间的额外边缘是 TaskFlow API 和 XComArg
概念,即使用运算符的 .output
属性。 (查看 this documentation 另一个例子。)
在“remove_cluster”和“job_step_sensor”任务中,job_flow_id=create_job_flow.output
是一个输入参数。在幕后,当操作员的 .output
在模板化字段中用作另一个任务的输入时,会自动创建依赖关系。此功能确保使用其他任务 XComs
的任务之间以前隐式的任务依赖关系现在是明确的。
此管道将按照编写和期望顺序执行(假设 trigger_rule
是“all_success”,这是默认值)。在“create_job_flow”和“alter_partitions”任务都完成之前,“remove_cluster”任务不会执行(实际上是串行执行)。
我的 Airflow DAG 如下所示:
with DAG(
dag_id='dag',
default_args={
'owner': 'airflow',
'depends_on_past': False,
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
},
dagrun_timeout=timedelta(hours=2),
start_date=datetime(2021, 9, 28, 11),
schedule_interval='10 * * * *'
) as dag:
create_job_flow = EmrCreateJobFlowOperator(
task_id='create_job_flow',
job_flow_overrides=JOB_FLOW_OVERRIDES,
aws_conn_id='aws_default',
emr_conn_id='emr_default',
)
job_step = EmrAddStepsOperator(
task_id='job_step',
job_flow_id=create_job_flow.output,
aws_conn_id='aws_default',
steps=JOB_SETP,
)
job_step_sensor = EmrStepSensor(
task_id='job_step_sensor',
job_flow_id=create_job_flow.output,
step_id="{{ task_instance.xcom_pull(task_ids='job_step', key='return_value')[0] }}",
aws_conn_id='aws_default',
)
read_file = PythonOperator(
task_id="read_file",
python_callable=get_date_information
)
alter_partitions = PythonOperator(
task_id="alter_partitions",
python_callable=update_partitions
)
remove_cluster = EmrTerminateJobFlowOperator(
task_id='remove_cluster',
job_flow_id=create_job_flow.output,
aws_conn_id='aws_default',
)
create_job_flow.set_downstream(job_step)
job_step.set_downstream(job_step_sensor)
job_step_sensor.set_downstream(read_file)
read_file.set_downstream(alter_partitions)
alter_partitions.set_downstream(remove_cluster)
所以这基本上是创建一个 EMR 集群,在其中启动一个步骤并感知该步骤。然后执行一些Python函数,最后终止集群。 Airflow UI中DAG的视图如下:
这里,create_job_flow也指向remove_cluster(可能是因为job_flow_id引用了create_job_flow)而我只设置了[=24的下游=] 到 remove_cluster。在达到 job_step 之前是否会发生这种情况,它会删除集群,因为在这种情况下,集群将在执行 split_job_step 之前被删除,因此这就是问题所在。 create_job_flow 和 remove_cluster 之间的 link 有什么办法去掉吗?或者它会等待完成 alter_partitions 然后执行 remove_cluster?
“remove_cluster”任务将等待“alter_partitions”任务完成。 “create_job_flow”和“remove_cluster”(以及“create_job_flow”和“job_step_sensor”之间的额外边缘是 TaskFlow API 和 XComArg
概念,即使用运算符的 .output
属性。 (查看 this documentation 另一个例子。)
在“remove_cluster”和“job_step_sensor”任务中,job_flow_id=create_job_flow.output
是一个输入参数。在幕后,当操作员的 .output
在模板化字段中用作另一个任务的输入时,会自动创建依赖关系。此功能确保使用其他任务 XComs
的任务之间以前隐式的任务依赖关系现在是明确的。
此管道将按照编写和期望顺序执行(假设 trigger_rule
是“all_success”,这是默认值)。在“create_job_flow”和“alter_partitions”任务都完成之前,“remove_cluster”任务不会执行(实际上是串行执行)。