将数据增量写入 parquet 文件
Write data incrementally to a parquet file
要从 pandas 数据帧写入镶木地板,我正在执行以下操作:
df = pd.DataFrame(DATA)
table = pa.Table.from_pandas(df)
pq.write_table(table, 'DATA.parquet')
但是,如果我假设有 1B 行,它就不能很好地工作,而且它无法放入内存。在那种情况下,我将如何增量写入数据。例如,类似于:
DATA = []
BACTCH_SIZE = 10000
with open('largefile.csv') as f:
for num, line in enumerate(f):
if (len(DATA) == BATCH_SIZE):
pq.write_table(pa.Table.from_pandas(pd.DataFrame(DATA)), 'DATA.parquet')
DATA = []
DATA.append(line.split(','))
if DATA: pq.write_table(pa.Table.from_pandas(pd.DataFrame(DATA)), 'DATA.parquet')
但是,我相信上面的内容只会继续覆盖 parquet 文件。我怎样才能做相当于追加的事情?
Hadoop 不适用于追加。只需将每批新文件写入单个目录,几乎所有 Hadoop API 都应该能够读取所有 parquet 文件
BACTCH_SIZE = 10000
c = 0
with open('largefile.csv') as f:
for num, line in enumerate(f):
if len(DATA) == BATCH_SIZE:
pq.write_table(pa.Table.from_pandas(pd.DataFrame(DATA)), 'DATA.{}.parquet'.format(c))
DATA = []
c += 1
DATA.append(line.split(','))
Spark也是这样写数据的;每个执行者一个文件
但是如果你有一个大的 csv,只需将它放在 HDFS 中,然后在其上创建一个 Hive table,然后从那里将其转换为 parquet。根本不需要 pandas
要从 pandas 数据帧写入镶木地板,我正在执行以下操作:
df = pd.DataFrame(DATA)
table = pa.Table.from_pandas(df)
pq.write_table(table, 'DATA.parquet')
但是,如果我假设有 1B 行,它就不能很好地工作,而且它无法放入内存。在那种情况下,我将如何增量写入数据。例如,类似于:
DATA = []
BACTCH_SIZE = 10000
with open('largefile.csv') as f:
for num, line in enumerate(f):
if (len(DATA) == BATCH_SIZE):
pq.write_table(pa.Table.from_pandas(pd.DataFrame(DATA)), 'DATA.parquet')
DATA = []
DATA.append(line.split(','))
if DATA: pq.write_table(pa.Table.from_pandas(pd.DataFrame(DATA)), 'DATA.parquet')
但是,我相信上面的内容只会继续覆盖 parquet 文件。我怎样才能做相当于追加的事情?
Hadoop 不适用于追加。只需将每批新文件写入单个目录,几乎所有 Hadoop API 都应该能够读取所有 parquet 文件
BACTCH_SIZE = 10000
c = 0
with open('largefile.csv') as f:
for num, line in enumerate(f):
if len(DATA) == BATCH_SIZE:
pq.write_table(pa.Table.from_pandas(pd.DataFrame(DATA)), 'DATA.{}.parquet'.format(c))
DATA = []
c += 1
DATA.append(line.split(','))
Spark也是这样写数据的;每个执行者一个文件
但是如果你有一个大的 csv,只需将它放在 HDFS 中,然后在其上创建一个 Hive table,然后从那里将其转换为 parquet。根本不需要 pandas