Apache beam 读取 csv 文件和 groupbykey

Apache beam read csv file and groupbykey

我有一个 csv 文件,我知道如何使用 pandas 实现此目的,基本上将 csv 读取为 df -> 按字段分组数据 `'aaa', ' bbb'然后构造一个新的'id'.

我的问题是如何使用 Apache Beam 实现相同的效果,我以前从未使用过它,我正在尝试使用 Beam 读取此 csv 文件并对多条记录进行分组,但完全相同我使用 pandas 的功能不支持 Beam,以下是我当前的代码:

import apache_beam as beam
from apache_beam.dataframe.io import read_csv

pipeline = beam.Pipeline()
csv_lines = (pipeline | 'ReadFile' >> beam.io.ReadFromText('xxx.csv')
| ???? )

我的问题是:

希望这是有道理的,我是 Beam 的新手,我们将不胜感激。谢谢。

如果你能用 Pandas 做到这一点,你应该也能以完全相同的方式用 Beam Dataframes API 做到这一点。

要直接使用 Beam 执行此操作,您可以将行映射到 2 元组,其中第一个元素是键(在您的情况下为 'postcode', 'property_type','old/new','PAON'),然后使用 GroupByKey 进行分组一起。例如,假设您的元素是具有上述键的字典,即您的输入 PCollection 的元素是字典

{'postcode': ..., 'property_type': ..., 'old/new': ..., 'PAON': ..., ...}

然后你可以写

def get_key(property_dict):
  return (
      property_dict['postcode'],
      element['property_type'], 
      element['old/new'], 
      element['PAON'])
      

grouped = (
    input_pcoll
    | beam.Map(lambda element: (get_key(element), element))
    | beam.GroupByKey())