多处理写入内存不足的大文件
Multiprocessing write to large file out of memory
我有以下脚本,它适用于将较小的数据集写入文件,但在处理和写入较大的数据集时最终会耗尽内存。部分文件大小为 60gb 以上。
def do_work(index):
ref_feature = layer.GetFeature(index)
if ref_feature:
try:
return ref_feature.ExportToJson(as_object=True)
except Exception as e:
pass
return None
def run_mp():
# empty file contents
open(f"{out_dir}/{fc_name}.geojsonseq", "w", encoding='utf8').close()
# initiate multiprocessing
pool = Pool(cpu_count())
fc = layer.GetFeatureCount()
resultset = pool.imap_unordered(do_work, range(fc), chunksize=1000)
# this part is done after all results are ready, resulting in huge memory storage until results are written
with open(f"{out_dir}/{fc_name}.geojsonseq", 'a') as file:
for obj in resultset:
file.write(f"\x1e{json.dumps(obj)}\n")
if __name__ == '__main__':
seg_start = time.time()
run_mp()
print(f' completed in {time.time() - seg_start}')
问题:
有没有一种方法可以直接将结果流式传输到文件中,而无需在内存中构建结果并在最后将其转储到文件中?
由于 imap_unordered
没有对工作进程施加任何背压,我怀疑结果正在 IMapUnorderedIterator
的内部结果队列中备份。如果是这样,您有三个选择:
- 在主进程中更快地写入结果。尝试从您的工作人员返回字符串
f"\x1e{json.dumps(obj)}\n"
而不是在主进程中转储。如果这不起作用:
- 在工作进程中写入临时文件,并在主进程的第二遍中将它们连接起来。如果您尝试让所有工作人员同时附加最终文件,工作人员将干扰彼此的写入。您应该能够使用最少的额外磁盘 space 来完成此操作。请注意,您可以直接对文件对象执行
json.dump
。或者,您可以使用 multiprocessing.Lock
保护工作人员写入同一文件。如果额外写入太耗时:
- 自己管理背压。使用
Pool.apply_async
或 ProcessPoolExecutor.submit
启动 cpu_count
作业,并且仅在将结果写入磁盘后才提交额外的工作。它不像 Pool.imap_unordered
那样自动,但这是当数据开始变大时必须处理的事情!
我有以下脚本,它适用于将较小的数据集写入文件,但在处理和写入较大的数据集时最终会耗尽内存。部分文件大小为 60gb 以上。
def do_work(index):
ref_feature = layer.GetFeature(index)
if ref_feature:
try:
return ref_feature.ExportToJson(as_object=True)
except Exception as e:
pass
return None
def run_mp():
# empty file contents
open(f"{out_dir}/{fc_name}.geojsonseq", "w", encoding='utf8').close()
# initiate multiprocessing
pool = Pool(cpu_count())
fc = layer.GetFeatureCount()
resultset = pool.imap_unordered(do_work, range(fc), chunksize=1000)
# this part is done after all results are ready, resulting in huge memory storage until results are written
with open(f"{out_dir}/{fc_name}.geojsonseq", 'a') as file:
for obj in resultset:
file.write(f"\x1e{json.dumps(obj)}\n")
if __name__ == '__main__':
seg_start = time.time()
run_mp()
print(f' completed in {time.time() - seg_start}')
问题:
有没有一种方法可以直接将结果流式传输到文件中,而无需在内存中构建结果并在最后将其转储到文件中?
由于 imap_unordered
没有对工作进程施加任何背压,我怀疑结果正在 IMapUnorderedIterator
的内部结果队列中备份。如果是这样,您有三个选择:
- 在主进程中更快地写入结果。尝试从您的工作人员返回字符串
f"\x1e{json.dumps(obj)}\n"
而不是在主进程中转储。如果这不起作用: - 在工作进程中写入临时文件,并在主进程的第二遍中将它们连接起来。如果您尝试让所有工作人员同时附加最终文件,工作人员将干扰彼此的写入。您应该能够使用最少的额外磁盘 space 来完成此操作。请注意,您可以直接对文件对象执行
json.dump
。或者,您可以使用multiprocessing.Lock
保护工作人员写入同一文件。如果额外写入太耗时: - 自己管理背压。使用
Pool.apply_async
或ProcessPoolExecutor.submit
启动cpu_count
作业,并且仅在将结果写入磁盘后才提交额外的工作。它不像Pool.imap_unordered
那样自动,但这是当数据开始变大时必须处理的事情!