python ProcessPoolExecutor 内存问题

python ProcessPoolExecutor memory problems

这是在 Linux、Python 3.8 中。 我使用 ProcessPoolExecutor 来加速大型数据帧列表的处理,但是因为它们都在每个进程中被复制,所以我 运行 内存不足。我该如何解决这个问题? 我的代码如下所示:

def some_func(df):
   # do some work on a single pandas DataFrame
   return single_df # returns a single pandas DataFrame 

# dfs is a list of 100 dataframes
with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
     dfs = list(executor.map(some_func, dfs) # here entire dfs gets copied 4 times?
     executor.shutdown(wait=True)

我想尽量减少不必要的数据复制,即尽量减少内存占用。 什么是好的解决方案?

您的内存问题似乎与需要足够的存储空间来容纳双倍或 200 个数据帧有关,假设 some_func 确实隐式 return None。也就是说,您的主进程存储最初的 100 个数据帧,然后将这些数据帧复制到任务队列。但是当一个数据帧从任务队列中移除时,它将暂时占用池进程地址 space 中的存储空间,直到该进程完成它并从池中取出下一个数据帧。这导致内存利用率下降,因为处理池正在耗尽任务队列中的任务,直到我们回到开始的地方(或多或少)。但是 high-water 标记将是大约 200 个数据帧,假设这些数据帧可以比你的工作函数 some_func 处理数据帧的速度快得多。但是,如果 some_func 实际上是 return 数据帧,那么内存使用量不会随着任务完成而减少。

最简单的尝试是不要一次将所有 100 个数据帧提交到您的池中,而是将您的列表分成 4 个块(您的池大小),以便您的 high-water 内存标记利用率应该在 104 个数据帧左右,再次假设 some_func 不是 returning 数据帧(而是,例如,只是写出来)。

def some_func(df):
   # do some work on a single pandas DataFrame
   pass


# dfs is a list of 100 dataframes
with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
    start = 0
    cnt = len(dfs)
    while start < cnt:
        # Do chunks of 4:
        list(executor.map(some_func, dfs[start:start+4]))
        start += 4
#executor.shutdown(wait=True) is done implicitly at end of the above block

更新

既然我们现在知道 some_func 实际上是 return 一个数据帧,我假设当你全部完成时你不需要原始的数据帧列表。如果这个假设是错误的,那么我看不出你不需要存储 200 个数据帧,对吧?

所以现在我们仍然以 4 个 的块提交任务,用结果数据帧替换我们的输入数据帧:

def some_func(df):
   # do some work on a single pandas DataFrame
   return single_df # returns a single pandas DataFrame

# dfs is a list of 100 dataframes
with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
    start = 0
    cnt = len(dfs)
    while start < cnt:
        # Do chunks of 4:
        dfs[start:start+4] = list(executor.map(some_func, dfs[start:start+4]))
        start += 4
    #executor.shutdown(wait=True) is done implicitly at end of the above block