从 Python 增量写入 Parquet 数据集
Incrementally writing Parquet dataset from Python
我正在从我的 Python 应用程序中写入大于 RAM 的数据 - 基本上是将数据从 SQLAlchemy 转储到 Parque。我的解决方案的灵感来自 this question. Even though increasing the batch size as hinted here 我面临的问题:
RAM 使用量大幅增长
一段时间后写入器开始变慢(写入吞吐量速度下降超过 5 倍)
我的假设是,这是因为当行数增加时,ParquetWriter
元数据管理变得昂贵。我在想我应该切换到 datasets,这样作者就可以在处理过程中关闭文件并清除元数据。
我的问题是
是否有使用 Python 和 Parquet
编写增量数据集的示例
我的假设是正确的还是不正确的,使用数据集是否有助于维持编写器的吞吐量?
我的提炼代码:
writer = pq.ParquetWriter(
fname,
Candle.to_pyarrow_schema(small_candles),
compression='snappy',
allow_truncated_timestamps=True,
version='2.0', # Highest available schema
data_page_version='2.0', # Highest available schema
) as writer:
def writeout():
nonlocal data
duration = time.time() - stats["started"]
throughout = stats["candles_processed"] / duration
logger.info("Writing Parquet table for candle %s, throughput is %s", "{:,}".format(stats["candles_processed"]), throughout)
writer.write_table(
pa.Table.from_pydict(
data,
writer.schema
)
)
data = dict.fromkeys(data.keys(), [])
process = psutil.Process(os.getpid())
logger.info("Flushed %s writer, the memory usage is %s", bucket, process.memory_info())
# Use massive yield_per() or otherwise we are leaking memory
for item in query.yield_per(100_000):
frame = construct_frame(row_type, item)
for key, value in frame.items():
data[key].append(value)
stats["candles_processed"] += 1
# Do regular checkopoints to avoid out of memory
# and to log the progress to the console
# For fine tuning Parquet writer see
# https://issues.apache.org/jira/browse/ARROW-10052
if stats["candles_processed"] % 100_000 == 0:
writeout()
在这种情况下,原因是不正确地使用 Python 列表和字典作为工作缓冲区,正如@0x26res 所指出的。
确保正确清除列表字典后,内存消耗问题就可以忽略不计了。
我正在从我的 Python 应用程序中写入大于 RAM 的数据 - 基本上是将数据从 SQLAlchemy 转储到 Parque。我的解决方案的灵感来自 this question. Even though increasing the batch size as hinted here 我面临的问题:
RAM 使用量大幅增长
一段时间后写入器开始变慢(写入吞吐量速度下降超过 5 倍)
我的假设是,这是因为当行数增加时,ParquetWriter
元数据管理变得昂贵。我在想我应该切换到 datasets,这样作者就可以在处理过程中关闭文件并清除元数据。
我的问题是
是否有使用 Python 和 Parquet
编写增量数据集的示例我的假设是正确的还是不正确的,使用数据集是否有助于维持编写器的吞吐量?
我的提炼代码:
writer = pq.ParquetWriter(
fname,
Candle.to_pyarrow_schema(small_candles),
compression='snappy',
allow_truncated_timestamps=True,
version='2.0', # Highest available schema
data_page_version='2.0', # Highest available schema
) as writer:
def writeout():
nonlocal data
duration = time.time() - stats["started"]
throughout = stats["candles_processed"] / duration
logger.info("Writing Parquet table for candle %s, throughput is %s", "{:,}".format(stats["candles_processed"]), throughout)
writer.write_table(
pa.Table.from_pydict(
data,
writer.schema
)
)
data = dict.fromkeys(data.keys(), [])
process = psutil.Process(os.getpid())
logger.info("Flushed %s writer, the memory usage is %s", bucket, process.memory_info())
# Use massive yield_per() or otherwise we are leaking memory
for item in query.yield_per(100_000):
frame = construct_frame(row_type, item)
for key, value in frame.items():
data[key].append(value)
stats["candles_processed"] += 1
# Do regular checkopoints to avoid out of memory
# and to log the progress to the console
# For fine tuning Parquet writer see
# https://issues.apache.org/jira/browse/ARROW-10052
if stats["candles_processed"] % 100_000 == 0:
writeout()
在这种情况下,原因是不正确地使用 Python 列表和字典作为工作缓冲区,正如@0x26res 所指出的。
确保正确清除列表字典后,内存消耗问题就可以忽略不计了。