Apache Beam 数据流管道中步骤的 If 语句 (python)

If statement for steps in a apache beam dataflow pipeline (python)

我想知道是否可以在 Beam 管道中使用 if 语句来根据不同的场景执行不同的转换。例如:

1) 使输入参数之一成为 backfill/regular,然后根据该输入参数决定是否从

开始
(p 
            | fileio.MatchFiles(known_args.input_bucket)
            | fileio.ReadMatches()
            | beam.Map(lambda file: file.metadata.path, json.loads(file.read_utf8())))

p | beam.io.ReadFromText(known_args.input_file_name)

2) 如果文件名中包含某个国家名称(即美国),则调用TransformUSA(beam.DoFn),否则调用TransformAllCountries(beam.DoFn)

抱歉,如果这不是一个很好的问题,我在其他任何地方都没有看到过这个,我正在尝试使我的代码模块化而不是使用单独的管道

您的管道完全可以有一个 if 语句,但请记住,在管道构建时应该知道这些事情。因此,例如:

with beam.Pipeline(...) as p:
  if known_args.backfill == True:
    input_pcoll = (p
                   | fileio.MatchFiles(known_args.input_bucket)
                   | fileio.ReadMatches()
                   | beam.Map(lambda file: file.read_utf8().split('\n'))
  else:
    input_pcoll = (p
                   | beam.io.ReadFromText(known_args.input_file_name)

然后,对于您的 TransformUSA,您可以执行以下操作:

if 'USA' in known_args.input_file_name:
  next_pcoll = input_pcoll | beam.ParDo(TransformUSA())
else:
  next_pcoll = input_pcoll | beam.ParDo(TransformAllCountries())

这有意义吗?