如何使用 bigquery 运算符将查询参数传递给 sql 文件

how to pass query parameter to sql file using bigquery operator

我需要访问 BigqueryOperator 在 sql 文件中传递的参数,但出现错误 ERROR - queryParameters argument must have a type <class 'dict'> not <class 'list'> 我正在使用以下代码:

t2 = bigquery_operator.BigQueryOperator(
task_id='bq_from_source_to_clean',
sql='prepare.sql',
use_legacy_sql=False,
allow_large_results=True,
query_params=[{ 'name': 'threshold_date', 'parameterType': { 'type': 'STRING' },'parameterValue': { 'value': '2020-01-01' } }],
destination_dataset_table="{}.{}.{}".format('xxxx',
                                            'xxxx',
                                            'temp_airflow_test'),
create_disposition="CREATE_IF_NEEDED",
write_disposition="WRITE_TRUNCATE",
dag=dag

)

Sql :

select  cast(DATE_ADD(a.dt_2, interval 7 day) as DATE) as dt_1
,a.dt_2
,cast('2010-01-01' as DATE) as dt_3 
from (select cast(@threshold_date as date) as dt_2) a

我正在使用 Google composer 版本 composer-1.7.0-airflow-1.10.2

提前致谢。

深入研究源代码后,似乎 BigQueryHook 在 Airflow 1.10.3 中修复了一个错误。

您定义 query_params 的方式对于较新版本的 Airflow 是正确的,根据 BigQuery API 应该是 list :请​​参阅 https://cloud.google.com/bigquery/docs/parameterized-queries#bigquery_query_params_named-python.

无论如何,您收到此错误是因为在 Airflow 1.10.2 中,query_params 被定义为 dict,请参阅:

https://github.com/apache/airflow/blob/1.10.2/airflow/contrib/hooks/bigquery_hook.py#L678

query_param_list = [
    ...
    (query_params, 'queryParameters', None, dict),
    ...
]

这会导致内部 _validate_value 函数抛出一个 TypeError :

https://github.com/apache/airflow/blob/1.10.2/airflow/contrib/hooks/bigquery_hook.py#L1954

def _validate_value(key, value, expected_type):
    """ function to check expected type and raise
    error if type is not correct """
    if not isinstance(value, expected_type):
        raise TypeError("{} argument must have a type {} not {}".format(
            key, expected_type, type(value)))

我没有在 Airflow 1.10.2(或任何单元测试...)中找到 query_params 的任何示例,但我认为这只是因为它不可用。

这些错误已由这些提交修复:

这些更改已嵌入 Airflow 1.10.3,但截至目前,Airflow 1.10.3 在 Composer 中不可用 (https://cloud.google.com/composer/docs/concepts/versioning/composer-versions#new_environments):最新版本已于 2019 年 5 月 16 日发布并嵌入版本 1.10.2.

等待这个新版本,我看到 2 种方法可以解决您的问题:

  • copy/paste 修复了 BigQueryOperatorBigQueryHook 的版本并将它们嵌入到您的源代码中以使用它们,或者扩展现有的 BigQueryHook 并覆盖错误的方法。我不确定您是否可以直接修补 BigQueryHook(无法在 Composer 环境中访问这些文件)
  • 自己模板化 SQL 查询(而不是使用 query_params

这绝对是 composer (Airflow 1.10.2) 的一个错误,我们通过从 github 中提取气流文件并修补 bigquery_hook.py 文件然后在 bigquery_operator.py(均已上传到 lib 文件夹),修复为:

  1. bigquery_operator.py(第 21 行)

    从 lib.bigquery_hook 导入 BigQueryHook

  2. bigquery_hook.py

    (第 678 行)(query_params、'queryParameters'、None、列表)、

    (第 731 行)if 'useLegacySql' in configuration['query'] and configuration['query']['useLegacySql'] and \

然后在你的dag中,引用上传的BQ算子:"from lib.bigquery_operator import BigQueryOperator"

分享两种在 BigQuery 运算符中传递查询参数的方法 -

  1. Jinja 模板 - 在下面的查询中,您会看到 '{{ (execution_date - macros.timedelta( hours=1)).strftime('%Y-%m-%d %H:00:00') }}' 是将在运行时解析的 jina 模板。

    SELECT owner_display_name, 标题, view_count FROM bigquery-public-data.Whosebug.posts_questions WHERE creation_date > CAST('{{ (execution_date - macros.timedelta(小时=1)).strftime('%Y-%m-%d %H:00:00') }}' 作为时间戳) ORDER BY view_count 描述限制 100

  2. query_params - 对于 in 子句,类型为数组,数组类型类型应为大查询中列​​的类型。

    query_params=[ { 'name': 'DATE_IN_CLAUSE', 'parameterType': { 'type': 'ARRAY','arrayType': { 'type' :'TIMESTAMP'} },'parameterValue': { 'arrayValues': [{ 'value': datetime.utcnow().strftime('%Y- %m-%d %H:00:00') }, { 'value': (datetime.utcnow() - timedelta(hours=1)).strftime('%Y-%m-%d %H:00:00') }] } }, { 'name': 'COUNT', 'parameterType': { 'type': 'INTEGER' },'parameterValue': { 'value': 1 } } ]

    SELECT owner_display_name, 标题, view_count FROM bigquery-public-data.Whosebug.posts_questions WHERE creation_date in UNNEST(@DATE_IN_CLAUSE ) 和 view_count > @COUNT ORDER BY view_count DESC LIMIT 100

注意 - 上层查询和参数可能不会给您结果,但它们会成功而不会出现任何错误。这些例子只是为了演示如何传递参数。