Airflow DAG 步骤依赖

Airflow DAG Steps Dependencies

我的 Airflow DAG 如下所示:

with DAG(
    dag_id='dag',
    default_args={
        'owner': 'airflow',
        'depends_on_past': False,
        'email': ['airflow@example.com'],
        'email_on_failure': False,
        'email_on_retry': False,
    },
    dagrun_timeout=timedelta(hours=2),
    start_date=datetime(2021, 9, 28, 11),
    schedule_interval='10 * * * *'
) as dag:

    create_job_flow = EmrCreateJobFlowOperator(
        task_id='create_job_flow',
        job_flow_overrides=JOB_FLOW_OVERRIDES,
        aws_conn_id='aws_default',
        emr_conn_id='emr_default',
    )

    job_step = EmrAddStepsOperator(
        task_id='job_step',
        job_flow_id=create_job_flow.output,
        aws_conn_id='aws_default',
        steps=JOB_SETP,
    )

    job_step_sensor = EmrStepSensor(
        task_id='job_step_sensor',
        job_flow_id=create_job_flow.output,
        step_id="{{ task_instance.xcom_pull(task_ids='job_step', key='return_value')[0] }}",
        aws_conn_id='aws_default',
    )

    read_file = PythonOperator(
        task_id="read_file",
        python_callable=get_date_information
    )

    alter_partitions = PythonOperator(
        task_id="alter_partitions",
        python_callable=update_partitions
    )

    remove_cluster = EmrTerminateJobFlowOperator(
        task_id='remove_cluster',
        job_flow_id=create_job_flow.output,
        aws_conn_id='aws_default',
    )


    create_job_flow.set_downstream(job_step)
    job_step.set_downstream(job_step_sensor)
    job_step_sensor.set_downstream(read_file)
    read_file.set_downstream(alter_partitions)
    alter_partitions.set_downstream(remove_cluster)

所以这基本上是创建一个 EMR 集群,在其中启动一个步骤并感知该步骤。然后执行一些Python函数,最后终止集群。 Airflow UI中DAG的视图如下:

这里,create_job_flow也指向remove_cluster(可能是因为job_flow_id引用了create_job_flow)而我只设置了[=24的下游=] 到 remove_cluster。在达到 job_step 之前是否会发生这种情况,它会删除集群,因为在这种情况下,集群将在执行 split_job_step 之前被删除,因此这就是问题所在。 create_job_flow 和 remove_cluster 之间的 link 有什么办法去掉吗?或者它会等待完成 alter_partitions 然后执行 remove_cluster?

“remove_cluster”任务将等待“alter_partitions”任务完成。 “create_job_flow”和“remove_cluster”(以及“create_job_flow”和“job_step_sensor”之间的额外边缘是 TaskFlow API 和 XComArg 概念,即使用运算符的 .output 属性。 (查看 this documentation 另一个例子。)

在“remove_cluster”和“job_step_sensor”任务中,job_flow_id=create_job_flow.output 是一个输入参数。在幕后,当操作员的 .output 在模板化字段中用作另一个任务的输入时,会自动创建依赖关系。此功能确保使用其他任务 XComs 的任务之间以前隐式的任务依赖关系现在是明确的。

此管道将按照编写和期望顺序执行(假设 trigger_rule 是“all_success”,这是默认值)。在“create_job_flow”和“alter_partitions”任务都完成之前,“remove_cluster”任务不会执行(实际上是串行执行)。