如何在 Airflow 中将值传递给 Xcom
How to pass value to Xcom in Airflow
last_updated_at = "{{ti.xcom_pull(task_ids='Match_Updated_date_{}',
key='QueryTimeStamp_{}')}}".format(country,country)
这是我在 xcom 中保存的值
where last_updated_utc > '{}';
并在 where 子句中使用它
但 xcom 已正确传递到我的 where 子句中
where last_updated_utc > '{ti.xcom_pull(task_ids='Match_Updated_date_mm',
key='QueryTimeStamp_mm')}';
它传递了整个 String ,我该如何解决这个问题?
.format(last_updated_at)
这就是我在 where 子句中传递它的方式
当我不使用
my_xcom_value = "{{ti.xcom_pull(task_ids='Match_Updated_date',
key='QueryTimeStamp')}}"
当我传递参数时 Xcom 工作 fine.but,它不再工作了
Python 推送 xcom 的可调用函数
def match_dates(**Kwargs):
try:
print("enters the try block")
response = s3.get_object(Bucket='mygluecrawlerbucket',Key='DateTime/Users/my_date_{}.txt'.format(Kwargs['key1']))
print("responce is ", response)
status = response['ResponseMetadata']['HTTPStatusCode']
if status == 200:
print("Enters the status block ")
data = response['Body'].read().decode("utf-8")
ti.xcom_push(key="QueryTimeStamp_{}".format(country), value=someVariable)
拉xcom的PostGresOperator
import_redshift_table_zm = PostgresOperator(
task_id='copy_data_from_redshift_zm',
postgres_conn_id='postgres_default',
sql="""
BEGIN;
create table angaza_public_spark.stag_angaza_users_zm as
Select * FROM angaza_public_zm.users
where last_updated_utc > '{}';
END;
""".format("{{ti.xcom_pull(task_ids='Match_Updated_dates_zm', key='QueryTimeStamp_{}')}}".format(country))
两件事,
- 替换这个
{{ti.xcom_pull(task_ids='Match_Updated_date_{}',
key='QueryTimeStamp_{}')}}
用这个(注意 {{
之后和 }}
之前的 space
{{ ti.xcom_pull(task_ids='Match_Updated_date_{}',
key='QueryTimeStamp_{}') }}
- 取决于您在哪里使用
last_updated_at
参数。 airflow 中的每个运算符都有一个名为 template_fields
的 class 变量
如果您使用的是自定义运算符,请确保将 last_updated_at
添加为 template_fields
的一部分
例如:
template_fields = ('templates_dict', 'op_args', 'op_kwargs')
last_updated_at = "{{ti.xcom_pull(task_ids='Match_Updated_date_{}',
key='QueryTimeStamp_{}')}}".format(country,country)
这是我在 xcom 中保存的值
where last_updated_utc > '{}';
并在 where 子句中使用它
但 xcom 已正确传递到我的 where 子句中
where last_updated_utc > '{ti.xcom_pull(task_ids='Match_Updated_date_mm',
key='QueryTimeStamp_mm')}';
它传递了整个 String ,我该如何解决这个问题?
.format(last_updated_at)
这就是我在 where 子句中传递它的方式
当我不使用
my_xcom_value = "{{ti.xcom_pull(task_ids='Match_Updated_date',
key='QueryTimeStamp')}}"
当我传递参数时 Xcom 工作 fine.but,它不再工作了
Python 推送 xcom 的可调用函数
def match_dates(**Kwargs):
try:
print("enters the try block")
response = s3.get_object(Bucket='mygluecrawlerbucket',Key='DateTime/Users/my_date_{}.txt'.format(Kwargs['key1']))
print("responce is ", response)
status = response['ResponseMetadata']['HTTPStatusCode']
if status == 200:
print("Enters the status block ")
data = response['Body'].read().decode("utf-8")
ti.xcom_push(key="QueryTimeStamp_{}".format(country), value=someVariable)
拉xcom的PostGresOperator
import_redshift_table_zm = PostgresOperator(
task_id='copy_data_from_redshift_zm',
postgres_conn_id='postgres_default',
sql="""
BEGIN;
create table angaza_public_spark.stag_angaza_users_zm as
Select * FROM angaza_public_zm.users
where last_updated_utc > '{}';
END;
""".format("{{ti.xcom_pull(task_ids='Match_Updated_dates_zm', key='QueryTimeStamp_{}')}}".format(country))
两件事,
- 替换这个
{{ti.xcom_pull(task_ids='Match_Updated_date_{}',
key='QueryTimeStamp_{}')}}
用这个(注意 {{
之后和 }}
{{ ti.xcom_pull(task_ids='Match_Updated_date_{}',
key='QueryTimeStamp_{}') }}
- 取决于您在哪里使用
last_updated_at
参数。 airflow 中的每个运算符都有一个名为template_fields
的 class 变量
如果您使用的是自定义运算符,请确保将 last_updated_at
添加为 template_fields
例如:
template_fields = ('templates_dict', 'op_args', 'op_kwargs')