我的 Jinja 在 Airflow 中的输出是值,但我更想要字符串,我怎样才能得到它?

output of my Jinja in Airflow is value but I rather want string how can I get it?

代码

"{{ti.xcom_pull(task_ids='Match_Updated_dates', key='QueryTimeStamp')}}"

我的 where 子句中上面的输出是

我得到的输出:

where last_updated_utc > {}
where last_updated_utc > 2021-08-26 11:40:33

期望的输出

where last_updated_utc > '2021-08-26 11:40:33'

我想将它用作 String ,如何将 jinja 输出从我得到的值转换为 String ?

如果您只需要添加一个单引号,您只需添加它即可。

""" SELECT col FROM my_table WHERE date_col='{{ti.xcom_pull(task_ids='Match_Updated_dates', key='QueryTimeStamp')}}' """ 

我不确定为什么要将日期作为字符串进行比较。我猜你的最终目标是将值转换为你的 SQL 风格已知的时间戳?我的意思是,这是您可以在 SQL 本身内处理的事情 - 例如:

""" SELECT col FROM my_table WHERE date_col= CAST ('{{ti.xcom_pull(task_ids='Match_Updated_dates', key='QueryTimeStamp')}}' AS TIMESTAMP)"""

一个有效的 DAG 示例:

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator


def func(task_instance, **context):
    task_instance.xcom_push(key='QueryTimeStamp', value='2021-08-26 11:40:33')


dag = DAG(
    dag_id='my_dag',
    schedule_interval=None,
    start_date=datetime(2021, 8, 28),
    catchup=False,
)

op1 = PythonOperator(
    task_id='Match_Updated_dates',
    python_callable=func,
    dag=dag,
)

op2 = PostgresOperator(
    task_id='sql',
    sql=""" SELECT col FROM my_table WHERE date_col='{{ti.xcom_pull(task_ids='Match_Updated_dates', key='QueryTimeStamp')}}' """,
    postgres_conn_id="postgres_default",
    dag=dag,

)

op1 >> op2