Apache beam 读取 csv 文件和 groupbykey
Apache beam read csv file and groupbykey
我有一个 csv 文件,我知道如何使用 pandas
实现此目的,基本上将 csv 读取为 df
-> 按字段分组数据 `'aaa', ' bbb'然后构造一个新的'id'.
我的问题是如何使用 Apache Beam
实现相同的效果,我以前从未使用过它,我正在尝试使用 Beam 读取此 csv 文件并对多条记录进行分组,但完全相同我使用 pandas 的功能不支持 Beam,以下是我当前的代码:
import apache_beam as beam
from apache_beam.dataframe.io import read_csv
pipeline = beam.Pipeline()
csv_lines = (pipeline | 'ReadFile' >> beam.io.ReadFromText('xxx.csv')
| ???? )
我的问题是:
- 使用
beam.io.ReadFromText()
后没有header时如何进行数据操作
- 用 Beam 实现我上面描述的相同功能的最佳方法是什么(将多条记录分组并构造一个新 ID,然后将其转换为 json)
希望这是有道理的,我是 Beam 的新手,我们将不胜感激。谢谢。
如果你能用 Pandas 做到这一点,你应该也能以完全相同的方式用 Beam Dataframes API 做到这一点。
要直接使用 Beam 执行此操作,您可以将行映射到 2 元组,其中第一个元素是键(在您的情况下为 'postcode', 'property_type','old/new','PAON'
),然后使用 GroupByKey 进行分组一起。例如,假设您的元素是具有上述键的字典,即您的输入 PCollection 的元素是字典
{'postcode': ..., 'property_type': ..., 'old/new': ..., 'PAON': ..., ...}
然后你可以写
def get_key(property_dict):
return (
property_dict['postcode'],
element['property_type'],
element['old/new'],
element['PAON'])
grouped = (
input_pcoll
| beam.Map(lambda element: (get_key(element), element))
| beam.GroupByKey())
我有一个 csv 文件,我知道如何使用 pandas
实现此目的,基本上将 csv 读取为 df
-> 按字段分组数据 `'aaa', ' bbb'然后构造一个新的'id'.
我的问题是如何使用 Apache Beam
实现相同的效果,我以前从未使用过它,我正在尝试使用 Beam 读取此 csv 文件并对多条记录进行分组,但完全相同我使用 pandas 的功能不支持 Beam,以下是我当前的代码:
import apache_beam as beam
from apache_beam.dataframe.io import read_csv
pipeline = beam.Pipeline()
csv_lines = (pipeline | 'ReadFile' >> beam.io.ReadFromText('xxx.csv')
| ???? )
我的问题是:
- 使用
beam.io.ReadFromText()
后没有header时如何进行数据操作
- 用 Beam 实现我上面描述的相同功能的最佳方法是什么(将多条记录分组并构造一个新 ID,然后将其转换为 json)
希望这是有道理的,我是 Beam 的新手,我们将不胜感激。谢谢。
如果你能用 Pandas 做到这一点,你应该也能以完全相同的方式用 Beam Dataframes API 做到这一点。
要直接使用 Beam 执行此操作,您可以将行映射到 2 元组,其中第一个元素是键(在您的情况下为 'postcode', 'property_type','old/new','PAON'
),然后使用 GroupByKey 进行分组一起。例如,假设您的元素是具有上述键的字典,即您的输入 PCollection 的元素是字典
{'postcode': ..., 'property_type': ..., 'old/new': ..., 'PAON': ..., ...}
然后你可以写
def get_key(property_dict):
return (
property_dict['postcode'],
element['property_type'],
element['old/new'],
element['PAON'])
grouped = (
input_pcoll
| beam.Map(lambda element: (get_key(element), element))
| beam.GroupByKey())