在 Python 中逐行写入镶木地板

Write to parquet row by row in Python

我在异步循环中获取消息,并从每条消息中解析 row 这是字典。我想将这些行写入镶木地板。为了实现这一点,我执行以下操作:

fields = [('A', pa.float64()), ('B', pa.float64()), ('C', pa.float64()), ('D', pa.float64())]
schema = pa.schema(fields)
pqwriter = pq.ParquetWriter('sample.parquet', schema=schema, compression='gzip')

#async cycle starts here
async for message in messages:
   row = {'A': message[1], 'B': message[2], 'C': message[3], 'D': message[4]}
   table = pa.Table.from_pydict(row)
   pqwriter.write_table(table)
#end of async cycle
pqwriter.close()

一切都很完美,但是生成的 parquet 文件大小约为 5 Mb,而如果我执行写入 csv 文件,我得到的文件大小约为 200 Kb。我检查过数据类型是否相同(csv 的列是 floatt,parquet 的列是 float)

为什么我的 parquet 比具有相同数据的 csv 大得多?

Parquet 是一种专为批量写入数据而优化的列式格式。它不是用来逐行写入数据的。

它不太适合您的用例。您可能希望以更合适的格式(例如 avro、csv)写入中间行数据,然后将数据批量转换为 parquet。

我达到了如下预期效果:

chunksize = 1e6
data = []
fields = #list of tuples
schema = pa.schema(fields)

with pq.ParquetWriter('my_parquet', schema=schema) as writer:
#async cycle starts here
rows = #dict with structure as in fields
data.extend(rows)

if len(data)>chunksize:
   data = pd.DataFrame(data)
   table = pa.Table.from_pandas(data, schema=schema)
   writer.write_table(table)
   data = []
#end of async cycle
if len(data)!=0:
   data = pd.DataFrame(data)
   table = pa.Table.from_pandas(data, schema=schema)
   writer.write_table(table)
writer.close()

这段代码片段确实满足了我的需要。