将数据从 BigQuery 导出到 GCS - 可以部分传输吗?
Exporting data from BigQuery to GCS - Partial transfer possible?
我目前正在将我的数据(从 Bigquery 中的目标 table)导出到 GCS 中的存储桶。使用 Bigquery API 以编程方式执行此操作。
将数据从 Bigquery 导出到 GCS 时存在限制 - 数据不应大于 1GB。
- 由于我在目标 table 中的数据超过 1GB,我将文件分成多个部分。
- 文件将被拆分的部分数量显然取决于我在目标中的数据大小table。
这是发生这种情况的函数 exportDataToGCS() 的代码片段:
http = authorize();
bigquery_service = build('bigquery', 'v2', http=http)
query_request = bigquery_service.jobs()
DESTINATION_PATH = constants.GCS_BUCKET_PATH + canonicalDate + '/'
query_data = {
'projectId': 'ga-cnqr',
'configuration': {
'extract': {
'sourceTable': {
'projectId': constants.PROJECT_ID,
'datasetId': constants.DEST_TABLES_DATASET_ID,
'tableId': canonicalDate,
},
'destinationUris': [DESTINATION_PATH + canonicalDate + '-*.gz'],
'destinationFormat': 'CSV',
'printHeader': 'false',
'compression': 'GZIP'
}
}
}
query_response = query_request.insert(projectId=constants.PROJECT_NUMBER,
body=query_data).execute()
执行此函数后,在我的 GCS 存储桶中,我的文件以以下方式显示:
但是,我想知道是否存在文件应该被拆分为 10 个部分,但由于上述功能失败而只有 3 个部分进入存储桶的情况。
也就是说,是否可以部分导出?
网络中断或进程运行函数被杀死等原因会导致这种情况吗?这个过程是阻塞调用吗?异步?
提前致谢。
更新 1:查询响应中的状态参数
这就是我检查完成状态的方式。
while True:
status = query_request.get(projectId=constants.PROJECT_NUMBER, jobId=query_response['jobReference']['jobId']).execute()
if 'DONE' == status['status']['state']:
logging.info("Finished exporting for the date : " + stringCanonicalDate);
return
如果作业由于某种原因在执行中途失败,则可以部分导出。
如果作业处于完成状态,并且作业中没有错误,那么所有数据都已导出。
我建议在轮询完成的作业之前稍等片刻——如果轮询速度太快,可能会遇到速率限制错误,并且考虑到这些异步作业速度不快,因此不需要毫秒精度。
使用您的示例代码,您可以通过以下方式测试是否存在错误:
while True:
status = query_request.get(projectId=constants.PROJECT_NUMBER, jobId=query_response['jobReference']['jobId']).execute()
if 'DONE' == status['status']['state']:
if 'errorResult' in status['status']:
logging.error("Error exporting for the date : " + stringCanonicalDate);
logging.error(status['status']['errorResult'])
return False
logging.info("Finished exporting for the date : " + stringCanonicalDate);
return True
time.sleep(1)
为了超级健壮,您还可以捕获在轮询等待循环中偶尔发生的 HTTP 错误。看起来您正在使用 python API 客户端,它会在此类失败时引发 apiclient.errors.HttpError
。
我目前正在将我的数据(从 Bigquery 中的目标 table)导出到 GCS 中的存储桶。使用 Bigquery API 以编程方式执行此操作。
将数据从 Bigquery 导出到 GCS 时存在限制 - 数据不应大于 1GB。
- 由于我在目标 table 中的数据超过 1GB,我将文件分成多个部分。
- 文件将被拆分的部分数量显然取决于我在目标中的数据大小table。
这是发生这种情况的函数 exportDataToGCS() 的代码片段:
http = authorize();
bigquery_service = build('bigquery', 'v2', http=http)
query_request = bigquery_service.jobs()
DESTINATION_PATH = constants.GCS_BUCKET_PATH + canonicalDate + '/'
query_data = {
'projectId': 'ga-cnqr',
'configuration': {
'extract': {
'sourceTable': {
'projectId': constants.PROJECT_ID,
'datasetId': constants.DEST_TABLES_DATASET_ID,
'tableId': canonicalDate,
},
'destinationUris': [DESTINATION_PATH + canonicalDate + '-*.gz'],
'destinationFormat': 'CSV',
'printHeader': 'false',
'compression': 'GZIP'
}
}
}
query_response = query_request.insert(projectId=constants.PROJECT_NUMBER,
body=query_data).execute()
执行此函数后,在我的 GCS 存储桶中,我的文件以以下方式显示:
但是,我想知道是否存在文件应该被拆分为 10 个部分,但由于上述功能失败而只有 3 个部分进入存储桶的情况。
也就是说,是否可以部分导出?
网络中断或进程运行函数被杀死等原因会导致这种情况吗?这个过程是阻塞调用吗?异步?
提前致谢。
更新 1:查询响应中的状态参数
这就是我检查完成状态的方式。
while True:
status = query_request.get(projectId=constants.PROJECT_NUMBER, jobId=query_response['jobReference']['jobId']).execute()
if 'DONE' == status['status']['state']:
logging.info("Finished exporting for the date : " + stringCanonicalDate);
return
如果作业由于某种原因在执行中途失败,则可以部分导出。
如果作业处于完成状态,并且作业中没有错误,那么所有数据都已导出。
我建议在轮询完成的作业之前稍等片刻——如果轮询速度太快,可能会遇到速率限制错误,并且考虑到这些异步作业速度不快,因此不需要毫秒精度。
使用您的示例代码,您可以通过以下方式测试是否存在错误:
while True:
status = query_request.get(projectId=constants.PROJECT_NUMBER, jobId=query_response['jobReference']['jobId']).execute()
if 'DONE' == status['status']['state']:
if 'errorResult' in status['status']:
logging.error("Error exporting for the date : " + stringCanonicalDate);
logging.error(status['status']['errorResult'])
return False
logging.info("Finished exporting for the date : " + stringCanonicalDate);
return True
time.sleep(1)
为了超级健壮,您还可以捕获在轮询等待循环中偶尔发生的 HTTP 错误。看起来您正在使用 python API 客户端,它会在此类失败时引发 apiclient.errors.HttpError
。