运行 GCP 数据流上的脚本
Run script on GCP Dataflow
我开始尝试Google Cloud Dataflow,在经典的wordcount
例子之后,我写了我自己的脚本:
import argparse
import sys
import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions
class Split(beam.DoFn):
def process(self, element):
(numfact, bag, type, owner,
main_owner, client) = element.splt('\t')
return [{
'numfact': int(numfact),
'type': type,
'owner': owner
}]
parser = argparse.ArgumentParser()
parser.add_argument('--input')
parser.add_argument('--output')
known_args, extra_args = parser.parse_known_args(sys.argv[1:])
options = PipelineOptions(extra_args)
p = beam.Pipeline(options=options)
print(known_args)
print(extra_args)
csv_lines = (p | "Load" >> ReadFromText(known_args.input, skip_header_lines=1) | "Process" >> beam.ParDo(Split()) | "Write" >> WriteToText(known_args.output))
这里是输入文件中的示例:
Numfact BAG TYPE OWNER MAIN OWNER CLIENT
728632636 CNT Alternativos Kramer Ortiz ACCIDENTES PERSONALES TELETICKET Rimac
704845964 CNT Alternativos Kramer Ortiz SOAT Canal
701387639 CNT SIN ASIGNAR Sin asignar WEB VEHICULOS Canal
692571746 CNT Concesionarios Kramer Ortiz WEB VEHICULOS Canal
682823453 CNT Alternativos Kramer Ortiz WEB VEHICULOS Canal
682823452 CNT Alternativos Kramer Ortiz WEB VEHICULOS Canal
682823451 CNT Alternativos Kramer Ortiz WEB VEHICULOS Canal
682823454 CNT Alternativos Kramer Ortiz WEB VEHICULOS Canal
706853395 CNT Alternativos Kramer Ortiz ACCIDENTES PERSONALES - WEB Canal
706466281 CNT Alternativos Kramer Ortiz SOAT Canal
最后我这样调用执行(文件保存为.txt):
python -m beam --input gs://dummy_bucket/data_entry/pcd/pcd_ensure.txt --output gs://dummy_bucket/outputs --runner DataflowRunner --project dummyproject-268120 --temp_location gs://dummy_bucket/tmp --region us-central1
在此之后,它会在控制台上显示打印,但不会在 DataFlow 控制台中注册执行。
更新
这是控制台的样子:
(gcp) gocht@~/script$ python -m beam --input gs://dummy_bucket/data_entry/pcd/pcd_ensure.txt --output gs://dummy_bucket/outputs --runner DataflowRunner --project dummyproject-268120 --temp_location gs://dummy_bucket/tmp --region us-central1
Namespace(input='gs://dummy_bucket/data_entry/pcd/pcd_ensure.txt', output='gs://dummy_bucket/outputs') ['--runner', 'DataflowRunner', '--project', 'dummyproject-268120', '--temp_location', 'gs://dummy_bucket/tmp', '--region', 'us-central1']
这仅显示放置在代码脚本上的印刷品。
我错过了什么?
谢谢!
你需要
result = p.run()
在文件末尾到 运行 管道。
基本上我认为您已经构建了管道,但并没有真正要求 运行 它。
既然评论里有答案,也写在这里:)
您需要通过执行以下操作实际制作管道 运行:
p.run().wait_until_finish()
如果您感到卡住并且不确定哪里出了问题,请尝试查看提供的示例 here - java 版本在开始使用数据流时对我帮助很大 :)
我开始尝试Google Cloud Dataflow,在经典的wordcount
例子之后,我写了我自己的脚本:
import argparse
import sys
import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions
class Split(beam.DoFn):
def process(self, element):
(numfact, bag, type, owner,
main_owner, client) = element.splt('\t')
return [{
'numfact': int(numfact),
'type': type,
'owner': owner
}]
parser = argparse.ArgumentParser()
parser.add_argument('--input')
parser.add_argument('--output')
known_args, extra_args = parser.parse_known_args(sys.argv[1:])
options = PipelineOptions(extra_args)
p = beam.Pipeline(options=options)
print(known_args)
print(extra_args)
csv_lines = (p | "Load" >> ReadFromText(known_args.input, skip_header_lines=1) | "Process" >> beam.ParDo(Split()) | "Write" >> WriteToText(known_args.output))
这里是输入文件中的示例:
Numfact BAG TYPE OWNER MAIN OWNER CLIENT
728632636 CNT Alternativos Kramer Ortiz ACCIDENTES PERSONALES TELETICKET Rimac
704845964 CNT Alternativos Kramer Ortiz SOAT Canal
701387639 CNT SIN ASIGNAR Sin asignar WEB VEHICULOS Canal
692571746 CNT Concesionarios Kramer Ortiz WEB VEHICULOS Canal
682823453 CNT Alternativos Kramer Ortiz WEB VEHICULOS Canal
682823452 CNT Alternativos Kramer Ortiz WEB VEHICULOS Canal
682823451 CNT Alternativos Kramer Ortiz WEB VEHICULOS Canal
682823454 CNT Alternativos Kramer Ortiz WEB VEHICULOS Canal
706853395 CNT Alternativos Kramer Ortiz ACCIDENTES PERSONALES - WEB Canal
706466281 CNT Alternativos Kramer Ortiz SOAT Canal
最后我这样调用执行(文件保存为.txt):
python -m beam --input gs://dummy_bucket/data_entry/pcd/pcd_ensure.txt --output gs://dummy_bucket/outputs --runner DataflowRunner --project dummyproject-268120 --temp_location gs://dummy_bucket/tmp --region us-central1
在此之后,它会在控制台上显示打印,但不会在 DataFlow 控制台中注册执行。
更新
这是控制台的样子:
(gcp) gocht@~/script$ python -m beam --input gs://dummy_bucket/data_entry/pcd/pcd_ensure.txt --output gs://dummy_bucket/outputs --runner DataflowRunner --project dummyproject-268120 --temp_location gs://dummy_bucket/tmp --region us-central1
Namespace(input='gs://dummy_bucket/data_entry/pcd/pcd_ensure.txt', output='gs://dummy_bucket/outputs') ['--runner', 'DataflowRunner', '--project', 'dummyproject-268120', '--temp_location', 'gs://dummy_bucket/tmp', '--region', 'us-central1']
这仅显示放置在代码脚本上的印刷品。
我错过了什么?
谢谢!
你需要
result = p.run()
在文件末尾到 运行 管道。
基本上我认为您已经构建了管道,但并没有真正要求 运行 它。
既然评论里有答案,也写在这里:)
您需要通过执行以下操作实际制作管道 运行:
p.run().wait_until_finish()
如果您感到卡住并且不确定哪里出了问题,请尝试查看提供的示例 here - java 版本在开始使用数据流时对我帮助很大 :)