在 Python 中逐行写入镶木地板
Write to parquet row by row in Python
我在异步循环中获取消息,并从每条消息中解析 row
这是字典。我想将这些行写入镶木地板。为了实现这一点,我执行以下操作:
fields = [('A', pa.float64()), ('B', pa.float64()), ('C', pa.float64()), ('D', pa.float64())]
schema = pa.schema(fields)
pqwriter = pq.ParquetWriter('sample.parquet', schema=schema, compression='gzip')
#async cycle starts here
async for message in messages:
row = {'A': message[1], 'B': message[2], 'C': message[3], 'D': message[4]}
table = pa.Table.from_pydict(row)
pqwriter.write_table(table)
#end of async cycle
pqwriter.close()
一切都很完美,但是生成的 parquet 文件大小约为 5 Mb,而如果我执行写入 csv 文件,我得到的文件大小约为 200 Kb。我检查过数据类型是否相同(csv 的列是 floatt,parquet 的列是 float)
为什么我的 parquet 比具有相同数据的 csv 大得多?
Parquet 是一种专为批量写入数据而优化的列式格式。它不是用来逐行写入数据的。
它不太适合您的用例。您可能希望以更合适的格式(例如 avro、csv)写入中间行数据,然后将数据批量转换为 parquet。
我达到了如下预期效果:
chunksize = 1e6
data = []
fields = #list of tuples
schema = pa.schema(fields)
with pq.ParquetWriter('my_parquet', schema=schema) as writer:
#async cycle starts here
rows = #dict with structure as in fields
data.extend(rows)
if len(data)>chunksize:
data = pd.DataFrame(data)
table = pa.Table.from_pandas(data, schema=schema)
writer.write_table(table)
data = []
#end of async cycle
if len(data)!=0:
data = pd.DataFrame(data)
table = pa.Table.from_pandas(data, schema=schema)
writer.write_table(table)
writer.close()
这段代码片段确实满足了我的需要。
我在异步循环中获取消息,并从每条消息中解析 row
这是字典。我想将这些行写入镶木地板。为了实现这一点,我执行以下操作:
fields = [('A', pa.float64()), ('B', pa.float64()), ('C', pa.float64()), ('D', pa.float64())]
schema = pa.schema(fields)
pqwriter = pq.ParquetWriter('sample.parquet', schema=schema, compression='gzip')
#async cycle starts here
async for message in messages:
row = {'A': message[1], 'B': message[2], 'C': message[3], 'D': message[4]}
table = pa.Table.from_pydict(row)
pqwriter.write_table(table)
#end of async cycle
pqwriter.close()
一切都很完美,但是生成的 parquet 文件大小约为 5 Mb,而如果我执行写入 csv 文件,我得到的文件大小约为 200 Kb。我检查过数据类型是否相同(csv 的列是 floatt,parquet 的列是 float)
为什么我的 parquet 比具有相同数据的 csv 大得多?
Parquet 是一种专为批量写入数据而优化的列式格式。它不是用来逐行写入数据的。
它不太适合您的用例。您可能希望以更合适的格式(例如 avro、csv)写入中间行数据,然后将数据批量转换为 parquet。
我达到了如下预期效果:
chunksize = 1e6
data = []
fields = #list of tuples
schema = pa.schema(fields)
with pq.ParquetWriter('my_parquet', schema=schema) as writer:
#async cycle starts here
rows = #dict with structure as in fields
data.extend(rows)
if len(data)>chunksize:
data = pd.DataFrame(data)
table = pa.Table.from_pandas(data, schema=schema)
writer.write_table(table)
data = []
#end of async cycle
if len(data)!=0:
data = pd.DataFrame(data)
table = pa.Table.from_pandas(data, schema=schema)
writer.write_table(table)
writer.close()
这段代码片段确实满足了我的需要。