气流 BigqueryOperator 不工作 xcom_pull
airflow BigqueryOperator is not working xcom_pull
Bq = bigquery_operator.BigQueryOperator(
task_id="Bq",
bigquery_conn_id='xxxxxxxxxxx',
use_legacy_sql=False,
sql='aaaaaaa.sql',
params={"project_id": project_id,
"dataset_table_name": bq_dataset_table_name,
"target_date": '{{ti.xcom_pull(task_ids="date")["date"]}}'},
)
‖如果你这样做,xcom_pull将不会展开,并会传递给SQL。
有解决办法吗?
params
不是模板化字段。不需要通过 params
传递 xcoms。 aaaaaaa.sql
.
中可以直接使用
我假设您在 aaaaaaa.sql
中引用了 {{ params.target_date }}
。要解决您的问题,只需将其替换为 {{ ti.xcom_pull(task_ids="date")["date"] }}
示例:
Bq = bigquery_operator.BigQueryOperator(
task_id="Bq",
bigquery_conn_id='xxxxxxxxxxx',
use_legacy_sql=False,
sql='aaaaaaa.sql',
params={"project_id": project_id,
"dataset_table_name": bq_dataset_table_name,
)
并在 aaaaaaa.sql
中:
SELECT *
FROM {{ti.xcom_pull(task_ids="date")["date"]}}
Bq = bigquery_operator.BigQueryOperator(
task_id="Bq",
bigquery_conn_id='xxxxxxxxxxx',
use_legacy_sql=False,
sql='aaaaaaa.sql',
params={"project_id": project_id,
"dataset_table_name": bq_dataset_table_name,
"target_date": '{{ti.xcom_pull(task_ids="date")["date"]}}'},
)
‖如果你这样做,xcom_pull将不会展开,并会传递给SQL。 有解决办法吗?
params
不是模板化字段。不需要通过 params
传递 xcoms。 aaaaaaa.sql
.
我假设您在 aaaaaaa.sql
中引用了 {{ params.target_date }}
。要解决您的问题,只需将其替换为 {{ ti.xcom_pull(task_ids="date")["date"] }}
示例:
Bq = bigquery_operator.BigQueryOperator(
task_id="Bq",
bigquery_conn_id='xxxxxxxxxxx',
use_legacy_sql=False,
sql='aaaaaaa.sql',
params={"project_id": project_id,
"dataset_table_name": bq_dataset_table_name,
)
并在 aaaaaaa.sql
中:
SELECT *
FROM {{ti.xcom_pull(task_ids="date")["date"]}}