Inconsistent behavior in how Dataflow templates work?
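When I create a Dataflow template, the runtime parameters are not persisted in the template file.
At runtime, if I try to pass a value for such a parameter, I get a 400 error.
I'm using Scio 0.3.2, Scala 2.11.11 and Apache Beam (0.6).
My parameters are defined as follows: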
trait XmlImportJobParameters extends PipelineOptions {
  def getInput: ValueProvider[String]
  def setInput(value: ValueProvider[String]): Unit
}
They are registered with this code:
val options = PipelineOptionsFactory.fromArgs(cmdlineArgs: _*).withValidation().as[XmlImportJobParameters](classOf[XmlImportJobParameters])
PipelineOptionsFactory.register(classOf[XmlImportJobParameters])
implicit val (sc, args) = ContextAndArgs(cmdlineArgs)
To create the template, I call sbt with these parameters:
run-main jobs.XmlImportJob --runner=DataflowRunner --project=MyProject --templateLocation=gs://myBucket/XmlImportTemplate --tempLocation=gs://myBucket/staging --instance=myInstance
If I pass --input explicitly, it becomes a StaticValue instead of a RuntimeValue, and this time I can see it in the template file.
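A small local check can make the static versus runtime distinction visible. This is only a sketch, assuming the Beam Java SDK is on the classpath; the object name ValueProviderCheck and the sample path are made up for illustration, and the trait is repeated so the snippet is self-contained:

```scala
import org.apache.beam.sdk.options.{PipelineOptions, PipelineOptionsFactory, ValueProvider}

// Same options trait as in the question, repeated to keep the sketch self-contained.
trait XmlImportJobParameters extends PipelineOptions {
  def getInput: ValueProvider[String]
  def setInput(value: ValueProvider[String]): Unit
}

// Hypothetical helper object, not part of the original job.
object ValueProviderCheck {
  def main(cmdlineArgs: Array[String]): Unit = {
    // No --input: the getter should hand back a runtime provider with no value yet.
    val deferred = PipelineOptionsFactory
      .fromArgs()
      .as(classOf[XmlImportJobParameters])

    // --input given (made-up path): the value is known at construction time.
    val fixed = PipelineOptionsFactory
      .fromArgs("--input=gs://myBucket/file.xml")
      .as(classOf[XmlImportJobParameters])

    // isAccessible reports whether get() can be called right now.
    println(s"without --input: ${deferred.getInput} accessible=${deferred.getInput.isAccessible}")
    println(s"with --input:    ${fixed.getInput} accessible=${fixed.getInput.isAccessible}")
  }
}
```

If the first line prints a RuntimeValueProvider, as in the settings dump further down, the option is being treated as a runtime parameter on the client side.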
The template is launched from a Google Cloud Function that watches a Cloud Storage bucket (inspired by https://shinesolutions.com/2017/03/23/triggering-dataflow-pipelines-with-cloud-functions/):
...
dataflow.projects.templates.create({
  projectId: projectId,
  resource: {
    parameters: {
      input: `gs://${file.bucket}/${file.name}`
    },
    jobName: jobs[job].name,
    gcsPath: 'gs://MyBucket/MyTemplate'
  }
}
...
The 400 error:
problem running dataflow template, error was: { Error: (109c1c52dc52fec7): The workflow could not be created. Causes: (109c1c52dc52fb8e): Found unexpected parameters: ['input' (perhaps you meant 'runner')]
    at Request._callback (/user_code/node_modules/googleapis/node_modules/google-auth-library/lib/transporters.js:85:15)
    at Request.self.callback (/user_code/node_modules/googleapis/node_modules/request/request.js:188:22)
    at emitTwo (events.js:106:13)
    at Request.emit (events.js:191:7)
    at Request.<anonymous> (/user_code/node_modules/googleapis/node_modules/request/request.js:1171:10)
    at emitOne (events.js:96:13)
    at Request.emit (events.js:188:7)
    at IncomingMessage.<anonymous> (/user_code/node_modules/googleapis/node_modules/request/request.js:1091:12)
    at IncomingMessage.g (events.js:291:16)
    at emitNone (events.js:91:20)
  code: 400,
  errors: [ { message: '(109c1c52dc52fec7): The workflow could not be created. Causes: (109c1c52dc52fb8e): Found unexpected parameters: [\'input\' (perhaps you meant \'runner\')]', domain: 'global', reason: 'badRequest' } ] }
I get the same error when I try this:
gcloud beta dataflow jobs run xmlJobImport --gcs-location gs://MyBucket/MyTemplate --parameters input=gs://MyBucket/file.csv
=>
(gcloud.beta.dataflow.jobs.run) INVALID_ARGUMENT: (260a4f3f738a8ad9): The workflow could not be created. Causes: (260a4f3f738a8f96): Found unexpected parameters: ['input' (perhaps you meant 'runner'), 'projectid' (perhaps you meant 'project'), 'table' (perhaps you meant 'zone')]
The current settings are:
Current Settings:
appName: XmlImportJob$
autoscalingAlgorithm: THROUGHPUT_BASED
input: RuntimeValueProvider{propertyName=input, default=null, value=null}
instance: StaticValueProvider{value=staging}
jobName: xml-import-job
maxNumWorkers: 1
network: staging
numWorkers: 1
optionsId: 0
project: myProjectId
projectid: StaticValueProvider{value=myProjectId}
provenance: StaticValueProvider{value=ac3}
record: StaticValueProvider{value=BIEN}
root: StaticValueProvider{value=LISTEPA}
runner: class org.apache.beam.runners.dataflow.DataflowRunner
stableUniqueNames: WARNING
streaming: false
subnetwork: regions/europe-west1/subnetworks/net-staging
table: StaticValueProvider{value=annonce}
tempLocation: gs://-flux/staging/xmlImportJob/
templateLocation: gs://-flux-templates/XmlImportTemplate
workerMachineType: n1-standard-1
zone: europe-west1-c
Answer:
Scio currently does not expose ValueProvider-based APIs; there is an open issue for this, #696.
A working example would look like this:
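import com.spotify.scio.ContextAndArgs
import org.apache.beam.sdk.io.TextIO

object WordCount {
  def main(cmdlineArgs: Array[String]): Unit = {
    val (sc, args) = ContextAndArgs(cmdlineArgs)
    // Hand the ValueProvider to Beam's TextIO so that "input" is resolved when the template runs
    sc.customInput("input", TextIO.read().from(sc.optionsAs[XmlImportJobParameters].getInput))
      .map(_.toUpperCase)
      // "output" is a regular Scio argument, so it is fixed when the template is created
      .saveAsTextFile(args("output"))
    sc.close()
  }
}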
To create a template for the job above:
run-main com.example.WordCount --runner=DataflowRunner --project=<project> --templateLocation=gs://<template-bucket> --tempLocation=gs://<temp-location> --output=gs://<example-of-static-arg-output>
To submit a job:
gcloud beta dataflow jobs run rav-test --gcs-location=gs://<template-bucket> --parameters=input=gs://<runtime-value>