Python Airflow BigQuery 400: configuration.query.createDisposition cannot be set for scripts

Recently my BigQueryExecuteQueryOperator (from airflow.providers.google.cloud.operators.bigquery import BigQueryExecuteQueryOperator) started raising an error.

execute_query_job = BigQueryExecuteQueryOperator(
    task_id="execute_query_job_{}".format(destination_table),
    use_legacy_sql=False,
    sql=sql_query,
    destination_dataset_table=destination_table,
    create_disposition="CREATE_IF_NEEDED",
    write_disposition="WRITE_TRUNCATE",
    dag=dag,
)

job_id_execute = execute_query_job.execute(context=context)

The code block above works as expected. But when I change sql_query to a new script, I get Error 400: configuration.query.createDisposition cannot be set for scripts.

This is the SQL that works with the code block above:

with data_table as(
    select pltfm_name, event_dt as event_date
    from `project_id.dataset.data_tabele`
    WHERE event_dt BETWEEN DATE('start_date',"America/Los_Angeles") AND DATE('end_date',"America/Los_Angeles")
),
activity_data as (
    select DATE(timestamp, "America/Los_Angeles") as event_date,
    COUNT (distinct CASE WHEN eventid = 'mp' AS bp
    from `project_id.dataset.data_tabele`
    WHERE DATE(timestamp, "America/Los_Angeles") between DATE("start_date","America/Los_Angeles") AND DATE("end_date","America/Los_Angeles")
    group by 1
),
cal as (
    select event_date FROM UNNEST(GENERATE_DATE_ARRAY(DATE("start_date","America/Los_Angeles"), DATE("end_date","America/Los_Angeles"))) event_date
)
select a.event_date,
coalesce(c.bp, 0) as bp,
from cal a
left join activity_data c on a.event_date = c.event_date;

But the SQL script below does not work; it raises the error.

DECLARE
  temp string DEFAULT 'D';
  SET temp = 'M';
WITH
  BASE_DATA AS (
  SELECT
    CASE
      WHEN temp = 'M' THEN DATE_TRUNC(EventDate,MONTH)
      WHEN temp = 'Q' THEN DATE_TRUNC(EventDate,QUARTER)
  END
    ed,
    SUM(CASE
            WHEN temp = 'M' THEN tl
            WHEN temp = 'Q' THEN tl
     END) AS tl_count
  FROM
    `project_id.dataset.data_table`
  WHERE
    CASE
      WHEN temp = 'M' THEN (DATE(EventDate) BETWEEN DATE_ADD(DATE_TRUNC(DATE(CURRENT_DATE()), MONTH), INTERVAL -2 MONTH) AND DATE_ADD(DATE_TRUNC(CURRENT_DATE(), MONTH), INTERVAL -1 DAY))
      WHEN temp = 'Q' THEN (DATE(EventDate) BETWEEN DATE_ADD(DATE_TRUNC(DATE(CURRENT_DATE()), QUARTER), INTERVAL -2 QUARTER)
      AND DATE_ADD(DATE_TRUNC(CURRENT_DATE(), QUARTER), INTERVAL -1 DAY))
  END
  GROUP BY
    1
  ORDER BY
    1 DESC)
SELECT
  ed,
  tl_count
FROM
  BASE_DATA
ORDER BY
  ed DESC;

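So the SQL script above throws the error, yet it runs perfectly in the GCP BigQuery console. I looked around, and it seems Airflow cannot execute queries that use DECLARE or similar statements (similar question: https://www.py4u.net/discuss/174607). I tried the suggestions there, but they did not help and I ended up with the same error. So now I am not sure what is causing the problem, or whether there is another way to solve it in Airflow.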

Does anyone know what might be happening, and is there a solution or workaround?

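As you surmised, the DECLARE statement means this SQL text contains multiple discrete steps, so it is executed as a script rather than as a single statement: https://cloud.google.com/bigquery/docs/reference/standard-sql/scripting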
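
To make the "multiple discrete steps" point concrete, here is a rough Python sketch that counts the statements in a piece of SQL text. The helper split_statements is hypothetical and only splits on semicolons; it is not how BigQuery parses SQL and it would miscount semicolons inside string literals. It only illustrates why the second query is treated differently from the first.

```python
from typing import List


def split_statements(sql_text: str) -> List[str]:
    """Naively split SQL text on ';' and drop empty pieces (illustration only)."""
    return [part.strip() for part in sql_text.split(";") if part.strip()]


# A single query: one statement, even with CTEs and a trailing semicolon.
single_query = "WITH cal AS (SELECT 1 AS x) SELECT x FROM cal;"

# DECLARE + SET + SELECT: three statements, so it runs as a script.
script = "DECLARE temp STRING DEFAULT 'D'; SET temp = 'M'; SELECT temp;"

print(len(split_statements(single_query)))  # 1
print(len(split_statements(script)))        # 3
```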

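The simplest approach is probably to remove the job configuration properties related to the destination table and dispositions, and to change the final SELECT ... into CREATE OR REPLACE TABLE ... AS SELECT ... (see https://cloud.google.com/bigquery/docs/reference/standard-sql/data-definition-language#create_table_statement).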
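
A minimal sketch of that change, under some assumptions: DESTINATION_TABLE is a made-up table name, build_script and make_task are hypothetical helpers, and the query is a shortened version of the one in the question (the WHERE clause and the inner CASE in the SUM are left out for brevity). The script itself now writes the table, and the operator is created without destination_dataset_table, create_disposition, or write_disposition.

```python
# Hypothetical destination; replace with your own project.dataset.table.
DESTINATION_TABLE = "project_id.dataset.destination_table"


def build_script(destination_table: str) -> str:
    """Return a BigQuery script whose last statement creates the table itself."""
    return f"""
DECLARE temp STRING DEFAULT 'D';
SET temp = 'M';

-- The final SELECT is wrapped in CREATE OR REPLACE TABLE ... AS,
-- so no destination table or disposition is needed in the job config.
CREATE OR REPLACE TABLE `{destination_table}` AS
WITH base_data AS (
  SELECT
    CASE
      WHEN temp = 'M' THEN DATE_TRUNC(EventDate, MONTH)
      WHEN temp = 'Q' THEN DATE_TRUNC(EventDate, QUARTER)
    END AS ed,
    SUM(tl) AS tl_count
  FROM `project_id.dataset.data_table`
  GROUP BY 1
)
SELECT ed, tl_count
FROM base_data;
"""


def make_task(dag):
    """Build the operator without any destination/disposition arguments."""
    # Imported lazily so the SQL-building part runs without Airflow installed.
    from airflow.providers.google.cloud.operators.bigquery import (
        BigQueryExecuteQueryOperator,
    )

    return BigQueryExecuteQueryOperator(
        task_id="execute_query_job",
        sql=build_script(DESTINATION_TABLE),
        use_legacy_sql=False,
        dag=dag,
    )
```

Note that CREATE OR REPLACE TABLE replaces the whole table, including its schema, which is close to but not identical to what WRITE_TRUNCATE did in the original job configuration.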