在不知道 header 字段名的情况下将 csv 转换为 Apache Beam 中的字典

Convert csv to dict in Apache Beam without knowing header fieldnames

给定

的输入 CSV(存储在本地或 Google 云存储中)
a,b,c
1,2,3
4,5,6

如何获得具有价值的 PCollection

{a: 1, b: 2, c: 3}
{a: 4, b: 5, c: 6}

事先不知道 CSV 的名称 headers?

这里有两个选项。

(1) 您可以使用 beam.io.ReadFromText 跳过 header,然后使用 beam.Map(lambda line: zip(header_names, line.split(','))。这不会处理引用等(尽管可以进行调整,可能使用 csv 模块,但处理 multi-line 行将不适用于此方法)。

(2) 您可以使用 Beam dataframes API 来执行此操作,例如

from apache_beam.dataframe.io import read_csv

with beam.Pipeline as p:
    df = p | beam.dataframe.io.read_csv("/path/to/filepattern")
    # Here you can use df as if it were a Pandas dataframe,
    # or you can convert it into a PCollection of dicts with
    # pcoll = beam.dataframe.convert.to_pcollection(df)