Inconsistent behavior of Dataflow templates?

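When I create a Dataflow template, the runtime parameter definitions are not saved in the template file. At run time, if I try to pass a value for such a parameter, I get a 400 error.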

I'm using Scio 0.3.2, Scala 2.11.11 and Apache Beam (0.6).

My parameters are defined as follows:

trait XmlImportJobParameters extends PipelineOptions {
  def getInput: ValueProvider[String]
  def setInput(value: ValueProvider[String]): Unit
}

They are registered with this code:

val options = PipelineOptionsFactory.fromArgs(cmdlineArgs: _*).withValidation().as[XmlImportJobParameters](classOf[XmlImportJobParameters])
PipelineOptionsFactory.register(classOf[XmlImportJobParameters])
implicit val (sc, args) = ContextAndArgs(cmdlineArgs)

To create the template, I call sbt with the following parameters:

run-main jobs.XmlImportJob    --runner=DataflowRunner --project=MyProject  --templateLocation=gs://myBucket/XmlImportTemplate  --tempLocation=gs://myBucket/staging --instance=myInstance

If I pass --input explicitly, it becomes a StaticValueProvider instead of a RuntimeValueProvider, and this time I can see it in the template file.
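A quick way to confirm which kind of provider a given command line produces is to inspect the option before the pipeline is built. The following is a minimal sketch, assuming Beam's `ValueProvider.isAccessible` and `get` behave as documented for the Beam version in use; the `InspectInput` object is a hypothetical helper and not part of the job above.

```scala
import org.apache.beam.sdk.options.{PipelineOptions, PipelineOptionsFactory, ValueProvider}

// Same options trait as in the question, repeated so the sketch is self-contained.
trait XmlImportJobParameters extends PipelineOptions {
  def getInput: ValueProvider[String]
  def setInput(value: ValueProvider[String]): Unit
}

// Hypothetical helper: reports whether --input is already known at construction time.
object InspectInput {
  def describe(args: Array[String]): String = {
    val opts = PipelineOptionsFactory
      .fromArgs(args: _*)
      .as(classOf[XmlImportJobParameters])
    val input = opts.getInput
    if (input.isAccessible) {
      // The value was given on the command line, so it is baked into the template.
      s"static value: ${input.get()}"
    } else {
      // No value yet: it can only be resolved when the template is executed.
      "runtime value (not accessible at construction time)"
    }
  }

  def main(args: Array[String]): Unit = println(describe(args))
}
```

Running it with `--input=gs://bucket/file.xml` should report a static value, while omitting `--input` should report a provider that can only be resolved when the template is executed.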

The template is invoked from a Google Cloud Function that watches a storage bucket (inspired by https://shinesolutions.com/2017/03/23/triggering-dataflow-pipelines-with-cloud-functions/):

...
dataflow.projects.templates.create({
                projectId: projectId,
                resource: {
                    parameters: {
                        input: `gs://${file.bucket}/${file.name}`
                    },
                    jobName: jobs[job].name,
                    gcsPath: 'gs://MyBucket/MyTemplate'
                }
            }
...

The 400 error:

problem running dataflow template, error was: { Error: (109c1c52dc52fec7): The workflow could not be created. Causes: (109c1c52dc52fb8e): Found unexpected parameters: ['input' (perhaps you meant 'runner')]
    at Request._callback (/user_code/node_modules/googleapis/node_modules/google-auth-library/lib/transporters.js:85:15)
    at Request.self.callback (/user_code/node_modules/googleapis/node_modules/request/request.js:188:22)
    at emitTwo (events.js:106:13)
    at Request.emit (events.js:191:7)
    at Request.<anonymous> (/user_code/node_modules/googleapis/node_modules/request/request.js:1171:10)
    at emitOne (events.js:96:13)
    at Request.emit (events.js:188:7)
    at IncomingMessage.<anonymous> (/user_code/node_modules/googleapis/node_modules/request/request.js:1091:12)
    at IncomingMessage.g (events.js:291:16)
    at emitNone (events.js:91:20)
  code: 400,
  errors: [ { message: '(109c1c52dc52fec7): The workflow could not be created. Causes: (109c1c52dc52fb8e): Found unexpected parameters: [\'input\' (perhaps you meant \'runner\')]', domain: 'global', reason: 'badRequest' } ] }

I get the same error when I try this:

gcloud beta dataflow jobs run xmlJobImport --gcs-location gs://MyBucket/MyTemplate --parameters input=gs://MyBucket/file.csv

=>

(gcloud.beta.dataflow.jobs.run) INVALID_ARGUMENT: (260a4f3f738a8ad9): The workflow could not be created. Causes: (260a4f3f738a8f96): Found unexpected parameters: ['input' (perhaps you meant 'runner'), 'projectid' (perhaps you meant 'project'), 'table' (perhaps you meant 'zone')]
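Since the service rejects any parameter name it does not find in the template, it can help to check the names locally before submitting. The sketch below is a hypothetical helper in plain Scala: the names `TemplateParams`, `unexpected` and `render` are illustrative, and the set of declared names is an assumption supplied by the caller, not something read from the template file.

```scala
// Hypothetical client-side helper; it does not talk to Dataflow.
object TemplateParams {

  /** Names in `supplied` that are absent from `declared`, sorted for stable output. */
  def unexpected(declared: Set[String], supplied: Map[String, String]): List[String] =
    supplied.keys.filterNot(declared.contains).toList.sorted

  /**
   * Renders parameters as a comma-separated key=value list, the shape passed to
   * `--parameters` in the gcloud command above. Values containing commas are
   * not escaped here.
   */
  def render(supplied: Map[String, String]): String =
    supplied.toList.sortBy(_._1).map { case (k, v) => s"$k=$v" }.mkString(",")
}
```

For example, with `declared = Set("input")` and parameters `input`, `projectid` and `table`, `unexpected` returns `List("projectid", "table")`, which are two of the names flagged in the gcloud error above.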

The current settings are:

Current Settings:
  appName: XmlImportJob$
  autoscalingAlgorithm: THROUGHPUT_BASED
  input: RuntimeValueProvider{propertyName=input, default=null, value=null}
  instance: StaticValueProvider{value=staging}
  jobName: xml-import-job
  maxNumWorkers: 1
  network: staging
  numWorkers: 1
  optionsId: 0
  project: myProjectId
  projectid: StaticValueProvider{value=myProjectId}
  provenance: StaticValueProvider{value=ac3}
  record: StaticValueProvider{value=BIEN}
  root: StaticValueProvider{value=LISTEPA}
  runner: class org.apache.beam.runners.dataflow.DataflowRunner
  stableUniqueNames: WARNING
  streaming: false
  subnetwork: regions/europe-west1/subnetworks/net-staging
  table: StaticValueProvider{value=annonce}
  tempLocation: gs://-flux/staging/xmlImportJob/
  templateLocation: gs://-flux-templates/XmlImportTemplate
  workerMachineType: n1-standard-1
  zone: europe-west1-c


Answer:

Scio currently doesn't expose a ValueProvider-based API; we now have an open issue for this, #696.

A working example looks like this:

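import com.spotify.scio._
import org.apache.beam.sdk.io.TextIO

object WordCount {
  def main(cmdlineArgs: Array[String]): Unit = {
    val (sc, args) = ContextAndArgs(cmdlineArgs)
    // Hand the ValueProvider straight to Beam's TextIO so "input" is resolved at run time.
    sc.customInput("input", TextIO.read().from(sc.optionsAs[XmlImportJobParameters].getInput))
      .map(_.toUpperCase)
      // "output" is a regular Scio arg, so its value is fixed when the template is created.
      .saveAsTextFile(args("output"))
    sc.close()
  }
}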

To create the template for the job above:

run-main com.example.WordCount --runner=DataflowRunner --project=<project> --templateLocation=gs://<template-bucket>  --tempLocation=gs://<temp-location> --output=gs://<example-of-static-arg-output>

To submit the job:

gcloud beta dataflow jobs run rav-test --gcs-location=gs://<template-bucket> --parameters=input=gs://<runtime-value>