How to set up an export table query with Python?
I created a script to export BigQuery data coming from the Firebase integration. The manually created scheduled query works fine, but when I try to create it in Python I get an error.
Here is what I tried, based on the configuration I saw in the manually created one:
from google.cloud import bigquery_datatransfer

# `credentials` is a google.auth credentials object created elsewhere
transfer_client = bigquery_datatransfer.DataTransferServiceClient(credentials=credentials)

project_id = 'my_project_id'
dataset_id = 'analytics_000000000'
bucket_name = 'my_bucket'
sql = f"""
DECLARE tables ARRAY <STRING>;
CREATE TABLE IF NOT EXISTS `{project_id}.{dataset_id}.daily_export_log`
(
table_name STRING,
insert_date TIMESTAMP
);
SET tables = (SELECT
ARRAY_AGG(TABLE_NAME) TABLES
FROM
`{project_id}.{dataset_id}.INFORMATION_SCHEMA.TABLES`
WHERE
REGEXP_CONTAINS(TABLE_NAME, 'events_\\d{{8}}') AND
TABLE_NAME NOT IN (SELECT TABLE_NAME FROM `{project_id}.{dataset_id}.daily_export_log`)
);
FOR tab IN
(SELECT * FROM UNNEST(tables))
DO
EXECUTE IMMEDIATE '''
EXPORT DATA
OPTIONS ( uri = CONCAT('gs://{bucket_name}/', ''' || "'" || tab.f0_ || "'" || ''', '/*_', format_timestamp('%Y%m%d%H%M%S', current_timestamp()), '.json.gz'),
format='JSON',
compression='GZIP',
overwrite=FALSE ) AS
SELECT * FROM `{project_id}.{dataset_id}.''' || tab.f0_ || '''` ''';
EXECUTE IMMEDIATE '''
INSERT INTO `{project_id}.{dataset_id}.daily_export_log` SELECT ''' || "'" || tab.f0_ || "'" || ''' table_name, current_timestamp() insert_date
''';
END FOR
"""
transfer_config = bigquery_datatransfer.TransferConfig(
    destination_dataset_id="",
    display_name="BigQuery to GCS Daily Backup",
    data_source_id="scheduled_query",
    params={
        "query": sql
    },
    schedule="every 1 hours",
)
transfer_config = transfer_client.create_transfer_config(
    bigquery_datatransfer.CreateTransferConfigRequest(
        parent=f"projects/{project_id}",
        transfer_config=transfer_config,
        service_account_name=gccreds['client_email'],
    )
)
print("Created scheduled query '{}'".format(transfer_config.name))
When I try to run it, I get the following error:
InvalidArgument: 400 Cannot create a transfer with parent projects/{project_id} without location info when destination dataset is not specified.
So I figured it was because of the line destination_dataset_id="", and I replaced it with destination_dataset_id=dataset_id. That created the job, but when I checked its status in BigQuery it had failed with the error:
Error code 9 : Dataset specified in the query ('') is not consistent with Destination dataset '{dataset_id}'.
I also tried including dataset_region=location, but that didn't work either.
You are getting this error because your query does not actually need to write to a destination dataset, yet you are assigning one with destination_dataset_id=dataset_id. What you can do is remove the destination_dataset_id parameter and add the location to the parent in CreateTransferConfigRequest(). Your parent should look like this:
projects/{project_id}/locations/{location}
Edit your code as follows:
transfer_config = bigquery_datatransfer.TransferConfig(
    display_name="BigQuery to GCS Daily Backup",
    data_source_id="scheduled_query",
    params={
        "query": sql
    },
    schedule="every 1 hours",
)

transfer_config = transfer_client.create_transfer_config(
    bigquery_datatransfer.CreateTransferConfigRequest(
        parent=f"projects/{project_id}/locations/us",  # 'us' is only an example; use your dataset's actual location to avoid errors.
        transfer_config=transfer_config,
        service_account_name=gccreds['client_email'],
    )
)
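If you are not sure which location to put in parent, you can look up the dataset's location with the BigQuery client library. This is a minimal sketch, assuming google-cloud-bigquery is installed and reusing the same credentials object from above:

from google.cloud import bigquery

# Sketch: fetch the dataset's location so the transfer config's parent
# can point at the matching region (assumes `credentials` from above).
bq_client = bigquery.Client(project=project_id, credentials=credentials)
dataset = bq_client.get_dataset(f"{project_id}.{dataset_id}")
print(dataset.location)  # e.g. "US" or "europe-west1"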
Test run:
Query used for testing:
project_id = 'my-project-id'
dataset_id = 'test_dataset'
bucket_name = 'my-bucket'
sql = f"""
EXPORT DATA OPTIONS(
uri='gs://{bucket_name}/export/*.json.gz',
format='JSON',
compression='GZIP',
overwrite=false
) AS
SELECT key,name FROM `{project_id}.{dataset_id}.source_t`
"""