向 PCollection 添加增量索引?

Add incremental index to a PCollection?

我有一个 CSV,我用它创建了一个 PCollection (Apache Beam Python)。是否可以为 PCollection 的每个元素添加一个增量 ID?

pcoll = ["Sangeeta,24,Kolkata", "Akshay,26,Delhi", "Sahil,26,Kolkata"]

而我想要的是:

pcoll = [ (1, "Sangeeta,24,Kolkata"), (2, "Akshay,26,Delhi"), (3, "Sahil,26,Kolkata")]

很抱歉提出这样一个基本问题,但我对 Apache Beam 的经验很少。

您可以使用 beam.combiners.ToList() 来处理每个元素的 pcoll。使用 enumerate() 添加增量 ID,但这将从 0 开始,因为这是 python.

中索引的默认行为
from apache_beam.options.pipeline_options import PipelineOptions

beam_options = PipelineOptions(
    runner='DirectRunner',
)

p = beam.Pipeline(options=beam_options)

process = (p | beam.Create(['Sangeeta,24,Kolkata', 'Akshay,26,Delhi', 'Sahil,26,Kolkata']) 
           | 'Combine' >> beam.combiners.ToList()
           | 'Manipulate' >> beam.Map(lambda my_seq: [(elem) for elem in enumerate(my_seq)])
           | 'Print' >> beam.Map(print)
          )

result = p.run()

上面的代码将产生以下输出:

Beam 和 PCollections 的主要目的是实现并行处理。在每个元素上放置索引本质上不是平行的。您可以在 Beam 中进行 non-parallel 处理(如其他答案所示),但这不会扩展到更大的数据集,您实际上并不需要 Beam 来执行此操作。

我建议您退一步回到您要解决的问题 - 为什么您需要没有间隙的数字索引?可能有不同的方法来并行解决它。