无法在带有 Airflow 的 Jinja 模板中使用 python 变量
Can't use python variable in jinja template with Airflow
我正在尝试使用 Airflow 在 AWS EMR 上执行 运行 11 步,并遵循此 code 作为参考。因为对 11 个步骤使用 EmrAddStepsOperator 和 EmrStepSensor 会重复太多。所以我试图遍历它。我在我的 DAG 中使用了以下代码。
step_adder = list()
step_checker = list()
steps = ['step1', 'step2', 'step3', 'step4', 'step5', 'step6'...till step11]
# @evalcontextfilter
# def dangerous_render(context, value):
# return Markup(Template(value).render(context)).render()
for i in range(0,len(steps)):
#Add step
step_adder.append(EmrAddStepsOperator(
task_id=steps[i],
job_flow_id="{{ task_instance.xcom_pull(task_ids='create_job_flow', key='return_value') }}",
aws_conn_id='aws_default',
steps=eval('step_'+str(i+1)),
))
print(step_adder)
#Step Sensor for checking
step_checker.append(EmrStepSensor(
task_id=steps[i]+'_check',
job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
#step_id="{{"task_instance.xcom_pull(task_ids={}, key='return_value')[0]",steps[i]}}",
step_id='(Template("{{ "task_instance.xcom_pull(task_ids=params.step, key='return_value')[0] }}").render({'params': {'step': steps[i]}}))',
aws_conn_id='aws_default',
))
我在这里遇到错误,EmrStepSensor 期望来自 EMR 的 step_id 在这里输入,并且是从 xcom 获取生成的(我想,我不是 100% 确定这段代码是如何工作的)。但是我的步骤存储在步骤列表中,所以我不能在 step_id 中的 task_id 中给出静态值,就像在参考代码中给出的那样,我无法弄清楚如何使用 jinja 模板使用 python 变量值将步骤列表中的值放在这里。
我使用了以下两种方式,以便step_id可以根据steps[i]
中的步骤名称从EMR中获取正确的步骤
step_id="{{"task_instance.xcom_pull(task_ids={}, key='return_value')[0]",steps[i]}}",
step_id='(Template("{{ "task_instance.xcom_pull(task_ids=params.step, key='return_value')[0] }}")
然而,这两个都因 Airflow 中的语法错误而失败。因此,如果有人能指出我正确的方向来做到这一点,我将非常感激。我正在使用 Airflow 1.10.12(这是 AWS 上 Managed Apache Airflow 中 Airflow 的默认版本)。
我不确定这是否已经解决,所以:
使用 f 字符串:
f"{{{{ task_instance.xcom_pull(task_ids='{steps[i]}', key='return_value')[0] }}}}"
使用.format
:
"{{{{ task_instance.xcom_pull(task_ids='{}', key='return_value')[0] }}}}".format(steps[i])
请注意,您必须确保键 task_ids 的值用单引号引起来。此外,xcom_pull 中的 return 是一个列表,因此 o
末尾的索引 [0]
我正在尝试使用 Airflow 在 AWS EMR 上执行 运行 11 步,并遵循此 code 作为参考。因为对 11 个步骤使用 EmrAddStepsOperator 和 EmrStepSensor 会重复太多。所以我试图遍历它。我在我的 DAG 中使用了以下代码。
step_adder = list()
step_checker = list()
steps = ['step1', 'step2', 'step3', 'step4', 'step5', 'step6'...till step11]
# @evalcontextfilter
# def dangerous_render(context, value):
# return Markup(Template(value).render(context)).render()
for i in range(0,len(steps)):
#Add step
step_adder.append(EmrAddStepsOperator(
task_id=steps[i],
job_flow_id="{{ task_instance.xcom_pull(task_ids='create_job_flow', key='return_value') }}",
aws_conn_id='aws_default',
steps=eval('step_'+str(i+1)),
))
print(step_adder)
#Step Sensor for checking
step_checker.append(EmrStepSensor(
task_id=steps[i]+'_check',
job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
#step_id="{{"task_instance.xcom_pull(task_ids={}, key='return_value')[0]",steps[i]}}",
step_id='(Template("{{ "task_instance.xcom_pull(task_ids=params.step, key='return_value')[0] }}").render({'params': {'step': steps[i]}}))',
aws_conn_id='aws_default',
))
我在这里遇到错误,EmrStepSensor 期望来自 EMR 的 step_id 在这里输入,并且是从 xcom 获取生成的(我想,我不是 100% 确定这段代码是如何工作的)。但是我的步骤存储在步骤列表中,所以我不能在 step_id 中的 task_id 中给出静态值,就像在参考代码中给出的那样,我无法弄清楚如何使用 jinja 模板使用 python 变量值将步骤列表中的值放在这里。
我使用了以下两种方式,以便step_id可以根据steps[i]
中的步骤名称从EMR中获取正确的步骤step_id="{{"task_instance.xcom_pull(task_ids={}, key='return_value')[0]",steps[i]}}",
step_id='(Template("{{ "task_instance.xcom_pull(task_ids=params.step, key='return_value')[0] }}")
然而,这两个都因 Airflow 中的语法错误而失败。因此,如果有人能指出我正确的方向来做到这一点,我将非常感激。我正在使用 Airflow 1.10.12(这是 AWS 上 Managed Apache Airflow 中 Airflow 的默认版本)。
我不确定这是否已经解决,所以:
使用 f 字符串:
f"{{{{ task_instance.xcom_pull(task_ids='{steps[i]}', key='return_value')[0] }}}}"
使用.format
:
"{{{{ task_instance.xcom_pull(task_ids='{}', key='return_value')[0] }}}}".format(steps[i])
请注意,您必须确保键 task_ids 的值用单引号引起来。此外,xcom_pull 中的 return 是一个列表,因此 o
末尾的索引 [0]