Beam/Dataflow 读取 Parquet 文件并将文件 name/path 添加到每条记录
Beam/Dataflow read Parquet files and add file name/path to each record
我正在使用 Apache Beam Python SDK,我正在尝试使用 apache_beam.io.parquetio
从 Parquet 文件中读取数据,但我还想将文件名(或路径)添加到数据中因为它也包含数据。我查看了建议的模式 here 并了解到 Parquetio 与 fileio 类似,但它似乎没有实现允许遍历文件并将其添加到聚会的功能。
有人想出了实现这个的好方法吗?
谢谢!
如果文件数量不是很大,可以先获取所有文件再通过IO读取。
import glob
filelist = glob.glob('/tmp/*.parquet')
p = beam.Pipeline()
class PairWithFile(beam.DoFn):
def __init__(self, filename):
self._filename = filename
def process(self, e):
yield (self._filename, e)
file_with_records = [
(p
| 'Read %s' % (file) >> beam.io.ReadFromParquet(file)
| 'Pair %s' % (file) >> beam.ParDo(PairWithFile(file)))
for file in filelist
] | beam.Flatten()
然后你的 PCollection 看起来像这样:
我正在使用 Apache Beam Python SDK,我正在尝试使用 apache_beam.io.parquetio
从 Parquet 文件中读取数据,但我还想将文件名(或路径)添加到数据中因为它也包含数据。我查看了建议的模式 here 并了解到 Parquetio 与 fileio 类似,但它似乎没有实现允许遍历文件并将其添加到聚会的功能。
有人想出了实现这个的好方法吗?
谢谢!
如果文件数量不是很大,可以先获取所有文件再通过IO读取。
import glob
filelist = glob.glob('/tmp/*.parquet')
p = beam.Pipeline()
class PairWithFile(beam.DoFn):
def __init__(self, filename):
self._filename = filename
def process(self, e):
yield (self._filename, e)
file_with_records = [
(p
| 'Read %s' % (file) >> beam.io.ReadFromParquet(file)
| 'Pair %s' % (file) >> beam.ParDo(PairWithFile(file)))
for file in filelist
] | beam.Flatten()
然后你的 PCollection 看起来像这样: