向 PCollection 添加增量索引?
Add incremental index to a PCollection?
我有一个 CSV,我用它创建了一个 PCollection (Apache Beam Python)。是否可以为 PCollection 的每个元素添加一个增量 ID?
pcoll = ["Sangeeta,24,Kolkata", "Akshay,26,Delhi", "Sahil,26,Kolkata"]
而我想要的是:
pcoll = [ (1, "Sangeeta,24,Kolkata"), (2, "Akshay,26,Delhi"), (3, "Sahil,26,Kolkata")]
很抱歉提出这样一个基本问题,但我对 Apache Beam 的经验很少。
您可以使用 beam.combiners.ToList()
来处理每个元素的 pcoll
。使用 enumerate()
添加增量 ID,但这将从 0 开始,因为这是 python.
中索引的默认行为
from apache_beam.options.pipeline_options import PipelineOptions
beam_options = PipelineOptions(
runner='DirectRunner',
)
p = beam.Pipeline(options=beam_options)
process = (p | beam.Create(['Sangeeta,24,Kolkata', 'Akshay,26,Delhi', 'Sahil,26,Kolkata'])
| 'Combine' >> beam.combiners.ToList()
| 'Manipulate' >> beam.Map(lambda my_seq: [(elem) for elem in enumerate(my_seq)])
| 'Print' >> beam.Map(print)
)
result = p.run()
上面的代码将产生以下输出:
Beam 和 PCollections 的主要目的是实现并行处理。在每个元素上放置索引本质上不是平行的。您可以在 Beam 中进行 non-parallel 处理(如其他答案所示),但这不会扩展到更大的数据集,您实际上并不需要 Beam 来执行此操作。
我建议您退一步回到您要解决的问题 - 为什么您需要没有间隙的数字索引?可能有不同的方法来并行解决它。
我有一个 CSV,我用它创建了一个 PCollection (Apache Beam Python)。是否可以为 PCollection 的每个元素添加一个增量 ID?
pcoll = ["Sangeeta,24,Kolkata", "Akshay,26,Delhi", "Sahil,26,Kolkata"]
而我想要的是:
pcoll = [ (1, "Sangeeta,24,Kolkata"), (2, "Akshay,26,Delhi"), (3, "Sahil,26,Kolkata")]
很抱歉提出这样一个基本问题,但我对 Apache Beam 的经验很少。
您可以使用 beam.combiners.ToList()
来处理每个元素的 pcoll
。使用 enumerate()
添加增量 ID,但这将从 0 开始,因为这是 python.
from apache_beam.options.pipeline_options import PipelineOptions
beam_options = PipelineOptions(
runner='DirectRunner',
)
p = beam.Pipeline(options=beam_options)
process = (p | beam.Create(['Sangeeta,24,Kolkata', 'Akshay,26,Delhi', 'Sahil,26,Kolkata'])
| 'Combine' >> beam.combiners.ToList()
| 'Manipulate' >> beam.Map(lambda my_seq: [(elem) for elem in enumerate(my_seq)])
| 'Print' >> beam.Map(print)
)
result = p.run()
上面的代码将产生以下输出:
Beam 和 PCollections 的主要目的是实现并行处理。在每个元素上放置索引本质上不是平行的。您可以在 Beam 中进行 non-parallel 处理(如其他答案所示),但这不会扩展到更大的数据集,您实际上并不需要 Beam 来执行此操作。
我建议您退一步回到您要解决的问题 - 为什么您需要没有间隙的数字索引?可能有不同的方法来并行解决它。