Airflow - 无法将 BashOperator 中的模板化 Env 变量推送到 XCom

Airflow - Unable to push templated Env variable in BashOperator to XCom

我正在努力熟悉 Cloud Composer 上关于 Xcom 和 Jinja 模板的 Airflow 2.2 基础知识。我的 BashOperator 代码如下(它使用文件名中的执行日期)。此文件名作为 Env 变量传递,以便 {{ ts }} 被识别为模板(如果我将其作为 Params 变量传递,执行日期将不会被解析为 Params 未根据 source code):

进行模板化
    writeToFile1Bash = BashOperator (
        task_id='writetofile1bash',
        retries=0,
        bash_command='echo "number,square,root\n" > ${file1name} && echo "{{ task_instance.xcom_push(key="file1name", value=$file1name) }}"',
        env={
            'file1name':'/home/airflow/gcs/data/file1-{{ ts }}.txt'
        }
    )

对变量 ${file1name} 的第一个引用按预期工作,如果我在那里完成 bash 命令,它将成功创建文件。但是,我希望能够将它作为 XCom(具有自定义变量名称)传递给下游任务,但我未能实现。在第二个命令中提取 file1name 变量时,我遇到了以下不同格式的错误:

-var.value.file1name --> KeyError: 'Variable file1name does not exist'

-$(file1name)$file1name --> 渲染模板错误:101

处出现意外字符 '$'

-${file1name} --> jinja2.exceptions.TemplateSyntaxError: 预期的标记 ':',得到 '}'

-env.file1nameenv['file1name']ENV['file1name] --> env 未定义

-params.file1name1 --> 渲染模板错误:StrictUndefined 类型的对象不是JSON可序列化

我已经尝试了所有我能想到的方法,但文档并不清楚。即使在这个 tutorial 中(就在短语:So, after modifying the DAG you should have this: 之后),由于 Jinja 解析变量的需要,我们鼓励您使用 env 而不是 params,但是;没有代码示例说明如何从双括号 {{ }} 内引用此变量。

如果有人能帮助解决这个问题,我将不胜感激,因为我已经花了很多时间。

对于 BashOperator,stdout 的最后一行将推送到 xcom。

参考:https://airflow.apache.org/docs/apache-airflow/1.10.13/_api/airflow/operators/bash_operator/index.html#airflow.operators.bash_operator.BashOperator

你可以这样做:

writeToFile1Bash = BashOperator (
    task_id='writetofile1bash',
    retries=0,
    bash_command='echo "number,square,root\n" > ${file1name} && echo $file1name',
    env={
        'file1name':'/home/airflow/gcs/data/file1-{{ ts }}.txt'
    }
)

然后在后续运算符中:

puller = BashOperator(
    task_id='puller',
    bash_command='echo {{ ti.xcom_pull(task_ids="writetofile1bash")}}'
)

更新

没有 xcom 方法:

{{ ts }} 是 dag_run 的时间戳,因此 ts 不会在任务之间改变。

也就是说,在同一个 DAG 中多次使用 {{ ts }} 是安全的,并且使用静态变量就不必使用 xcom。

filename = '/home/airflow/gcs/data/file1-{{ ts }}.txt'

writeToFile1Bash = BashOperator (
    task_id='writetofile1bash',
    retries=0,
    bash_command='echo "number,square,root\n" > ${file1name} && echo $file1name',
    env={
        'file1name': filename
    }
)

puller = BashOperator(
    task_id='puller',
    bash_command=f'echo {filename}'
)