未找到 BigQueryCreateExternalTableOperator 的自动检测参数
Autodetect argument not found for BigQueryCreateExternalTableOperator
我对 Google Cloud Composer 上的 BigQueryCreateExternalTableOperator 自动检测参数有疑问。
我按以下方式使用运算符:
create_external_table = BigQueryCreateExternalTableOperator(
task_id="create_external_table",
destination_project_dataset_table="<bq_dataset>.<bq_table>",
bucket="data_exchange_bucket",
autodetect=True,
source_objects=["<filename>__part*.csv"],
)
导入 dag 时,cloud composer 显示以下错误:
我检查了运算符 source code here 并确实在其中找到了“autodetect”参数。我尝试将参数定位在不同的位置,虽然这应该没有什么区别吧?
非常感谢您的支持。
该错误意味着您是 运行 旧版本的 google 提供商。
具体你是 运行 apache-airflow-providers-google<6.8.0
.
BigQueryCreateExternalTableOperator
的 autodetect
参数已在版本 6.8.0 中发布,因此您需要修改提供程序 apache-airflow-providers-google>=6.8.0
才能获得新功能。
如果升级不适合您,那么您可以通过向后移植 PR
中的代码来创建具有缺失功能的自定义运算符
我没有测试它,但可能是这样的:
class MyBigQueryCreateExternalTableOperator(BigQueryCreateExternalTableOperator):
def __init__(
self,
*,
autodetect: bool = False,
**kwargs,
) -> None:
super().__init__(**kwargs)
self.autodetect = autodetect
def execute(self, context: 'Context') -> None:
bq_hook = BigQueryHook(
gcp_conn_id=self.bigquery_conn_id,
delegate_to=self.delegate_to,
location=self.location,
impersonation_chain=self.impersonation_chain,
)
if self.table_resource:
bq_hook.create_empty_table(
table_resource=self.table_resource,
)
return
if not self.schema_fields and self.schema_object and self.source_format != 'DATASTORE_BACKUP':
gcs_hook = GCSHook(
gcp_conn_id=self.google_cloud_storage_conn_id,
delegate_to=self.delegate_to,
impersonation_chain=self.impersonation_chain,
)
schema_fields = json.loads(gcs_hook.download(self.bucket, self.schema_object).decode("utf-8"))
else:
schema_fields = self.schema_fields
source_uris = [f"gs://{self.bucket}/{source_object}" for source_object in self.source_objects]
bq_hook.create_external_table(
external_project_dataset_table=self.destination_project_dataset_table,
schema_fields=schema_fields,
source_uris=source_uris,
source_format=self.source_format,
autodetect=self.autodetect,
compression=self.compression,
skip_leading_rows=self.skip_leading_rows,
field_delimiter=self.field_delimiter,
max_bad_records=self.max_bad_records,
quote_character=self.quote_character,
allow_quoted_newlines=self.allow_quoted_newlines,
allow_jagged_rows=self.allow_jagged_rows,
src_fmt_configs=self.src_fmt_configs,
labels=self.labels,
encryption_configuration=self.encryption_configuration,
)
然后您可以在 DAG 中使用 MyBigQueryCreateExternalTableOperator
。
我对 Google Cloud Composer 上的 BigQueryCreateExternalTableOperator 自动检测参数有疑问。
我按以下方式使用运算符:
create_external_table = BigQueryCreateExternalTableOperator(
task_id="create_external_table",
destination_project_dataset_table="<bq_dataset>.<bq_table>",
bucket="data_exchange_bucket",
autodetect=True,
source_objects=["<filename>__part*.csv"],
)
导入 dag 时,cloud composer 显示以下错误:
我检查了运算符 source code here 并确实在其中找到了“autodetect”参数。我尝试将参数定位在不同的位置,虽然这应该没有什么区别吧?
非常感谢您的支持。
该错误意味着您是 运行 旧版本的 google 提供商。
具体你是 运行 apache-airflow-providers-google<6.8.0
.
BigQueryCreateExternalTableOperator
的 autodetect
参数已在版本 6.8.0 中发布,因此您需要修改提供程序 apache-airflow-providers-google>=6.8.0
才能获得新功能。
如果升级不适合您,那么您可以通过向后移植 PR
中的代码来创建具有缺失功能的自定义运算符我没有测试它,但可能是这样的:
class MyBigQueryCreateExternalTableOperator(BigQueryCreateExternalTableOperator):
def __init__(
self,
*,
autodetect: bool = False,
**kwargs,
) -> None:
super().__init__(**kwargs)
self.autodetect = autodetect
def execute(self, context: 'Context') -> None:
bq_hook = BigQueryHook(
gcp_conn_id=self.bigquery_conn_id,
delegate_to=self.delegate_to,
location=self.location,
impersonation_chain=self.impersonation_chain,
)
if self.table_resource:
bq_hook.create_empty_table(
table_resource=self.table_resource,
)
return
if not self.schema_fields and self.schema_object and self.source_format != 'DATASTORE_BACKUP':
gcs_hook = GCSHook(
gcp_conn_id=self.google_cloud_storage_conn_id,
delegate_to=self.delegate_to,
impersonation_chain=self.impersonation_chain,
)
schema_fields = json.loads(gcs_hook.download(self.bucket, self.schema_object).decode("utf-8"))
else:
schema_fields = self.schema_fields
source_uris = [f"gs://{self.bucket}/{source_object}" for source_object in self.source_objects]
bq_hook.create_external_table(
external_project_dataset_table=self.destination_project_dataset_table,
schema_fields=schema_fields,
source_uris=source_uris,
source_format=self.source_format,
autodetect=self.autodetect,
compression=self.compression,
skip_leading_rows=self.skip_leading_rows,
field_delimiter=self.field_delimiter,
max_bad_records=self.max_bad_records,
quote_character=self.quote_character,
allow_quoted_newlines=self.allow_quoted_newlines,
allow_jagged_rows=self.allow_jagged_rows,
src_fmt_configs=self.src_fmt_configs,
labels=self.labels,
encryption_configuration=self.encryption_configuration,
)
然后您可以在 DAG 中使用 MyBigQueryCreateExternalTableOperator
。