为什么 concurrent.futures 在返回 np.memmap 时保留内存?

Why is concurrent.futures holding onto memory when returning np.memmap?

问题

我的应用程序正在提取内存中的 zip 文件列表并将数据写入临时文件。然后我内存映射临时文件中的数据以用于另一个函数。当我在单个进程中执行此操作时,它工作正常,读取数据不会影响内存,最大 RAM 约为 40MB。但是,当我使用 concurrent.futures 执行此操作时,RAM 上升到 500MB。

我看过 示例,我知道我可以以更好的方式提交作业以在处理过程中节省内存。但我不认为我的问题是相关的,因为我在处理过程中没有 运行 内存不足。我不明白的问题是为什么即使在返回内存映射后它仍然保留在内存中。我也不了解内存中的内容,因为在单个进程中执行此操作不会将数据加载到内存中。

任何人都可以解释内存中实际存在的内容以及为什么单处理和并行处理之间存在差异吗?

PS 我用 memory_profiler 来测量内存使用情况

代码

主要代码:

def main():
    datadir = './testdata'
    files = os.listdir('./testdata')
    files = [os.path.join(datadir, f) for f in files]
    datalist = download_files(files, multiprocess=False)
    print(len(datalist))
    time.sleep(15)
    del datalist # See here that memory is freed up
    time.sleep(15)

其他功能:

def download_files(filelist, multiprocess=False):
    datalist = []
    if multiprocess:
        with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
            returned_future = [executor.submit(extract_file, f) for f in filelist]
        for future in returned_future:
            datalist.append(future.result())
    else:
        for f in filelist:
            datalist.append(extract_file(f))
    return datalist

def extract_file(input_zip):
    buffer = next(iter(extract_zip(input_zip).values()))
    with tempfile.NamedTemporaryFile() as temp_logfile:
        temp_logfile.write(buffer)
        del buffer
        data = memmap(temp_logfile, dtype='float32', shape=(2000000, 4), mode='r')
    return data

def extract_zip(input_zip):
    with ZipFile(input_zip, 'r') as input_zip:
        return {name: input_zip.read(name) for name in input_zip.namelist()}

数据的帮助代码

我无法分享我的实际数据,但这里有一些简单的代码可以创建演示该问题的文件:

for i in range(1, 16):
    outdir = './testdata'
    outfile = 'file_{}.dat'.format(i)
    fp = np.memmap(os.path.join(outdir, outfile), dtype='float32', mode='w+', shape=(2000000, 4))
    fp[:] = np.random.rand(*fp.shape)
    del fp
    with ZipFile(outdir + '/' + outfile[:-4] + '.zip', mode='w', compression=ZIP_DEFLATED) as z:
        z.write(outdir + '/' + outfile, outfile)

问题是您试图在进程之间传递一个 np.memmap,但这不起作用。

最简单的解决方案是传递文件名,并让子进程 memmap 使用相同的文件。


当您 pass an argument to a child process or pool method via multiprocessing, or return a value from one (including doing so indirectly via a ProcessPoolExecutor), it works by calling pickle.dumps 值时,跨进程传递泡菜(细节各不相同,但无论是 Pipe 还是 Queue 或其他什么都不重要), 然后 unpickling 另一边的结果。

A memmap 基本上只是一个 mmap 对象,在 mmapped 内存中分配了一个 ndarray

并且 Python 不知道如何 pickle mmap 对象。 (如果您尝试,您将得到 PicklingErrorBrokenProcessPool 错误,具体取决于您的 Python 版本。)

一个np.memmap可以被pickle,因为它只是np.ndarray的一个子类——但是pickling和unpickling它实际上复制了数据并给你一个普通内存数组。 (如果你看 data._mmap,它是 None。)如果它给你一个错误而不是默默地复制你的所有数据可能会更好(pickle-replacement 库 dill正是这样:TypeError: can't pickle mmap.mmap objects),但事实并非如此。


在进程之间传递底层文件描述符并非不可能——每个平台的细节都不同,但所有主要平台都有办法做到这一点。然后您可以使用传递的 fd 在接收端构建一个 mmap,然后从中构建一个 memmap。您甚至可以将其包装在 np.memmap 的子类中。但我怀疑如果这不是有点困难,那么有人已经做到了,事实上它可能已经成为 dill 的一部分,如果不是 numpy 本身。

另一种选择是显式使用 shared memory features of multiprocessing,并在共享内存中分配数组而不是 mmap

但最简单的解决方案是,正如我在顶部所说的那样,只传递文件名而不是对象,并让每一方 memmap 使用相同的文件。不幸的是,这确实意味着你不能只使用关闭时删除 NamedTemporaryFile(尽管你使用它的方式已经不可移植并且不会在 Windows 上工作它在 Unix 上的工作方式),但改变它仍然可能比其他替代方案少工作。