如何使用 bigquery 运算符将查询参数传递给 sql 文件
how to pass query parameter to sql file using bigquery operator
我需要访问 BigqueryOperator 在 sql 文件中传递的参数,但出现错误 ERROR - queryParameters argument must have a type <class 'dict'> not <class 'list'>
我正在使用以下代码:
t2 = bigquery_operator.BigQueryOperator(
task_id='bq_from_source_to_clean',
sql='prepare.sql',
use_legacy_sql=False,
allow_large_results=True,
query_params=[{ 'name': 'threshold_date', 'parameterType': { 'type': 'STRING' },'parameterValue': { 'value': '2020-01-01' } }],
destination_dataset_table="{}.{}.{}".format('xxxx',
'xxxx',
'temp_airflow_test'),
create_disposition="CREATE_IF_NEEDED",
write_disposition="WRITE_TRUNCATE",
dag=dag
)
Sql :
select cast(DATE_ADD(a.dt_2, interval 7 day) as DATE) as dt_1
,a.dt_2
,cast('2010-01-01' as DATE) as dt_3
from (select cast(@threshold_date as date) as dt_2) a
我正在使用 Google composer 版本 composer-1.7.0-airflow-1.10.2
提前致谢。
深入研究源代码后,似乎 BigQueryHook
在 Airflow 1.10.3 中修复了一个错误。
您定义 query_params
的方式对于较新版本的 Airflow 是正确的,根据 BigQuery API 应该是 list :请参阅 https://cloud.google.com/bigquery/docs/parameterized-queries#bigquery_query_params_named-python.
无论如何,您收到此错误是因为在 Airflow 1.10.2 中,query_params
被定义为 dict
,请参阅:
https://github.com/apache/airflow/blob/1.10.2/airflow/contrib/hooks/bigquery_hook.py#L678
query_param_list = [
...
(query_params, 'queryParameters', None, dict),
...
]
这会导致内部 _validate_value
函数抛出一个 TypeError
:
https://github.com/apache/airflow/blob/1.10.2/airflow/contrib/hooks/bigquery_hook.py#L1954
def _validate_value(key, value, expected_type):
""" function to check expected type and raise
error if type is not correct """
if not isinstance(value, expected_type):
raise TypeError("{} argument must have a type {} not {}".format(
key, expected_type, type(value)))
我没有在 Airflow 1.10.2(或任何单元测试...)中找到 query_params
的任何示例,但我认为这只是因为它不可用。
这些错误已由这些提交修复:
- https://github.com/apache/airflow/commit/0c797a830e3370bd6e39f5fcfc128a8fd776912e#diff-ee06f8fcbc476ea65446a30160c2a2b2R784 : 将
dict
更改为 list
- https://github.com/apache/airflow/pull/4876 : 更新文档
这些更改已嵌入 Airflow 1.10.3,但截至目前,Airflow 1.10.3 在 Composer 中不可用 (https://cloud.google.com/composer/docs/concepts/versioning/composer-versions#new_environments):最新版本已于 2019 年 5 月 16 日发布并嵌入版本 1.10.2.
等待这个新版本,我看到 2 种方法可以解决您的问题:
- copy/paste 修复了
BigQueryOperator
和 BigQueryHook
的版本并将它们嵌入到您的源代码中以使用它们,或者扩展现有的 BigQueryHook
并覆盖错误的方法。我不确定您是否可以直接修补 BigQueryHook
(无法在 Composer 环境中访问这些文件)
- 自己模板化 SQL 查询(而不是使用
query_params
)
这绝对是 composer (Airflow 1.10.2) 的一个错误,我们通过从 github 中提取气流文件并修补 bigquery_hook.py 文件然后在 bigquery_operator.py(均已上传到 lib 文件夹),修复为:
bigquery_operator.py(第 21 行)
从 lib.bigquery_hook 导入 BigQueryHook
bigquery_hook.py
(第 678 行)(query_params、'queryParameters'、None、列表)、
(第 731 行)if 'useLegacySql' in configuration['query'] and configuration['query']['useLegacySql'] and \
然后在你的dag中,引用上传的BQ算子:"from lib.bigquery_operator import BigQueryOperator"
分享两种在 BigQuery 运算符中传递查询参数的方法 -
Jinja 模板 - 在下面的查询中,您会看到 '{{ (execution_date - macros.timedelta( hours=1)).strftime('%Y-%m-%d %H:00:00') }}' 是将在运行时解析的 jina 模板。
SELECT owner_display_name, 标题, view_count FROM bigquery-public-data.Whosebug.posts_questions
WHERE creation_date > CAST('{{ (execution_date - macros.timedelta(小时=1)).strftime('%Y-%m-%d %H:00:00') }}' 作为时间戳) ORDER BY view_count 描述限制 100
query_params - 对于 in 子句,类型为数组,数组类型类型应为大查询中列的类型。
query_params=[ { 'name': 'DATE_IN_CLAUSE', 'parameterType': { 'type': 'ARRAY','arrayType': { 'type' :'TIMESTAMP'} },'parameterValue': { 'arrayValues': [{ 'value': datetime.utcnow().strftime('%Y- %m-%d %H:00:00') }, { 'value': (datetime.utcnow() - timedelta(hours=1)).strftime('%Y-%m-%d %H:00:00') }] } },
{ 'name': 'COUNT', 'parameterType': { 'type': 'INTEGER' },'parameterValue': { 'value': 1 } } ]
SELECT owner_display_name, 标题, view_count FROM bigquery-public-data.Whosebug.posts_questions WHERE creation_date in UNNEST(@DATE_IN_CLAUSE ) 和 view_count > @COUNT ORDER BY view_count DESC LIMIT 100
注意 - 上层查询和参数可能不会给您结果,但它们会成功而不会出现任何错误。这些例子只是为了演示如何传递参数。
我需要访问 BigqueryOperator 在 sql 文件中传递的参数,但出现错误 ERROR - queryParameters argument must have a type <class 'dict'> not <class 'list'>
我正在使用以下代码:
t2 = bigquery_operator.BigQueryOperator(
task_id='bq_from_source_to_clean',
sql='prepare.sql',
use_legacy_sql=False,
allow_large_results=True,
query_params=[{ 'name': 'threshold_date', 'parameterType': { 'type': 'STRING' },'parameterValue': { 'value': '2020-01-01' } }],
destination_dataset_table="{}.{}.{}".format('xxxx',
'xxxx',
'temp_airflow_test'),
create_disposition="CREATE_IF_NEEDED",
write_disposition="WRITE_TRUNCATE",
dag=dag
)
Sql :
select cast(DATE_ADD(a.dt_2, interval 7 day) as DATE) as dt_1
,a.dt_2
,cast('2010-01-01' as DATE) as dt_3
from (select cast(@threshold_date as date) as dt_2) a
我正在使用 Google composer 版本 composer-1.7.0-airflow-1.10.2
提前致谢。
深入研究源代码后,似乎 BigQueryHook
在 Airflow 1.10.3 中修复了一个错误。
您定义 query_params
的方式对于较新版本的 Airflow 是正确的,根据 BigQuery API 应该是 list :请参阅 https://cloud.google.com/bigquery/docs/parameterized-queries#bigquery_query_params_named-python.
无论如何,您收到此错误是因为在 Airflow 1.10.2 中,query_params
被定义为 dict
,请参阅:
https://github.com/apache/airflow/blob/1.10.2/airflow/contrib/hooks/bigquery_hook.py#L678
query_param_list = [
...
(query_params, 'queryParameters', None, dict),
...
]
这会导致内部 _validate_value
函数抛出一个 TypeError
:
https://github.com/apache/airflow/blob/1.10.2/airflow/contrib/hooks/bigquery_hook.py#L1954
def _validate_value(key, value, expected_type):
""" function to check expected type and raise
error if type is not correct """
if not isinstance(value, expected_type):
raise TypeError("{} argument must have a type {} not {}".format(
key, expected_type, type(value)))
我没有在 Airflow 1.10.2(或任何单元测试...)中找到 query_params
的任何示例,但我认为这只是因为它不可用。
这些错误已由这些提交修复:
- https://github.com/apache/airflow/commit/0c797a830e3370bd6e39f5fcfc128a8fd776912e#diff-ee06f8fcbc476ea65446a30160c2a2b2R784 : 将
dict
更改为list
- https://github.com/apache/airflow/pull/4876 : 更新文档
这些更改已嵌入 Airflow 1.10.3,但截至目前,Airflow 1.10.3 在 Composer 中不可用 (https://cloud.google.com/composer/docs/concepts/versioning/composer-versions#new_environments):最新版本已于 2019 年 5 月 16 日发布并嵌入版本 1.10.2.
等待这个新版本,我看到 2 种方法可以解决您的问题:
- copy/paste 修复了
BigQueryOperator
和BigQueryHook
的版本并将它们嵌入到您的源代码中以使用它们,或者扩展现有的BigQueryHook
并覆盖错误的方法。我不确定您是否可以直接修补BigQueryHook
(无法在 Composer 环境中访问这些文件) - 自己模板化 SQL 查询(而不是使用
query_params
)
这绝对是 composer (Airflow 1.10.2) 的一个错误,我们通过从 github 中提取气流文件并修补 bigquery_hook.py 文件然后在 bigquery_operator.py(均已上传到 lib 文件夹),修复为:
bigquery_operator.py(第 21 行)
从 lib.bigquery_hook 导入 BigQueryHook
bigquery_hook.py
(第 678 行)(query_params、'queryParameters'、None、列表)、
(第 731 行)if 'useLegacySql' in configuration['query'] and configuration['query']['useLegacySql'] and \
然后在你的dag中,引用上传的BQ算子:"from lib.bigquery_operator import BigQueryOperator"
分享两种在 BigQuery 运算符中传递查询参数的方法 -
Jinja 模板 - 在下面的查询中,您会看到 '{{ (execution_date - macros.timedelta( hours=1)).strftime('%Y-%m-%d %H:00:00') }}' 是将在运行时解析的 jina 模板。
SELECT owner_display_name, 标题, view_count FROM
bigquery-public-data.Whosebug.posts_questions
WHERE creation_date > CAST('{{ (execution_date - macros.timedelta(小时=1)).strftime('%Y-%m-%d %H:00:00') }}' 作为时间戳) ORDER BY view_count 描述限制 100query_params - 对于 in 子句,类型为数组,数组类型类型应为大查询中列的类型。
query_params=[ { 'name': 'DATE_IN_CLAUSE', 'parameterType': { 'type': 'ARRAY','arrayType': { 'type' :'TIMESTAMP'} },'parameterValue': { 'arrayValues': [{ 'value': datetime.utcnow().strftime('%Y- %m-%d %H:00:00') }, { 'value': (datetime.utcnow() - timedelta(hours=1)).strftime('%Y-%m-%d %H:00:00') }] } }, { 'name': 'COUNT', 'parameterType': { 'type': 'INTEGER' },'parameterValue': { 'value': 1 } } ]
SELECT owner_display_name, 标题, view_count FROM bigquery-public-data.Whosebug.posts_questions WHERE creation_date in UNNEST(@DATE_IN_CLAUSE ) 和 view_count > @COUNT ORDER BY view_count DESC LIMIT 100
注意 - 上层查询和参数可能不会给您结果,但它们会成功而不会出现任何错误。这些例子只是为了演示如何传递参数。