python ProcessPoolExecutor 内存问题
python ProcessPoolExecutor memory problems
这是在 Linux、Python 3.8 中。
我使用 ProcessPoolExecutor
来加速大型数据帧列表的处理,但是因为它们都在每个进程中被复制,所以我 运行 内存不足。我该如何解决这个问题?
我的代码如下所示:
def some_func(df):
# do some work on a single pandas DataFrame
return single_df # returns a single pandas DataFrame
# dfs is a list of 100 dataframes
with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
dfs = list(executor.map(some_func, dfs) # here entire dfs gets copied 4 times?
executor.shutdown(wait=True)
我想尽量减少不必要的数据复制,即尽量减少内存占用。
什么是好的解决方案?
您的内存问题似乎与需要足够的存储空间来容纳双倍或 200 个数据帧有关,假设 some_func
确实隐式 return None
。也就是说,您的主进程存储最初的 100 个数据帧,然后将这些数据帧复制到任务队列。但是当一个数据帧从任务队列中移除时,它将暂时占用池进程地址 space 中的存储空间,直到该进程完成它并从池中取出下一个数据帧。这导致内存利用率下降,因为处理池正在耗尽任务队列中的任务,直到我们回到开始的地方(或多或少)。但是 high-water 标记将是大约 200 个数据帧,假设这些数据帧可以比你的工作函数 some_func
处理数据帧的速度快得多。但是,如果 some_func
实际上是 return 数据帧,那么内存使用量不会随着任务完成而减少。
最简单的尝试是不要一次将所有 100 个数据帧提交到您的池中,而是将您的列表分成 4 个块(您的池大小),以便您的 high-water 内存标记利用率应该在 104 个数据帧左右,再次假设 some_func
不是 returning 数据帧(而是,例如,只是写出来)。
def some_func(df):
# do some work on a single pandas DataFrame
pass
# dfs is a list of 100 dataframes
with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
start = 0
cnt = len(dfs)
while start < cnt:
# Do chunks of 4:
list(executor.map(some_func, dfs[start:start+4]))
start += 4
#executor.shutdown(wait=True) is done implicitly at end of the above block
更新
既然我们现在知道 some_func
实际上是 return 一个数据帧,我假设当你全部完成时你不需要原始的数据帧列表。如果这个假设是错误的,那么我看不出你不需要存储 200 个数据帧,对吧?
所以现在我们仍然以 4 个 和 的块提交任务,用结果数据帧替换我们的输入数据帧:
def some_func(df):
# do some work on a single pandas DataFrame
return single_df # returns a single pandas DataFrame
# dfs is a list of 100 dataframes
with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
start = 0
cnt = len(dfs)
while start < cnt:
# Do chunks of 4:
dfs[start:start+4] = list(executor.map(some_func, dfs[start:start+4]))
start += 4
#executor.shutdown(wait=True) is done implicitly at end of the above block
这是在 Linux、Python 3.8 中。
我使用 ProcessPoolExecutor
来加速大型数据帧列表的处理,但是因为它们都在每个进程中被复制,所以我 运行 内存不足。我该如何解决这个问题?
我的代码如下所示:
def some_func(df):
# do some work on a single pandas DataFrame
return single_df # returns a single pandas DataFrame
# dfs is a list of 100 dataframes
with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
dfs = list(executor.map(some_func, dfs) # here entire dfs gets copied 4 times?
executor.shutdown(wait=True)
我想尽量减少不必要的数据复制,即尽量减少内存占用。 什么是好的解决方案?
您的内存问题似乎与需要足够的存储空间来容纳双倍或 200 个数据帧有关,假设 some_func
确实隐式 return None
。也就是说,您的主进程存储最初的 100 个数据帧,然后将这些数据帧复制到任务队列。但是当一个数据帧从任务队列中移除时,它将暂时占用池进程地址 space 中的存储空间,直到该进程完成它并从池中取出下一个数据帧。这导致内存利用率下降,因为处理池正在耗尽任务队列中的任务,直到我们回到开始的地方(或多或少)。但是 high-water 标记将是大约 200 个数据帧,假设这些数据帧可以比你的工作函数 some_func
处理数据帧的速度快得多。但是,如果 some_func
实际上是 return 数据帧,那么内存使用量不会随着任务完成而减少。
最简单的尝试是不要一次将所有 100 个数据帧提交到您的池中,而是将您的列表分成 4 个块(您的池大小),以便您的 high-water 内存标记利用率应该在 104 个数据帧左右,再次假设 some_func
不是 returning 数据帧(而是,例如,只是写出来)。
def some_func(df):
# do some work on a single pandas DataFrame
pass
# dfs is a list of 100 dataframes
with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
start = 0
cnt = len(dfs)
while start < cnt:
# Do chunks of 4:
list(executor.map(some_func, dfs[start:start+4]))
start += 4
#executor.shutdown(wait=True) is done implicitly at end of the above block
更新
既然我们现在知道 some_func
实际上是 return 一个数据帧,我假设当你全部完成时你不需要原始的数据帧列表。如果这个假设是错误的,那么我看不出你不需要存储 200 个数据帧,对吧?
所以现在我们仍然以 4 个 和 的块提交任务,用结果数据帧替换我们的输入数据帧:
def some_func(df):
# do some work on a single pandas DataFrame
return single_df # returns a single pandas DataFrame
# dfs is a list of 100 dataframes
with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
start = 0
cnt = len(dfs)
while start < cnt:
# Do chunks of 4:
dfs[start:start+4] = list(executor.map(some_func, dfs[start:start+4]))
start += 4
#executor.shutdown(wait=True) is done implicitly at end of the above block