Dataflow unable to parse template file with custom template
I am trying to run a simple pipeline in Dataflow:
import apache_beam as beam
from apache_beam.options.pipeline_options import (
    GoogleCloudOptions,
    PipelineOptions,
    StandardOptions,
    WorkerOptions,
)

options = PipelineOptions()

# Google Cloud settings: project, job name, and GCS locations for staging/temp files.
gcloud_options = options.view_as(GoogleCloudOptions)
gcloud_options.job_name = 'dataflow-tutorial1'
gcloud_options.project = 'xxxx'
gcloud_options.staging_location = 'gs://xxxx/staging'
gcloud_options.temp_location = 'gs://xxxx/temp'
gcloud_options.service_account_email = 'dataflow@xxxx.iam.gserviceaccount.com'

# Worker settings: small disks, at most two workers.
worker_options = options.view_as(WorkerOptions)
worker_options.disk_size_gb = 20
worker_options.max_num_workers = 2

# Run on the Dataflow service rather than locally.
options.view_as(StandardOptions).runner = 'DataflowRunner'

p1 = beam.Pipeline(options=options)
(p1 | 'Hello World' >> beam.Create(['Hello World']))
p1.run()
When I create a job from the Dataflow UI and try to run it, I keep getting:
Unable to parse template file 'gs://dataflow-sm/pipeline-files/read-write-to-gsc-file.py'.
If I run it from the terminal, I get:
ERROR: (gcloud.dataflow.jobs.run) FAILED_PRECONDITION: Unable to parse template file 'gs://dataflow-sm/pipeline-files/read-write-to-gsc-file.py'.
- '@type': type.googleapis.com/google.rpc.PreconditionFailure
  violations:
  - description: "Unexpected end of stream : expected '{'"
    subject: 0:0
    type: JSON
Any idea what might be wrong here?
You are missing a step: converting your Python code into a JSON template. The service expects the file at the given GCS path to be a JSON template, so pointing it at the raw .py source is exactly what produces the "expected '{'" parse error. The instructions can be found here. For Python, specifically:
python read-write-to-gsc-file.py \
--runner DataflowRunner \
...
--template_location gs://dataflow-sm/pipeline-files/read-write-to-gsc-file
The template will be staged at the GCS path specified by --template_location. See the Google-provided word count template for an example.
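Since your snippet sets all options in code, you can also stage the template programmatically rather than passing the flag on the command line. A minimal sketch, reusing the paths from your question: setting template_location on GoogleCloudOptions makes run() write a JSON template to that path instead of submitting a job.

import apache_beam as beam
from apache_beam.options.pipeline_options import GoogleCloudOptions, PipelineOptions, StandardOptions

options = PipelineOptions()
gcloud_options = options.view_as(GoogleCloudOptions)
gcloud_options.project = 'xxxx'
gcloud_options.staging_location = 'gs://xxxx/staging'
gcloud_options.temp_location = 'gs://xxxx/temp'
# With template_location set, run() stages the JSON template at this GCS
# path instead of launching a Dataflow job.
gcloud_options.template_location = 'gs://dataflow-sm/pipeline-files/read-write-to-gsc-file'
options.view_as(StandardOptions).runner = 'DataflowRunner'

p = beam.Pipeline(options=options)
(p | 'Hello World' >> beam.Create(['Hello World']))
p.run()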
Then you can execute the template by providing its path via --gcs-location:
gcloud dataflow jobs run [JOB_NAME] \
--gcs-location gs://dataflow-sm/pipeline-files/read-write-to-gsc-file
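Note that --gcs-location must point at the staged template, not at your .py file. As a quick sanity check, you can inspect the staged object; a valid template is JSON and starts with '{':

gsutil cat gs://dataflow-sm/pipeline-files/read-write-to-gsc-file | head -c 200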