Apache Beam 数据流管道中步骤的 If 语句 (python)
If statement for steps in a apache beam dataflow pipeline (python)
我想知道是否可以在 Beam 管道中使用 if 语句来根据不同的场景执行不同的转换。例如:
1) 使输入参数之一成为 backfill/regular,然后根据该输入参数决定是否从
开始
(p
| fileio.MatchFiles(known_args.input_bucket)
| fileio.ReadMatches()
| beam.Map(lambda file: file.metadata.path, json.loads(file.read_utf8())))
或
p | beam.io.ReadFromText(known_args.input_file_name)
2) 如果文件名中包含某个国家名称(即美国),则调用TransformUSA(beam.DoFn)
,否则调用TransformAllCountries(beam.DoFn)
抱歉,如果这不是一个很好的问题,我在其他任何地方都没有看到过这个,我正在尝试使我的代码模块化而不是使用单独的管道
您的管道完全可以有一个 if 语句,但请记住,在管道构建时应该知道这些事情。因此,例如:
with beam.Pipeline(...) as p:
if known_args.backfill == True:
input_pcoll = (p
| fileio.MatchFiles(known_args.input_bucket)
| fileio.ReadMatches()
| beam.Map(lambda file: file.read_utf8().split('\n'))
else:
input_pcoll = (p
| beam.io.ReadFromText(known_args.input_file_name)
然后,对于您的 TransformUSA
,您可以执行以下操作:
if 'USA' in known_args.input_file_name:
next_pcoll = input_pcoll | beam.ParDo(TransformUSA())
else:
next_pcoll = input_pcoll | beam.ParDo(TransformAllCountries())
这有意义吗?
我想知道是否可以在 Beam 管道中使用 if 语句来根据不同的场景执行不同的转换。例如:
1) 使输入参数之一成为 backfill/regular,然后根据该输入参数决定是否从
开始(p
| fileio.MatchFiles(known_args.input_bucket)
| fileio.ReadMatches()
| beam.Map(lambda file: file.metadata.path, json.loads(file.read_utf8())))
或
p | beam.io.ReadFromText(known_args.input_file_name)
2) 如果文件名中包含某个国家名称(即美国),则调用TransformUSA(beam.DoFn)
,否则调用TransformAllCountries(beam.DoFn)
抱歉,如果这不是一个很好的问题,我在其他任何地方都没有看到过这个,我正在尝试使我的代码模块化而不是使用单独的管道
您的管道完全可以有一个 if 语句,但请记住,在管道构建时应该知道这些事情。因此,例如:
with beam.Pipeline(...) as p:
if known_args.backfill == True:
input_pcoll = (p
| fileio.MatchFiles(known_args.input_bucket)
| fileio.ReadMatches()
| beam.Map(lambda file: file.read_utf8().split('\n'))
else:
input_pcoll = (p
| beam.io.ReadFromText(known_args.input_file_name)
然后,对于您的 TransformUSA
,您可以执行以下操作:
if 'USA' in known_args.input_file_name:
next_pcoll = input_pcoll | beam.ParDo(TransformUSA())
else:
next_pcoll = input_pcoll | beam.ParDo(TransformAllCountries())
这有意义吗?