我的 Jinja 在 Airflow 中的输出是值,但我更想要字符串,我怎样才能得到它?
output of my Jinja in Airflow is value but I rather want string how can I get it?
代码
"{{ti.xcom_pull(task_ids='Match_Updated_dates', key='QueryTimeStamp')}}"
我的 where 子句中上面的输出是
我得到的输出:
where last_updated_utc > {}
where last_updated_utc > 2021-08-26 11:40:33
期望的输出
where last_updated_utc > '2021-08-26 11:40:33'
我想将它用作 String ,如何将 jinja 输出从我得到的值转换为 String ?
如果您只需要添加一个单引号,您只需添加它即可。
""" SELECT col FROM my_table WHERE date_col='{{ti.xcom_pull(task_ids='Match_Updated_dates', key='QueryTimeStamp')}}' """
我不确定为什么要将日期作为字符串进行比较。我猜你的最终目标是将值转换为你的 SQL 风格已知的时间戳?我的意思是,这是您可以在 SQL 本身内处理的事情 - 例如:
""" SELECT col FROM my_table WHERE date_col= CAST ('{{ti.xcom_pull(task_ids='Match_Updated_dates', key='QueryTimeStamp')}}' AS TIMESTAMP)"""
一个有效的 DAG 示例:
from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
def func(task_instance, **context):
task_instance.xcom_push(key='QueryTimeStamp', value='2021-08-26 11:40:33')
dag = DAG(
dag_id='my_dag',
schedule_interval=None,
start_date=datetime(2021, 8, 28),
catchup=False,
)
op1 = PythonOperator(
task_id='Match_Updated_dates',
python_callable=func,
dag=dag,
)
op2 = PostgresOperator(
task_id='sql',
sql=""" SELECT col FROM my_table WHERE date_col='{{ti.xcom_pull(task_ids='Match_Updated_dates', key='QueryTimeStamp')}}' """,
postgres_conn_id="postgres_default",
dag=dag,
)
op1 >> op2
代码
"{{ti.xcom_pull(task_ids='Match_Updated_dates', key='QueryTimeStamp')}}"
我的 where 子句中上面的输出是
我得到的输出:
where last_updated_utc > {}
where last_updated_utc > 2021-08-26 11:40:33
期望的输出
where last_updated_utc > '2021-08-26 11:40:33'
我想将它用作 String ,如何将 jinja 输出从我得到的值转换为 String ?
如果您只需要添加一个单引号,您只需添加它即可。
""" SELECT col FROM my_table WHERE date_col='{{ti.xcom_pull(task_ids='Match_Updated_dates', key='QueryTimeStamp')}}' """
我不确定为什么要将日期作为字符串进行比较。我猜你的最终目标是将值转换为你的 SQL 风格已知的时间戳?我的意思是,这是您可以在 SQL 本身内处理的事情 - 例如:
""" SELECT col FROM my_table WHERE date_col= CAST ('{{ti.xcom_pull(task_ids='Match_Updated_dates', key='QueryTimeStamp')}}' AS TIMESTAMP)"""
一个有效的 DAG 示例:
from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
def func(task_instance, **context):
task_instance.xcom_push(key='QueryTimeStamp', value='2021-08-26 11:40:33')
dag = DAG(
dag_id='my_dag',
schedule_interval=None,
start_date=datetime(2021, 8, 28),
catchup=False,
)
op1 = PythonOperator(
task_id='Match_Updated_dates',
python_callable=func,
dag=dag,
)
op2 = PostgresOperator(
task_id='sql',
sql=""" SELECT col FROM my_table WHERE date_col='{{ti.xcom_pull(task_ids='Match_Updated_dates', key='QueryTimeStamp')}}' """,
postgres_conn_id="postgres_default",
dag=dag,
)
op1 >> op2