Data Transfer from BigQuery to Cloud SQL
What is the best way to transfer all records from a BigQuery table to a Cloud SQL table every day (roughly 255,801,312 [~255 million] records expected per day)? I know we could build a Dataflow pipeline from BQ to Cloud SQL, but with that much data it would run for hours. Is there a better solution available in Google Cloud?
Here is a working example of a workflow. You need to grant the workflow's service account sufficient permissions (Cloud SQL Admin, BigQuery Data Viewer + Job User, Cloud Storage Admin), and the destination table must already exist in your Cloud SQL instance (I tested with MySQL).
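Since the destination table has to exist before the first import, here is a minimal sketch of how it could be created. The id/email columns come from the export query below; the connection parameters and column types are assumptions, and pymysql is just one option (the Cloud SQL Auth Proxy or the cloud-sql-python-connector would work as well):

import pymysql

# Hypothetical connection parameters: adjust host/user/password for your
# instance (e.g. via the Cloud SQL Auth Proxy listening on 127.0.0.1).
conn = pymysql.connect(
    host="127.0.0.1",
    user="root",
    password="CHANGE_ME",
    database="test_schema",  # the database used in the workflow below
)
try:
    with conn.cursor() as cur:
        # Column types are assumptions; align them with your BigQuery schema.
        cur.execute(
            """
            CREATE TABLE IF NOT EXISTS workflowimport (
                id BIGINT,
                email VARCHAR(255)
            )
            """
        )
    conn.commit()
finally:
    conn.close()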
An article with more details is in the works. Replace the bucket, projectid, Cloud SQL instance name (mysql in my case), query, table name, and database schema to match your environment.
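# Workflow: export the BigQuery table to GCS as sharded CSV files,
# then import every generated file into the Cloud SQL table.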
main:
  steps:
    - assignStep:
        assign:
          - bucket: "TODO"
          - projectid: "TODO"
          - prefix: "workflow-import/export"
          - listResult:
              nextPageToken: ""
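    # EXPORT DATA writes the query result to GCS as sharded CSV files
    # (the wildcard in the uri lets BigQuery split the output).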
    - export-query:
        call: googleapis.bigquery.v2.jobs.query
        args:
          projectId: ${projectid}
          body:
            query: ${"EXPORT DATA OPTIONS( uri='gs://" + bucket + "/" + prefix + "*.csv', format='CSV', overwrite=true, header=false) AS SELECT id, email FROM `copy_dataset.name_test`"}
            useLegacySql: false
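    # Import one page of exported files, then loop back while the
    # Cloud Storage listing still returns a nextPageToken.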
    - importfiles:
        call: import_files
        args:
          pagetoken: ${listResult.nextPageToken}
          bucket: ${bucket}
          prefix: ${prefix}
          projectid: ${projectid}
        result: listResult
    - missing-files:
        switch:
          - condition: ${"nextPageToken" in listResult}
            next: importfiles
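
# Subworkflow: list one page of CSV files under the prefix and import
# each of them sequentially (Cloud SQL runs only one import at a time).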
import_files:
  params:
    - pagetoken
    - bucket
    - prefix
    - projectid
  steps:
    - list-files:
        call: googleapis.storage.v1.objects.list
        args:
          bucket: ${bucket}
          pageToken: ${pagetoken}
          prefix: ${prefix}
        result: listResult
    - process-files:
        for:
          value: file
          in: ${listResult.items}
          steps:
            - wait-import:
                call: load_file
                args:
                  projectid: ${projectid}
                  importrequest:
                    importContext:
                      uri: ${"gs://" + bucket + "/" + file.name}
                      database: "test_schema"
                      fileType: CSV
                      csvImportOptions:
                        table: "workflowimport"
    - return-step:
        return: ${listResult}
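
# Subworkflow: call the Cloud SQL Admin API import endpoint and poll
# the returned operation until its status is DONE.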
load_file:
  params: [importrequest, projectid]
  steps:
    - callImport:
        call: http.post
        args:
          url: ${"https://sqladmin.googleapis.com/v1/projects/" + projectid + "/instances/mysql/import"}
          auth:
            type: OAuth2
          body: ${importrequest}
        result: operation
    - checkoperation:
        switch:
          - condition: ${operation.body.status != "DONE"}
            next: wait
        next: completed
    - completed:
        return: "done"
    - wait:
        call: sys.sleep
        args:
          seconds: 5
        next: getoperation
    - getoperation:
        call: http.get
        args:
          url: ${operation.body.selfLink}
          auth:
            type: OAuth2
        result: operation
        next: checkoperation
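Since the requirement is a daily transfer, the workflow still needs to be triggered on a schedule: Cloud Scheduler can start an execution directly, or you can kick one off programmatically. A minimal Python sketch using the google-cloud-workflows client (project, location, and workflow name are placeholders):

from google.cloud import workflows_v1
from google.cloud.workflows import executions_v1

# Placeholder identifiers; replace with your own values.
project = "TODO"
location = "us-central1"
workflow = "bq-to-cloudsql"

workflows_client = workflows_v1.WorkflowsClient()
executions_client = executions_v1.ExecutionsClient()

# The parent of an execution is the workflow resource itself.
parent = workflows_client.workflow_path(project, location, workflow)

# Start a new execution of the deployed workflow.
execution = executions_client.create_execution(request={"parent": parent})
print("Started execution:", execution.name)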