任务组中的 Airflow 2 Xcom

Aiflow 2 Xcom in Task Groups

我在 TaskGroup 中有两个任务需要提取 xcom 值以提供 job_flow_id 和 step_id。这是代码:

  with TaskGroup('execute_my_steps') as execute_my_steps: 

    config = {some dictionary}
    dependencies = {another dictionary}

    task_id = 'execute_spark_job_step'
    task_name = 'spark_job'

    add_step = EmrAddStepsOperator(
        task_id=task_id,
        job_flow_id="{{ task_instance.xcom_pull(dag_id='my_dag', task_ids='emr', key='return_value') }}",
        steps=create_emr_step(args=config, d=dependencies),
        aws_conn_id='aws_default',
        retries=3,
        dag=dag
    )

    wait_for_step = EmrStepSensor(
        task_id='wait_for_' + task_name + '_step',
        job_flow_id="{{ task_instance.xcom_pull(dag_id='my_dag', task_ids='emr', key='return_value') }}",
        step_id="{{ task_instance.xcom_pull(dag_id='my_dag', task_ids='" + task_id + "', key='return_value') }}",
        retries=3,
        dag=dag,
        mode='reschedule'
    )

    add_step >> wait_for_step

问题是 step_id 没有正确呈现。 UI 渲染模板中的 wait_for_step 值显示为 'None',然而,execute_spark_job_step 的 xcom return_value 在那里(这是 emr step_id).

wait_for_step 呈现的模板:

execute_spark_job_step xcom:

当我删除 TaskGroup 时,它呈现正常并且该步骤会等待直到作业进入完成状态。

我需要将其加入任务组,因为我将遍历更大的配置文件并创建多个步骤。

为什么这不起作用?我需要一个嵌套的任务组吗?我尝试在没有上下文管理器的情况下使用 TaskGroup,但仍然没有成功。

TL;DR:

出现您的问题是因为 ID 不是 task_id,而是 group_id.task_id 所以你的代码应该是:

task_ids=f"execute_my_steps.{ task_id }"

=>

    step_id="{{ task_instance.xcom_pull(dag_id='my_dag', task_ids=f"execute_my_steps.{ task_id }", key='return_value') }}",

发生原因的解释:

当任务分配给 TaskGroup 时,任务的 ID 不再是 task_id,而是变成 group_id.task_id 以反映这种关系。 在 Airflow 中,task_id 是唯一的,但是当您使用 TaskGroup 时,您可以在不同的 TaskGroup 中设置相同的 task_id。 如果此行为不是您想要的,您可以通过在任务组中设置 prefix_group_id=False 来禁用它:

with TaskGroup(
    group_id='execute_my_steps',
    prefix_group_id=False
) as execute_my_steps:

这样做,您的代码将无需更改即可工作。 task_id 只是没有 group_id 前缀的 task_id。请注意,这也意味着您有责任确保您的 DAG 中没有重复的 task_id。