气流 BigqueryOperator 不工作 xcom_pull

airflow BigqueryOperator is not working xcom_pull

Bq = bigquery_operator.BigQueryOperator(
    task_id="Bq",
    bigquery_conn_id='xxxxxxxxxxx',
    use_legacy_sql=False,
    sql='aaaaaaa.sql',
    params={"project_id": project_id,
            "dataset_table_name": bq_dataset_table_name,
            "target_date": '{{ti.xcom_pull(task_ids="date")["date"]}}'},
)

‖如果你这样做,xcom_pull将不会展开,并会传递给SQL。 有解决办法吗?

params 不是模板化字段。不需要通过 params 传递 xcoms。 aaaaaaa.sql.

中可以直接使用

我假设您在 aaaaaaa.sql 中引用了 {{ params.target_date }}。要解决您的问题,只需将其替换为 {{ ti.xcom_pull(task_ids="date")["date"] }}

示例:

Bq = bigquery_operator.BigQueryOperator(
    task_id="Bq",
    bigquery_conn_id='xxxxxxxxxxx',
    use_legacy_sql=False,
    sql='aaaaaaa.sql',
    params={"project_id": project_id,
            "dataset_table_name": bq_dataset_table_name,
)

并在 aaaaaaa.sql 中:

SELECT *
FROM {{ti.xcom_pull(task_ids="date")["date"]}}