任务组中的 Airflow 2 Xcom
Aiflow 2 Xcom in Task Groups
我在 TaskGroup 中有两个任务需要提取 xcom 值以提供 job_flow_id 和 step_id。这是代码:
with TaskGroup('execute_my_steps') as execute_my_steps:
config = {some dictionary}
dependencies = {another dictionary}
task_id = 'execute_spark_job_step'
task_name = 'spark_job'
add_step = EmrAddStepsOperator(
task_id=task_id,
job_flow_id="{{ task_instance.xcom_pull(dag_id='my_dag', task_ids='emr', key='return_value') }}",
steps=create_emr_step(args=config, d=dependencies),
aws_conn_id='aws_default',
retries=3,
dag=dag
)
wait_for_step = EmrStepSensor(
task_id='wait_for_' + task_name + '_step',
job_flow_id="{{ task_instance.xcom_pull(dag_id='my_dag', task_ids='emr', key='return_value') }}",
step_id="{{ task_instance.xcom_pull(dag_id='my_dag', task_ids='" + task_id + "', key='return_value') }}",
retries=3,
dag=dag,
mode='reschedule'
)
add_step >> wait_for_step
问题是 step_id 没有正确呈现。 UI 渲染模板中的 wait_for_step
值显示为 'None'
,然而,execute_spark_job_step
的 xcom return_value 在那里(这是 emr step_id).
wait_for_step 呈现的模板:
execute_spark_job_step xcom:
当我删除 TaskGroup 时,它呈现正常并且该步骤会等待直到作业进入完成状态。
我需要将其加入任务组,因为我将遍历更大的配置文件并创建多个步骤。
为什么这不起作用?我需要一个嵌套的任务组吗?我尝试在没有上下文管理器的情况下使用 TaskGroup,但仍然没有成功。
TL;DR:
出现您的问题是因为 ID 不是 task_id
,而是 group_id.task_id
所以你的代码应该是:
task_ids=f"execute_my_steps.{ task_id }"
=>
step_id="{{ task_instance.xcom_pull(dag_id='my_dag', task_ids=f"execute_my_steps.{ task_id }", key='return_value') }}",
发生原因的解释:
当任务分配给 TaskGroup 时,任务的 ID 不再是 task_id,而是变成 group_id.task_id
以反映这种关系。
在 Airflow 中,task_id
是唯一的,但是当您使用 TaskGroup 时,您可以在不同的 TaskGroup 中设置相同的 task_id
。
如果此行为不是您想要的,您可以通过在任务组中设置 prefix_group_id=False
来禁用它:
with TaskGroup(
group_id='execute_my_steps',
prefix_group_id=False
) as execute_my_steps:
这样做,您的代码将无需更改即可工作。 task_id
只是没有 group_id
前缀的 task_id
。请注意,这也意味着您有责任确保您的 DAG 中没有重复的 task_id。
我在 TaskGroup 中有两个任务需要提取 xcom 值以提供 job_flow_id 和 step_id。这是代码:
with TaskGroup('execute_my_steps') as execute_my_steps:
config = {some dictionary}
dependencies = {another dictionary}
task_id = 'execute_spark_job_step'
task_name = 'spark_job'
add_step = EmrAddStepsOperator(
task_id=task_id,
job_flow_id="{{ task_instance.xcom_pull(dag_id='my_dag', task_ids='emr', key='return_value') }}",
steps=create_emr_step(args=config, d=dependencies),
aws_conn_id='aws_default',
retries=3,
dag=dag
)
wait_for_step = EmrStepSensor(
task_id='wait_for_' + task_name + '_step',
job_flow_id="{{ task_instance.xcom_pull(dag_id='my_dag', task_ids='emr', key='return_value') }}",
step_id="{{ task_instance.xcom_pull(dag_id='my_dag', task_ids='" + task_id + "', key='return_value') }}",
retries=3,
dag=dag,
mode='reschedule'
)
add_step >> wait_for_step
问题是 step_id 没有正确呈现。 UI 渲染模板中的 wait_for_step
值显示为 'None'
,然而,execute_spark_job_step
的 xcom return_value 在那里(这是 emr step_id).
wait_for_step 呈现的模板:
execute_spark_job_step xcom:
当我删除 TaskGroup 时,它呈现正常并且该步骤会等待直到作业进入完成状态。
我需要将其加入任务组,因为我将遍历更大的配置文件并创建多个步骤。
为什么这不起作用?我需要一个嵌套的任务组吗?我尝试在没有上下文管理器的情况下使用 TaskGroup,但仍然没有成功。
TL;DR:
出现您的问题是因为 ID 不是 task_id
,而是 group_id.task_id
所以你的代码应该是:
task_ids=f"execute_my_steps.{ task_id }"
=>
step_id="{{ task_instance.xcom_pull(dag_id='my_dag', task_ids=f"execute_my_steps.{ task_id }", key='return_value') }}",
发生原因的解释:
当任务分配给 TaskGroup 时,任务的 ID 不再是 task_id,而是变成 group_id.task_id
以反映这种关系。
在 Airflow 中,task_id
是唯一的,但是当您使用 TaskGroup 时,您可以在不同的 TaskGroup 中设置相同的 task_id
。
如果此行为不是您想要的,您可以通过在任务组中设置 prefix_group_id=False
来禁用它:
with TaskGroup(
group_id='execute_my_steps',
prefix_group_id=False
) as execute_my_steps:
这样做,您的代码将无需更改即可工作。 task_id
只是没有 group_id
前缀的 task_id
。请注意,这也意味着您有责任确保您的 DAG 中没有重复的 task_id。