从 python 脚本创建和暂存数据流模板
Creating and staging dataflow templates from python script
我是数据流的新手,在从我的 python 脚本创建模板时遇到了一些问题。我的脚本仅用于测试,我从存储桶中读取文件并将其再次写入存储桶。所以一个非常简单的脚本。但是当我 运行 创建数据流模板的命令时,我没有得到任何模板。这是我的命令:
python test.py \
--runner DataflowRunner \
--project my_gcp_project_id \
--staging_location gs://mybucket/staging \
--temp_location gs://mybucket/temp \
--output gs://mybucket/output \
--input gs://mybucket/input.txt\
--template_location gs://mybucket/templates/mytemplate
如何获取template_location
中的模板?谢谢
我找到了我错过的东西。所以当我在我的代码中定义管道选项时,我只输入和输出并且没有考虑 template_location 选项。所以我所做的就是将它添加到我的代码中的管道选项中,如下所示:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
pipeline_options = {
'project': my_gcp_project_id ,
'staging_location': 'gs://' + mybucket+ '/staging',
'runner': 'DataflowRunner',
'job_name': job + '-vony',
'output': 'gs://'+mybucket+ '/output',
'input': 'gs://'+mybucket+'/input.txt',
'temp_location': 'gs://' + mybucket+ '/temp',
'template_location': 'gs://' + mybucket+ '/templates/' + myproject_name+ '-vony_tmpl'}
pipeline_options = PipelineOptions.from_dictionary(pipeline_options)
p = beam.Pipeline(options=pipeline_options)
我是数据流的新手,在从我的 python 脚本创建模板时遇到了一些问题。我的脚本仅用于测试,我从存储桶中读取文件并将其再次写入存储桶。所以一个非常简单的脚本。但是当我 运行 创建数据流模板的命令时,我没有得到任何模板。这是我的命令:
python test.py \
--runner DataflowRunner \
--project my_gcp_project_id \
--staging_location gs://mybucket/staging \
--temp_location gs://mybucket/temp \
--output gs://mybucket/output \
--input gs://mybucket/input.txt\
--template_location gs://mybucket/templates/mytemplate
如何获取template_location
中的模板?谢谢
我找到了我错过的东西。所以当我在我的代码中定义管道选项时,我只输入和输出并且没有考虑 template_location 选项。所以我所做的就是将它添加到我的代码中的管道选项中,如下所示:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
pipeline_options = {
'project': my_gcp_project_id ,
'staging_location': 'gs://' + mybucket+ '/staging',
'runner': 'DataflowRunner',
'job_name': job + '-vony',
'output': 'gs://'+mybucket+ '/output',
'input': 'gs://'+mybucket+'/input.txt',
'temp_location': 'gs://' + mybucket+ '/temp',
'template_location': 'gs://' + mybucket+ '/templates/' + myproject_name+ '-vony_tmpl'}
pipeline_options = PipelineOptions.from_dictionary(pipeline_options)
p = beam.Pipeline(options=pipeline_options)