如何在 Airflow 中将值传递给 Xcom

How to pass value to Xcom in Airflow

last_updated_at = "{{ti.xcom_pull(task_ids='Match_Updated_date_{}', 
 key='QueryTimeStamp_{}')}}".format(country,country) 

这是我在 xcom 中保存的值

where last_updated_utc > '{}';

并在 where 子句中使用它

但 xcom 已正确传递到我的 where 子句中

 where last_updated_utc > '{ti.xcom_pull(task_ids='Match_Updated_date_mm', 
 key='QueryTimeStamp_mm')}';

它传递了整个 String ,我该如何解决这个问题?

.format(last_updated_at)

这就是我在 where 子句中传递它的方式

当我不使用

 my_xcom_value = "{{ti.xcom_pull(task_ids='Match_Updated_date', 
 key='QueryTimeStamp')}}"

当我传递参数时 Xcom 工作 fine.but,它不再工作了

Python 推送 xcom 的可调用函数

def match_dates(**Kwargs):     
try:     
print("enters the try block")     
response = s3.get_object(Bucket='mygluecrawlerbucket',Key='DateTime/Users/my_date_{}.txt'.format(Kwargs['key1']))
print("responce is ", response)
status = response['ResponseMetadata']['HTTPStatusCode']
if status == 200:
print("Enters the status block ")
data = response['Body'].read().decode("utf-8")     
ti.xcom_push(key="QueryTimeStamp_{}".format(country), value=someVariable)

拉xcom的PostGresOperator

import_redshift_table_zm = PostgresOperator(
        task_id='copy_data_from_redshift_zm',
        postgres_conn_id='postgres_default',
        sql="""
                BEGIN;

            create table angaza_public_spark.stag_angaza_users_zm as
            Select * FROM angaza_public_zm.users
            where last_updated_utc > '{}';
              END;
                   
    """.format("{{ti.xcom_pull(task_ids='Match_Updated_dates_zm', key='QueryTimeStamp_{}')}}".format(country))

两件事,

  1. 替换这个
{{ti.xcom_pull(task_ids='Match_Updated_date_{}', 
 key='QueryTimeStamp_{}')}}

用这个(注意 {{ 之后和 }}

之前的 space
{{ ti.xcom_pull(task_ids='Match_Updated_date_{}', 
 key='QueryTimeStamp_{}') }}
  1. 取决于您在哪里使用 last_updated_at 参数。 airflow 中的每个运算符都有一个名为 template_fields
  2. 的 class 变量

如果您使用的是自定义运算符,请确保将 last_updated_at 添加为 template_fields

的一部分

例如:

template_fields = ('templates_dict', 'op_args', 'op_kwargs')