How to create an external table in Google BigQuery for a Parquet file from an Airflow DAG
I am trying to create an external table in BigQuery for a Parquet file that sits in a GCS bucket, but I am getting the error below when I run the following code in Airflow:
Error:
ERROR - 400 POST https://bigquery.googleapis.com/bigquery/v2/projects/project_dev/datasets/dataset_dev/tables?prettyPrint=false: When defining a table with an ExternalDataConfiguration, a schema must be present on either the Table or the ExternalDataConfiguration. If the schema is present on both, the schemas must be the same.
Traceback (most recent call last):
  File "/usr/local/lib/airflow/airflow/models/taskinstance.py", line 985, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/airflow/airflow/providers/google/cloud/operators/bigquery.py", line 1210, in execute
    encryption_configuration=self.encryption_configuration
  File "/usr/local/lib/airflow/airflow/providers/google/common/hooks/base_google.py", line 425, in inner_wrapper
    return func(self, *args, **kwargs)
  File "/usr/local/lib/airflow/airflow/providers/google/cloud/hooks/bigquery.py", line 675, in create_external_table
    table_resource=table.to_api_repr(), project_id=project_id, location=location, exists_ok=True
  File "/usr/local/lib/airflow/airflow/providers/google/common/hooks/base_google.py", line 425, in inner_wrapper
    return func(self, *args, **kwargs)
  File "/usr/local/lib/airflow/airflow/providers/google/cloud/hooks/bigquery.py", line 387, in create_empty_table
    table=table, exists_ok=exists_ok, retry=retry
  File "/opt/python3.6/lib/python3.6/site-packages/google/cloud/bigquery/client.py", line 622, in create_table
    timeout=timeout
  File "/opt/python3.6/lib/python3.6/site-packages/google/cloud/bigquery/client.py", line 640, in _call_api
    return call()
  File "/opt/python3.6/lib/python3.6/site-packages/google/api_core/retry.py", line 286, in retry_wrapped_func
    on_error=on_error
  File "/opt/python3.6/lib/python3.6/site-packages/google/api_core/retry.py", line 184, in retry_target
    return target()
  File "/opt/python3.6/lib/python3.6/site-packages/google/cloud/_http.py", line 483, in api_request
DAG code:
create_imp_external_table = BigQueryCreateExternalTableOperator(
    task_id=f"create_imp_external_table",
    bucket='temp_bucket',
    source_objects='data/part.parquet',
    destination_project_dataset_table=f"project_dev.dataset_dev.parquet_table",
    file_format='PARQUET',
    impersonation_chain='svc-acct@dev.iam.gserviceaccount.com',
    dag=dag
)
Ideally it should auto-detect the schema, but that does not happen. I also tried passing autoDetect=True, but that did not work either.
You are getting the 400 error because impersonation_chain and file_format are not accepted by BigQueryCreateExternalTableOperator(). I removed impersonation_chain, changed file_format to source_format, and passed source_objects as a list instead of a string. I was able to make it work with the following parameters:
create_imp_external_table = BigQueryCreateExternalTableOperator(
    task_id=f"create_imp_external_table",
    bucket='my-bucket',
    source_objects=["/data/userdata1.parquet"],  # pass a list instead of a string
    destination_project_dataset_table=f"my-project.my_dataset.parquet_table",
    source_format='PARQUET',  # use source_format instead of file_format
)
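If schema autodetection still gives you trouble in your environment, the operator also lets you pin the schema explicitly via its schema_fields parameter. A minimal sketch, using hypothetical column names that you would replace with the actual columns of your Parquet file:

create_imp_external_table = BigQueryCreateExternalTableOperator(
    task_id="create_imp_external_table",
    bucket='my-bucket',
    source_objects=["/data/userdata1.parquet"],
    destination_project_dataset_table="my-project.my_dataset.parquet_table",
    source_format='PARQUET',
    # Hypothetical columns -- replace with the real schema of your file.
    schema_fields=[
        {"name": "id", "type": "INTEGER", "mode": "NULLABLE"},
        {"name": "first_name", "type": "STRING", "mode": "NULLABLE"},
        {"name": "salary", "type": "FLOAT", "mode": "NULLABLE"},
    ],
)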
For testing I used this sample parquet file. I am on Composer version 1.17.2 and Airflow version 1.10.15. Here is the full DAG used:
import datetime

import airflow
from airflow.operators import bash_operator
from airflow.providers.google.cloud.operators.bigquery import BigQueryCreateExternalTableOperator

YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

default_args = {
    'owner': 'Composer Example',
    'depends_on_past': False,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'start_date': YESTERDAY,
}

with airflow.DAG(
        'parquet_load_to_bq',
        catchup=False,  # keyword argument, not the string 'catchup=False'
        default_args=default_args,
        schedule_interval=datetime.timedelta(days=1)) as dag:

    create_imp_external_table = BigQueryCreateExternalTableOperator(
        task_id=f"create_imp_external_table",
        bucket='my-bucket',
        source_objects=["/data/userdata1.parquet"],  # pass a list
        destination_project_dataset_table=f"my-project.my_dataset.parquet_table",
        source_format='PARQUET',  # use source_format instead of file_format
    )

    # Print the dag_run id from the Airflow logs
    print_dag_run_conf = bash_operator.BashOperator(
        task_id='print_dag_run_conf', bash_command='echo {{ dag_run.id }}')

    create_imp_external_table >> print_dag_run_conf
Sample run logs: (screenshot)
Resulting BQ data: (screenshot)
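As an extra check beyond the logs, you can query the new external table directly. A quick sanity-check sketch, assuming the google-cloud-bigquery client library is installed and default credentials are configured:

from google.cloud import bigquery

# For external tables, BigQuery reads the Parquet file from GCS at query time.
client = bigquery.Client(project="my-project")
rows = client.query(
    "SELECT * FROM `my-project.my_dataset.parquet_table` LIMIT 5"
).result()
for row in rows:
    print(dict(row))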