How to create an external table in Google BigQuery for a Parquet file in an Airflow DAG

I am trying to create an external table in BigQuery for a Parquet file that exists in a GCS bucket, but I get the following error when running the code below in Airflow:

Error:

ERROR - 400 POST https://bigquery.googleapis.com/bigquery/v2/projects/project_dev/datasets/dataset_dev/tables?prettyPrint=false: When defining a table with an ExternalDataConfiguration, a schema must be present on either the Table or the ExternalDataConfiguration. If the schema is present on both, the schemas must be the same.
Traceback (most recent call last):
  File "/usr/local/lib/airflow/airflow/models/taskinstance.py", line 985, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/airflow/airflow/providers/google/cloud/operators/bigquery.py", line 1210, in execute
    encryption_configuration=self.encryption_configuration
  File "/usr/local/lib/airflow/airflow/providers/google/common/hooks/base_google.py", line 425, in inner_wrapper
    return func(self, *args, **kwargs)
  File "/usr/local/lib/airflow/airflow/providers/google/cloud/hooks/bigquery.py", line 675, in create_external_table
    table_resource=table.to_api_repr(), project_id=project_id, location=location, exists_ok=True
  File "/usr/local/lib/airflow/airflow/providers/google/common/hooks/base_google.py", line 425, in inner_wrapper
    return func(self, *args, **kwargs)
  File "/usr/local/lib/airflow/airflow/providers/google/cloud/hooks/bigquery.py", line 387, in create_empty_table
    table=table, exists_ok=exists_ok, retry=retry
  File "/opt/python3.6/lib/python3.6/site-packages/google/cloud/bigquery/client.py", line 622, in create_table
    timeout=timeout
  File "/opt/python3.6/lib/python3.6/site-packages/google/cloud/bigquery/client.py", line 640, in _call_api
    return call()
  File "/opt/python3.6/lib/python3.6/site-packages/google/api_core/retry.py", line 286, in retry_wrapped_func
    on_error=on_error
  File "/opt/python3.6/lib/python3.6/site-packages/google/api_core/retry.py", line 184, in retry_target
    return target()
  File "/opt/python3.6/lib/python3.6/site-packages/google/cloud/_http.py", line 483, in api_request

DAG code:

create_imp_external_table = BigQueryCreateExternalTableOperator(
        task_id=f"create_imp_external_table",
        bucket='temp_bucket',
        source_objects='data/part.parquet',
        destination_project_dataset_table=f"project_dev.dataset_dev.parquet_table",
        file_format='PARQUET',
        impersonation_chain='svc-acct@dev.iam.gserviceaccount.com',
        dag=dag
    )

Ideally, it should auto-detect the schema, but that is not happening. I also tried passing autoDetect=True, but that didn't work either.

You are getting the 400 error because impersonation_chain and file_format are not accepted by BigQueryCreateExternalTableOperator(). I removed impersonation_chain, changed file_format to source_format, and passed source_objects as a list instead of a string.

I was able to make it work with the following parameters:

create_imp_external_table = BigQueryCreateExternalTableOperator(
    task_id=f"create_imp_external_table",
    bucket='my-bucket',
    source_objects=["/data/userdata1.parquet"], #pass a list
    destination_project_dataset_table=f"my-project.my_dataset.parquet_table",
    source_format='PARQUET', #use source_format instead of file_format
)
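
Under the hood, the operator builds an external data configuration for the table, and the original 400 error means that configuration reached the API with no schema and no autodetection. If you want to verify the table definition outside Airflow, here is a minimal sketch using the google-cloud-bigquery client directly (the project, dataset, bucket, and object names are the placeholders from the snippet above):

from google.cloud import bigquery

# Assumes application default credentials and the placeholder names used above.
client = bigquery.Client(project="my-project")

# External data configuration for the Parquet file, with schema autodetection enabled.
external_config = bigquery.ExternalConfig("PARQUET")
external_config.source_uris = ["gs://my-bucket/data/userdata1.parquet"]
external_config.autodetect = True

# Attach the configuration to the table and create it in the dataset.
table = bigquery.Table("my-project.my_dataset.parquet_table")
table.external_data_configuration = external_config
client.create_table(table, exists_ok=True)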

For testing, I used this sample parquet file. I am using Composer version 1.17.2 and Airflow version 1.10.15. See the full DAG used:

import datetime
import airflow
from airflow.operators import bash_operator
from airflow.providers.google.cloud.operators.bigquery import BigQueryCreateExternalTableOperator

YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

default_args = {
    'owner': 'Composer Example',
    'depends_on_past': False,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'start_date': YESTERDAY,
}

with airflow.DAG(
        'parquet_load_to_bq',
        catchup=False,
        default_args=default_args,
        schedule_interval=datetime.timedelta(days=1)) as dag:

    create_imp_external_table = BigQueryCreateExternalTableOperator(
        task_id=f"create_imp_external_table",
        bucket='my-bucket',
        source_objects=["/data/userdata1.parquet"], #pass a list
        destination_project_dataset_table=f"my-project.my_dataset.parquet_table",
        source_format='PARQUET', #use source_format instead of file_format
    )

    # Print the dag_run id from the Airflow logs
    print_dag_run_conf = bash_operator.BashOperator(
        task_id='print_dag_run_conf', bash_command='echo {{ dag_run.id }}')

create_imp_external_table >> print_dag_run_conf

See the sample logs of the run:

BQ data:
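
If you want the DAG itself to confirm that the external table reads the Parquet data, you could add a downstream query task. A minimal sketch, assuming your environment's Google provider ships BigQueryInsertJobOperator (the table name is the placeholder used above):

from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator

# Hypothetical extra task: count the rows of the external table to confirm it reads the Parquet file.
check_external_table = BigQueryInsertJobOperator(
    task_id="check_external_table",
    configuration={
        "query": {
            "query": "SELECT COUNT(*) AS row_count FROM `my-project.my_dataset.parquet_table`",
            "useLegacySql": False,
        }
    },
    dag=dag,
)

create_imp_external_table >> check_external_table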