未找到 BigQueryCreateExternalTableOperator 的自动检测参数

Autodetect argument not found for BigQueryCreateExternalTableOperator

我对 Google Cloud Composer 上的 BigQueryCreateExternalTableOperator 自动检测参数有疑问。

我按以下方式使用运算符:

create_external_table = BigQueryCreateExternalTableOperator(
        task_id="create_external_table",
        destination_project_dataset_table="<bq_dataset>.<bq_table>",
        bucket="data_exchange_bucket",
        autodetect=True,
        source_objects=["<filename>__part*.csv"],
    )

导入 dag 时,cloud composer 显示以下错误:

我检查了运算符 source code here 并确实在其中找到了“autodetect”参数。我尝试将参数定位在不同的位置,虽然这应该没有什么区别吧?

非常感谢您的支持。

该错误意味着您是 运行 旧版本的 google 提供商。 具体你是 运行 apache-airflow-providers-google<6.8.0.

BigQueryCreateExternalTableOperatorautodetect 参数已在版本 6.8.0 中发布,因此您需要修改提供程序 apache-airflow-providers-google>=6.8.0 才能获得新功能。

如果升级不适合您,那么您可以通过向后移植 PR

中的代码来创建具有缺失功能的自定义运算符

我没有测试它,但可能是这样的:

class MyBigQueryCreateExternalTableOperator(BigQueryCreateExternalTableOperator):
    def __init__(
        self,
        *,
        autodetect: bool = False,
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)

        self.autodetect = autodetect

    def execute(self, context: 'Context') -> None:
        bq_hook = BigQueryHook(
            gcp_conn_id=self.bigquery_conn_id,
            delegate_to=self.delegate_to,
            location=self.location,
            impersonation_chain=self.impersonation_chain,
        )
        if self.table_resource:
            bq_hook.create_empty_table(
                table_resource=self.table_resource,
            )
            return
        if not self.schema_fields and self.schema_object and self.source_format != 'DATASTORE_BACKUP':
            gcs_hook = GCSHook(
                gcp_conn_id=self.google_cloud_storage_conn_id,
                delegate_to=self.delegate_to,
                impersonation_chain=self.impersonation_chain,
            )
            schema_fields = json.loads(gcs_hook.download(self.bucket, self.schema_object).decode("utf-8"))
        else:
            schema_fields = self.schema_fields
        source_uris = [f"gs://{self.bucket}/{source_object}" for source_object in self.source_objects]
        bq_hook.create_external_table(
            external_project_dataset_table=self.destination_project_dataset_table,
            schema_fields=schema_fields,
            source_uris=source_uris,
            source_format=self.source_format,
            autodetect=self.autodetect,
            compression=self.compression,
            skip_leading_rows=self.skip_leading_rows,
            field_delimiter=self.field_delimiter,
            max_bad_records=self.max_bad_records,
            quote_character=self.quote_character,
            allow_quoted_newlines=self.allow_quoted_newlines,
            allow_jagged_rows=self.allow_jagged_rows,
            src_fmt_configs=self.src_fmt_configs,
            labels=self.labels,
            encryption_configuration=self.encryption_configuration,
        )

然后您可以在 DAG 中使用 MyBigQueryCreateExternalTableOperator