How to set up an export table query with Python?

I created a script to export the BigQuery data that comes from a Firebase integration. The manually created scheduled query works fine, but when I try to create it in Python I get an error.

Here is what I tried, based on the configuration I saw in the manually created version:

from google.cloud import bigquery_datatransfer

transfer_client = bigquery_datatransfer.DataTransferServiceClient(credentials=credentials)

project_id = 'my_project_id'
dataset_id = 'analytics_000000000'
bucket_name = 'my_bucket'

sql = f"""
DECLARE tables ARRAY <STRING>;

CREATE TABLE IF NOT EXISTS `{project_id}.{dataset_id}.daily_export_log`
(
    table_name STRING,
    insert_date TIMESTAMP
);

SET tables = (SELECT
    ARRAY_AGG(TABLE_NAME) TABLES
FROM
    `{project_id}.{dataset_id}.INFORMATION_SCHEMA.TABLES`
WHERE
    REGEXP_CONTAINS(TABLE_NAME, 'events_\\d{{8}}') AND
    TABLE_NAME NOT IN (SELECT TABLE_NAME FROM `{project_id}.{dataset_id}.daily_export_log`)
);


FOR tab IN 
    (SELECT * FROM UNNEST(tables))
DO
    EXECUTE IMMEDIATE '''
    EXPORT DATA
    OPTIONS ( uri = CONCAT('gs://{bucket_name}/', ''' || "'" || tab.f0_ || "'" || ''', '/*_', format_timestamp('%Y%m%d%H%M%S', current_timestamp()), '.json.gz'),
        format='JSON',
        compression='GZIP',
        overwrite=FALSE ) AS
    SELECT * FROM `{project_id}.{dataset_id}.''' || tab.f0_ || '''` ''';

    EXECUTE IMMEDIATE '''
    INSERT INTO `{project_id}.{dataset_id}.daily_export_log` SELECT ''' || "'" || tab.f0_ || "'" || ''' table_name, current_timestamp() insert_date
    ''';
END FOR
"""

transfer_config = bigquery_datatransfer.TransferConfig(
    destination_dataset_id="",
    display_name="BigQuery to GCS Daily Backup",
    data_source_id="scheduled_query",
    params={
        "query": sql
    },
    schedule="every 1 hours",
)

transfer_config = transfer_client.create_transfer_config(
    bigquery_datatransfer.CreateTransferConfigRequest(
        parent=f"projects/{project_id}",
        transfer_config=transfer_config,
        service_account_name=gccreds['client_email'],
    )
)

print("Created scheduled query '{}'".format(transfer_config.name))

When I try to run it, I get the following error:

InvalidArgument: 400 Cannot create a transfer with parent projects/{project_id} without location info when destination dataset is not specified.

So I figured it was because of the line destination_dataset_id="", and I replaced it with destination_dataset_id=dataset_id, which let the job be created. However, when I check its status in BigQuery, it fails with the error:

Error code 9 : Dataset specified in the query ('') is not consistent with Destination dataset '{dataset_id}'.

I also tried including dataset_region=location, but that did not work either.

You are encountering this error because your query does not actually need to write to a destination dataset, yet you assigned destination_dataset_id=dataset_id. What you can do is remove the destination_dataset_id parameter and instead add the location to the parent in CreateTransferConfigRequest(). Your parent should look like this:

  • projects/{project_id}/locations/{location}
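For reference, the parent is just a plain resource path, so it can be built with an f-string. A minimal sketch, with placeholder project and location values; the location must match the region your dataset lives in:

```python
project_id = "my_project_id"
location = "us"  # e.g. "us", "eu", "asia-northeast1" -- use your dataset's region

# The Data Transfer API expects the parent in exactly this form when no
# destination dataset is specified.
parent = f"projects/{project_id}/locations/{location}"
print(parent)  # projects/my_project_id/locations/us
```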

Edit your code as follows:

transfer_config = bigquery_datatransfer.TransferConfig(
    display_name="BigQuery to GCS Daily Backup",
    data_source_id="scheduled_query",
    params={
        "query": sql
    },
    schedule="every 1 hours",
)

transfer_config = transfer_client.create_transfer_config(
    bigquery_datatransfer.CreateTransferConfigRequest(
        parent=f"projects/{project_id}/locations/us", # 'us' is just an example; use your dataset's actual location to avoid errors.
        transfer_config=transfer_config,
        service_account_name=gccreds['client_email'],
    )
)

Test run:

Query used for testing:

project_id = 'my-project-id'
dataset_id = 'test_dataset'
bucket_name = 'my-bucket'

sql = f"""
EXPORT DATA OPTIONS(
  uri='gs://{bucket_name}/export/*.json.gz',
  format='JSON',
  compression='GZIP',
  overwrite=false
  ) AS
SELECT key,name FROM `{project_id}.{dataset_id}.source_t`
"""