Airflow - 无法将 BashOperator 中的模板化 Env 变量推送到 XCom
Airflow - Unable to push templated Env variable in BashOperator to XCom
我正在努力熟悉 Cloud Composer 上关于 Xcom 和 Jinja 模板的 Airflow 2.2 基础知识。我的 BashOperator 代码如下(它使用文件名中的执行日期)。此文件名作为 Env
变量传递,以便 {{ ts }}
被识别为模板(如果我将其作为 Params
变量传递,执行日期将不会被解析为 Params
未根据 source code):
进行模板化
writeToFile1Bash = BashOperator (
task_id='writetofile1bash',
retries=0,
bash_command='echo "number,square,root\n" > ${file1name} && echo "{{ task_instance.xcom_push(key="file1name", value=$file1name) }}"',
env={
'file1name':'/home/airflow/gcs/data/file1-{{ ts }}.txt'
}
)
对变量 ${file1name}
的第一个引用按预期工作,如果我在那里完成 bash 命令,它将成功创建文件。但是,我希望能够将它作为 XCom(具有自定义变量名称)传递给下游任务,但我未能实现。在第二个命令中提取 file1name
变量时,我遇到了以下不同格式的错误:
-var.value.file1name
--> KeyError: 'Variable file1name does not exist'
-$(file1name)
或 $file1name
--> 渲染模板错误:101
处出现意外字符 '$'
-${file1name}
--> jinja2.exceptions.TemplateSyntaxError: 预期的标记 ':',得到 '}'
-env.file1name
、env['file1name']
或 ENV['file1name]
--> env 未定义
-params.file1name1
--> 渲染模板错误:StrictUndefined 类型的对象不是JSON可序列化
我已经尝试了所有我能想到的方法,但文档并不清楚。即使在这个 tutorial 中(就在短语:So, after modifying the DAG you should have this:
之后),由于 Jinja 解析变量的需要,我们鼓励您使用 env
而不是 params
,但是;没有代码示例说明如何从双括号 {{ }} 内引用此变量。
如果有人能帮助解决这个问题,我将不胜感激,因为我已经花了很多时间。
对于 BashOperator
,stdout 的最后一行将推送到 xcom。
你可以这样做:
writeToFile1Bash = BashOperator (
task_id='writetofile1bash',
retries=0,
bash_command='echo "number,square,root\n" > ${file1name} && echo $file1name',
env={
'file1name':'/home/airflow/gcs/data/file1-{{ ts }}.txt'
}
)
然后在后续运算符中:
puller = BashOperator(
task_id='puller',
bash_command='echo {{ ti.xcom_pull(task_ids="writetofile1bash")}}'
)
更新
没有 xcom 方法:
{{ ts }}
是 dag_run 的时间戳,因此 ts
不会在任务之间改变。
也就是说,在同一个 DAG 中多次使用 {{ ts }}
是安全的,并且使用静态变量就不必使用 xcom。
filename = '/home/airflow/gcs/data/file1-{{ ts }}.txt'
writeToFile1Bash = BashOperator (
task_id='writetofile1bash',
retries=0,
bash_command='echo "number,square,root\n" > ${file1name} && echo $file1name',
env={
'file1name': filename
}
)
puller = BashOperator(
task_id='puller',
bash_command=f'echo {filename}'
)
我正在努力熟悉 Cloud Composer 上关于 Xcom 和 Jinja 模板的 Airflow 2.2 基础知识。我的 BashOperator 代码如下(它使用文件名中的执行日期)。此文件名作为 Env
变量传递,以便 {{ ts }}
被识别为模板(如果我将其作为 Params
变量传递,执行日期将不会被解析为 Params
未根据 source code):
writeToFile1Bash = BashOperator (
task_id='writetofile1bash',
retries=0,
bash_command='echo "number,square,root\n" > ${file1name} && echo "{{ task_instance.xcom_push(key="file1name", value=$file1name) }}"',
env={
'file1name':'/home/airflow/gcs/data/file1-{{ ts }}.txt'
}
)
对变量 ${file1name}
的第一个引用按预期工作,如果我在那里完成 bash 命令,它将成功创建文件。但是,我希望能够将它作为 XCom(具有自定义变量名称)传递给下游任务,但我未能实现。在第二个命令中提取 file1name
变量时,我遇到了以下不同格式的错误:
-var.value.file1name
--> KeyError: 'Variable file1name does not exist'
-$(file1name)
或 $file1name
--> 渲染模板错误:101
-${file1name}
--> jinja2.exceptions.TemplateSyntaxError: 预期的标记 ':',得到 '}'
-env.file1name
、env['file1name']
或 ENV['file1name]
--> env 未定义
-params.file1name1
--> 渲染模板错误:StrictUndefined 类型的对象不是JSON可序列化
我已经尝试了所有我能想到的方法,但文档并不清楚。即使在这个 tutorial 中(就在短语:So, after modifying the DAG you should have this:
之后),由于 Jinja 解析变量的需要,我们鼓励您使用 env
而不是 params
,但是;没有代码示例说明如何从双括号 {{ }} 内引用此变量。
如果有人能帮助解决这个问题,我将不胜感激,因为我已经花了很多时间。
对于 BashOperator
,stdout 的最后一行将推送到 xcom。
你可以这样做:
writeToFile1Bash = BashOperator (
task_id='writetofile1bash',
retries=0,
bash_command='echo "number,square,root\n" > ${file1name} && echo $file1name',
env={
'file1name':'/home/airflow/gcs/data/file1-{{ ts }}.txt'
}
)
然后在后续运算符中:
puller = BashOperator(
task_id='puller',
bash_command='echo {{ ti.xcom_pull(task_ids="writetofile1bash")}}'
)
更新
没有 xcom 方法:
{{ ts }}
是 dag_run 的时间戳,因此 ts
不会在任务之间改变。
也就是说,在同一个 DAG 中多次使用 {{ ts }}
是安全的,并且使用静态变量就不必使用 xcom。
filename = '/home/airflow/gcs/data/file1-{{ ts }}.txt'
writeToFile1Bash = BashOperator (
task_id='writetofile1bash',
retries=0,
bash_command='echo "number,square,root\n" > ${file1name} && echo $file1name',
env={
'file1name': filename
}
)
puller = BashOperator(
task_id='puller',
bash_command=f'echo {filename}'
)