Data Transfer from BQ to Cloud SQL

What is the best way to transfer all records from a BigQuery table to a Cloud SQL table every day (expected to be more than 255,801,312 [about 255 million] records per day)? I know we can create a Dataflow pipeline from BQ to Cloud SQL, but with such a large volume of data it would keep running for hours. Is there a better solution to implement this in Google Cloud?

Here is a working example of a workflow. You need to grant your workflow's service account sufficient permissions (Cloud SQL Admin, BigQuery Data Viewer + Job User, Cloud Storage Admin), and the target table must already exist in your Cloud SQL instance (I tested with a MySQL instance).
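
For reference, a minimal sketch of those role grants with gcloud, assuming the workflow runs as a dedicated service account (the project ID and service account e-mail below are placeholders, not values from the original answer):

# Placeholders: my-project and workflow-sa@... are example names, replace them with yours
gcloud projects add-iam-policy-binding my-project \
  --member="serviceAccount:workflow-sa@my-project.iam.gserviceaccount.com" \
  --role="roles/cloudsql.admin"
gcloud projects add-iam-policy-binding my-project \
  --member="serviceAccount:workflow-sa@my-project.iam.gserviceaccount.com" \
  --role="roles/bigquery.dataViewer"
gcloud projects add-iam-policy-binding my-project \
  --member="serviceAccount:workflow-sa@my-project.iam.gserviceaccount.com" \
  --role="roles/bigquery.jobUser"
gcloud projects add-iam-policy-binding my-project \
  --member="serviceAccount:workflow-sa@my-project.iam.gserviceaccount.com" \
  --role="roles/storage.admin"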

An article with more details is in the works. Replace the bucket, the projectid, the Cloud SQL instance name (mysql in my case), the query, the table name and the database schema with your own values.

main:
  steps:
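    # Define the bucket, project and export prefix used by the rest of the workflow (replace the TODO values)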
    - assignStep:
        assign:
          - bucket: "TODO"
          - projectid: "TODO"
          - prefix: "workflow-import/export"
          - listResult:
              nextPageToken: ""
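    # Export the BigQuery query result to Cloud Storage as sharded CSV files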
    - export-query:
        call: googleapis.bigquery.v2.jobs.query
        args:
          projectId: ${projectid}
          body:
            query: ${"EXPORT DATA OPTIONS( uri='gs://" + bucket + "/" + prefix + "*.csv', format='CSV', overwrite=true,header=false) AS SELECT id, email FROM `copy_dataset.name_test`"}
            useLegacySql: false
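    # Import the exported CSV files into Cloud SQL, one page of the bucket listing at a time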
    - importfiles:
        call: import_files
        args:
          pagetoken: ${listResult.nextPageToken}
          bucket: ${bucket}
          prefix: ${prefix}
          projectid: ${projectid}
        result: listResult
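    # If the bucket listing is paginated, loop back and import the next page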
    - missing-files:
        switch:
          - condition: ${"nextPageToken" in listResult}
            next: importfiles


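# Subworkflow: list the exported files under the prefix and import them into Cloud SQL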
import_files:
  params:
    - pagetoken
    - bucket
    - prefix
    - projectid
  steps:
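    # List one page of exported CSV objects in the bucket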
    - list-files:
        call: googleapis.storage.v1.objects.list
        args:
          bucket: ${bucket}
          pageToken: ${pagetoken}
          prefix: ${prefix}
        result: listResult
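    # Import each listed CSV file, waiting for every import to finish before starting the next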
    - process-files:
        for:
          value: file
          in: ${listResult.items}
          steps:
            - wait-import:
                call: load_file
                args:
                  projectid: ${projectid}
                  importrequest:
                    importContext:
                      uri: ${"gs://" + bucket + "/" + file.name}
                      database: "test_schema"
                      fileType: CSV
                      csvImportOptions:
                        table: "workflowimport"
    - return-step:
        return: ${listResult}


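# Subworkflow: trigger a Cloud SQL import for one file and wait until it completes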
load_file:
  params: [importrequest,projectid]
  steps:
    - callImport:
        call: http.post
        args:
          url: ${"https://sqladmin.googleapis.com/v1/projects/" + projectid + "/instances/mysql/import"}
          auth:
            type: OAuth2
          body: ${importrequest}
        result: operation
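    # Poll the import operation until its status is DONE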
    - checkoperation:
        switch:
          - condition: ${operation.body.status != "DONE"}
            next: wait
        next: completed
    - completed:
        return: "done"
    - wait:
        call: sys.sleep
        args:
          seconds: 5
        next: getoperation
    - getoperation:
        call: http.get
        args:
          url: ${operation.body.selfLink}
          auth:
            type: OAuth2
        result: operation
        next: checkoperation
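
Since the requirement is a daily transfer, one possible way to run this workflow every day is to deploy it and trigger it with Cloud Scheduler through the Workflow Executions API. This is a sketch that is not part of the original answer; the workflow name, region, schedule and service account are placeholder assumptions:

# Deploy the workflow definition (names and region are examples)
gcloud workflows deploy bq-to-cloudsql \
  --source=workflow.yaml \
  --location=us-central1 \
  --service-account=workflow-sa@my-project.iam.gserviceaccount.com

# Trigger an execution every day at 02:00 UTC via Cloud Scheduler
gcloud scheduler jobs create http bq-to-cloudsql-daily \
  --location=us-central1 \
  --schedule="0 2 * * *" \
  --time-zone="Etc/UTC" \
  --http-method=POST \
  --uri="https://workflowexecutions.googleapis.com/v1/projects/my-project/locations/us-central1/workflows/bq-to-cloudsql/executions" \
  --oauth-service-account-email=workflow-sa@my-project.iam.gserviceaccount.com

The service account used by the scheduler job needs permission to create executions of the workflow (roles/workflows.invoker).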

There are more details in my medium article.