Job to Dataflow using a Cloud Function

Parameters · Google Cloud Functions

I want to pass a parameter to the query.

Template file:

"steps":[
      {
         "kind":"ParallelRead",
         "name":"s1",
         "properties":{
            "bigquery_export_format":"FORMAT_AVRO",
            "bigquery_flatten_results":true,
            "bigquery_query":"select * from `myproject2497.teste.teste`",
            "bigquery_use_legacy_sql":false,
            "display_data":[
               {
                  "key":"source",
                  "label":"Read Source",
                  "namespace":"apache_beam.io.iobase.Read",
                  "shortValue":"BigQuerySource",
                  "type":"STRING",
                  "value":"apache_beam.io.gcp.bigquery.BigQuerySource"
               },
               {
                  "key":"query",
                  "label":"Query",
                  "namespace":"apache_beam.io.gcp.bigquery.BigQuerySource",
                  "type":"STRING",
                  "value":"select * from `myproject2497.teste.teste limit 1`"
               },

Google Cloud Function: blob.upload_from_string(timestamp)

parameters = {"bql": bql}
jobname = "poc"
gcsPath = "gs://exemplebucket1321/teste/templates/Bee"
body = {
    "jobName": jobname,
    "parameters": parameters
}
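The launch request can be sketched as below. This is a minimal sketch, not a definitive implementation: the project id and query string are taken from the question as placeholders, and the actual API call is left commented out because it needs application-default credentials. Note that `gcsPath` is not part of the request body; it is passed separately to the launch call:

```python
# Sketch of building a Dataflow templates.launch request from a Cloud Function.
# "myproject2497", the bucket path, and the query are placeholders from the
# question.

def build_launch_request(jobname, bql, gcs_path):
    """Build the body for projects.templates.launch.

    gcsPath is NOT part of the body -- it is a separate argument of the
    launch call -- and every key in "parameters" must match a ValueProvider
    option declared in the template's pipeline code.
    """
    body = {
        "jobName": jobname,          # already a string; no .format() needed
        "parameters": {"bql": bql},  # must match a ValueProvider option name
    }
    return gcs_path, body

gcs_path, body = build_launch_request(
    "poc",
    "select * from `myproject2497.teste.teste` limit 1",
    "gs://exemplebucket1321/teste/templates/Bee",
)

# With the Google API Python client the launch would look roughly like:
#
# from googleapiclient.discovery import build
# dataflow = build("dataflow", "v1b3")
# dataflow.projects().templates().launch(
#     projectId="myproject2497", gcsPath=gcs_path, body=body).execute()
```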

See the docs on creating templates:

https://cloud.google.com/dataflow/docs/guides/templates/creating-templates

and on running templates:

https://cloud.google.com/dataflow/docs/guides/templates/running-templates

Note that you can only pass parameters into template code that already uses ValueProvider (without changing the source). See the link below for the list of IOs with ValueProvider options. For BigQuery, you need the Java SDK 2.0+.

https://cloud.google.com/dataflow/docs/guides/templates/creating-templates#pipeline-io-and-runtime-parameters
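The reason parameters require ValueProvider can be illustrated with a standalone sketch. The class below only mirrors the idea behind Apache Beam's `apache_beam.options.value_provider` (it is not the real library): the value is a placeholder at template-creation time and is only resolved from the launch request's `parameters` at run time:

```python
# Conceptual, self-contained illustration of the ValueProvider pattern.
# This is NOT Apache Beam's implementation, just a sketch of the idea.

class RuntimeValueProvider:
    """Placeholder whose value is only known when the template is executed."""

    _runtime_options = {}  # filled in from the launch request's "parameters"

    def __init__(self, option_name, default=None):
        self.option_name = option_name
        self.default = default

    def is_accessible(self):
        return self.option_name in self._runtime_options

    def get(self):
        return self._runtime_options.get(self.option_name, self.default)


# At template *creation* time the value is not accessible yet:
bql = RuntimeValueProvider("bql", default="select 1")
assert not bql.is_accessible()

# When Dataflow launches the template, the "parameters" from the launch
# request are injected, and .get() resolves to the runtime value:
RuntimeValueProvider._runtime_options["bql"] = (
    "select * from `myproject2497.teste.teste` limit 1")
```

A plain pipeline option is baked into the job graph when the template is created, which is why only options declared as ValueProvider can be overridden per launch.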

Since you are using BigQuery as a source, I believe following the instructions in the two links above will solve your problem.