Python Airflow bigquery 400 configuration.query.createDisposition cannot be set for scripts
Recently my BigQueryExecuteQueryOperator (from airflow.providers.google.cloud.operators.bigquery import BigQueryExecuteQueryOperator) started throwing an error.
execute_query_job = BigQueryExecuteQueryOperator(
    task_id="execute_query_job_{}".format(destination_table),
    use_legacy_sql=False,
    sql=sql_query,
    destination_dataset_table=destination_table,
    create_disposition="CREATE_IF_NEEDED",
    write_disposition="WRITE_TRUNCATE",
    dag=dag
)
job_id_execute = execute_query_job.execute(context=context)
The code block above works as expected, so it is fine. But when I change sql_query to a new SQL script, I get Error 400: configuration.query.createDisposition cannot be set for scripts.
The SQL that works with the code block above:
with data_table as (
select pltfm_name, event_dt as event_date
from `project_id.dataset.data_tabele`
WHERE event_dt BETWEEN DATE('start_date',"America/Los_Angeles") AND DATE('end_date',"America/Los_Angeles")
),
activity_data as (
select DATE(timestamp, "America/Los_Angeles") as event_date,
COUNT(distinct CASE WHEN eventid = 'mp' THEN eventid END) AS bp
from `project_id.dataset.data_tabele`
WHERE DATE(timestamp, "America/Los_Angeles") between DATE("start_date","America/Los_Angeles") AND DATE("end_date","America/Los_Angeles")
group by 1
),
cal as (
select event_date FROM UNNEST(GENERATE_DATE_ARRAY(DATE("start_date","America/Los_Angeles"), DATE("end_date","America/Los_Angeles"))) event_date
)
select a.event_date,
coalesce(c.bp, 0) as bp
from cal a
left join activity_data c on a.event_date = c.event_date;
But the SQL script below does not work; it throws the error.
DECLARE
temp string DEFAULT 'D';
SET temp = 'M';
WITH
BASE_DATA AS (
SELECT
CASE
WHEN temp = 'M' THEN DATE_TRUNC(EventDate,MONTH)
WHEN temp = 'Q' THEN DATE_TRUNC(EventDate,QUARTER)
END
ed,
SUM(CASE
WHEN temp = 'M' THEN tl
WHEN temp = 'Q' THEN tl
END) AS tl_count
FROM
`project_id.dataset.data_table`
WHERE
CASE
WHEN temp = 'M' THEN (DATE(EventDate) BETWEEN DATE_ADD(DATE_TRUNC(DATE(CURRENT_DATE()), MONTH), INTERVAL -2 MONTH) AND DATE_ADD(DATE_TRUNC(CURRENT_DATE(), MONTH), INTERVAL -1 DAY))
WHEN temp = 'Q' THEN (DATE(EventDate) BETWEEN DATE_ADD(DATE_TRUNC(DATE(CURRENT_DATE()), QUARTER), INTERVAL -2 QUARTER)
AND DATE_ADD(DATE_TRUNC(CURRENT_DATE(), QUARTER), INTERVAL -1 DAY))
END
GROUP BY
1
ORDER BY
1 DESC)
SELECT
ed,
tl_count
FROM
BASE_DATA
ORDER BY
ed DESC;
So the SQL script above throws the error, yet it runs perfectly in the GCP BigQuery console. I looked around, and it seems Airflow cannot execute queries that use DECLARE or similar statements (similar question --> https://www.py4u.net/discuss/174607). I tried the suggestions there, but it still did not work and I ended up with the same error. So now I am not sure what is causing the problem here, and whether there is another way to solve it in Airflow.
Does anyone know what might be happening, and a solution or workaround?
As you surmised, the DECLARE statement means this SQL text contains multiple discrete steps, so it executes as a script rather than as a single statement: https://cloud.google.com/bigquery/docs/reference/standard-sql/scripting
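A quick way to confirm this is a BigQuery-side restriction rather than an Airflow one is to submit the same kind of script with the plain BigQuery Python client and the same disposition settings. A minimal sketch (project, dataset, and table names are placeholders):

from google.cloud import bigquery

client = bigquery.Client(project="project_id")  # placeholder project

# Same query-level settings the Airflow operator passes through.
job_config = bigquery.QueryJobConfig(
    create_disposition="CREATE_IF_NEEDED",
    write_disposition="WRITE_TRUNCATE",
)
job_config.destination = bigquery.TableReference.from_string(
    "project_id.dataset.destination_table"  # placeholder table
)

# Multi-statement text (DECLARE/SET/...) runs as a script; combining it with
# query-level dispositions is rejected with the same 400 error
# (raised as google.api_core.exceptions.BadRequest).
script = "DECLARE temp STRING DEFAULT 'D'; SET temp = 'M'; SELECT temp;"
client.query(script, job_config=job_config).result()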
The simplest approach is probably to remove the job configuration properties related to the destination table/dispositions and change the final SELECT ... into CREATE OR REPLACE TABLE ... AS SELECT ...: https://cloud.google.com/bigquery/docs/reference/standard-sql/data-definition-language#create_table_statement
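A minimal sketch of that workaround with the operator from the question, assuming the same placeholder names (destination_table, dag) and a hypothetical target table project_id.dataset.destination_table; the script now writes its own result table, so the destination/disposition arguments are dropped:

from airflow.providers.google.cloud.operators.bigquery import BigQueryExecuteQueryOperator

# The script materializes its own result table via DDL, so
# destination_dataset_table, create_disposition and write_disposition are no
# longer set on the operator (they map to the configuration.query.* fields
# that are rejected for scripts).
sql_script = """
DECLARE temp STRING DEFAULT 'D';
SET temp = 'M';

CREATE OR REPLACE TABLE `project_id.dataset.destination_table` AS
WITH BASE_DATA AS (
  SELECT DATE '2024-01-01' AS ed, 0 AS tl_count  -- stand-in for the BASE_DATA CTE from the script above
)
SELECT ed, tl_count
FROM BASE_DATA
ORDER BY ed DESC;
"""

execute_script_job = BigQueryExecuteQueryOperator(
    task_id="execute_script_job_{}".format(destination_table),
    use_legacy_sql=False,
    sql=sql_script,
    dag=dag,
)

If you are on a newer version of the Google provider, it may also be worth looking at BigQueryInsertJobOperator (BigQueryExecuteQueryOperator is deprecated in its favor), which submits a raw query job configuration and runs multi-statement scripts the same way.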