多处理写入内存不足的大文件

Multiprocessing write to large file out of memory

我有以下脚本,它适用于将较小的数据集写入文件,但在处理和写入较大的数据集时最终会耗尽内存。部分文件大小为 60gb 以上。

def do_work(index):
    ref_feature = layer.GetFeature(index)
    if ref_feature:
        try:
            return ref_feature.ExportToJson(as_object=True)
        except Exception as e:
            pass
    return None


def run_mp():
    # empty file contents
    open(f"{out_dir}/{fc_name}.geojsonseq", "w", encoding='utf8').close()

    # initiate multiprocessing
    pool = Pool(cpu_count())
    fc = layer.GetFeatureCount()
    resultset = pool.imap_unordered(do_work, range(fc), chunksize=1000)

    # this part is done after all results are ready, resulting in huge memory storage until results are written
    with open(f"{out_dir}/{fc_name}.geojsonseq", 'a') as file:
        for obj in resultset:
            file.write(f"\x1e{json.dumps(obj)}\n")


if __name__ == '__main__':
    seg_start = time.time()
    run_mp()
    print(f' completed in {time.time() - seg_start}')

问题:

有没有一种方法可以直接将结果流式传输到文件中,而无需在内存中构建结果并在最后将其转储到文件中?

由于 imap_unordered 没有对工作进程施加任何背压,我怀疑结果正在 IMapUnorderedIterator 的内部结果队列中备份。如果是这样,您有三个选择:

  • 在主进程中更快地写入结果。尝试从您的工作人员返回字符串 f"\x1e{json.dumps(obj)}\n" 而不是在主进程中转储。如果这不起作用:
  • 在工作进程中写入临时文件,并在主进程的第二遍中将它们连接起来。如果您尝试让所有工作人员同时附加最终文件,工作人员将干扰彼此的写入。您应该能够使用最少的额外磁盘 space 来完成此操作。请注意,您可以直接对文件对象执行 json.dump。或者,您可以使用 multiprocessing.Lock 保护工作人员写入同一文件。如果额外写入太耗时:
  • 自己管理背压。使用 Pool.apply_asyncProcessPoolExecutor.submit 启动 cpu_count 作业,并且仅在将结果写入磁盘后才提交额外的工作。它不像 Pool.imap_unordered 那样自动,但这是当数据开始变大时必须处理的事情!