Python 多处理抛出杀死:9
Python multiprocessing throws Killed: 9
我正在尝试使用 multiprocessing
来加速一个函数,我将 2000 个形状 (76, 76) 的数组平铺到 3D 数组中并应用比例因子。
当图块数量少于 200 个时它工作正常,但当它大于 200 个时我得到一个 Killed: 9
,我需要能够处理 1000 个图块的订单。
这是代码的简化版本:
from functools import partial
from multiprocessing.pool import ThreadPool
from multiprocessing import cpu_count
import numpy as np
def func_A(data, scale, N):
"""Tile the data N times and scale it"""
arr = np.tile(data, (N, 1, 1))
arr *= scale
return arr
def func_B(N=4):
"""Create scaled arrays"""
# Make data
data = np.random.normal(size=(2000, 76, 76))
# Make scales
scales = np.arange(2000)
# Multiprocess into tiled arrays
pool = ThreadPool(cpu_count())
func = partial(func_A, N=N)
inpt = list(zip(data, scales))
results = np.asarray(pool.starmap(func, inpt), dtype=np.float64)
pool.close()
pool.join()
return results.swapaxes(0, 1)
所以 func_B(4)
很好,但 func_B(500)
就死了。
我知道我正在使用如此大的数组来占用 Python 的内存,但是让 func_B
处理大型 N
的最佳方法是什么...最好是快速?我使用 multiprocessing
错了吗?我是否应该完全使用其他东西,例如Dask、Numba、Cython 等?
如有任何帮助,我们将不胜感激。谢谢!
我认为解决内存问题最直观的解决方案是使用 float16 数组。我尝试以更简单的方式重写所有过程(func_C)
### your method ###
def func_A(data, scale, N):
"""Tile the data N times and scale it"""
arr = np.tile(data, (N, 1, 1))
arr *= scale
return arr
def func_B(N=4):
"""Create scaled arrays"""
# Make data
data = np.random.normal(size=(2000, 76, 76)).astype(np.float16) ###### set float16
# Make scales
scales = np.arange(2000).astype(np.float16) ###### set float16
# Multiprocess into tiled arrays
pool = ThreadPool(cpu_count())
func = partial(func_A, N=N)
inpt = list(zip(data, scales))
results = np.asarray(pool.starmap(func, inpt), dtype=np.float16) ###### set float16
pool.close()
pool.join()
return results.swapaxes(0, 1)
### alternative method ###
def func_C(N=4):
scales = np.arange(2000).astype(np.float16)
data = np.random.normal(size=(2000, 76, 76)).astype(np.float16)
results = np.stack(N*[data*scales[:,None,None]])
return results
查看结果
np.random.seed(33)
a = func_B(10)
np.random.seed(33)
b = func_C(10)
(a == b).all() # ===> TRUE
检查性能
下面是我在艰苦完成任务后的观察结果:
- 您应该作为输出获得的最终数组是 4 维数组,形状为
(2000, 2000, 76, 76)
,由 float64
类型值组成。粗略的计算表明这个数组的大小是:2000*2000*76*76*8 字节 = ~170 GB... 所以你绝对不能把它全部保存在内存中一次。
multiprocessing
的用法很复杂(对于没有严格研究过多处理的人来说总是如此)并且它产生的计算时间不太好。例如,在 Google Colab 上,(Tesla T4 GPU 后端,12GB RAM)N = 50
需要约 4.5 秒(最少)到 运行。可能有更好的模块实现,但我不适合它。
我的做法:
为了解决第二个问题,我使用 cupy
,它应该是 替代 用于 [=84= 中的 numpy
].通过直接替换,这意味着您可以在代码中的任何地方将 numpy
替换为 cupy
(也有例外 - 与此问题无关)。 cupy
但是在 Nvidia GPU 上使用 CUDA,因此您需要在 cupy
安装之前安装 CUDA。 (Check this guide.) 或者,如果可能的话,您可能更喜欢在线计算资源,就像我使用 Google Colab.
一样
我也把工作分成几部分。我使用函数 fnh(a, scale, N)
计算任意 N
.
的缩放平铺数组
我将预期的输出数组切成多个部分,并在这些切片上迭代 运行 fnh(...)
。可以调整切片以获得更好的优化,但我只是根据粗略的猜测使用了一些东西。
代码如下:
import cupy as cp
def fnh(a, scale, N):
arr = cp.einsum('i,ijk->ijk', scale, a)
result = cp.tile(arr, (N, 1, 1, 1))
del arr
return result
def slicer(arr, scales, N = 400):
mempool = cp.get_default_memory_pool()
pinned_mempool = cp.get_default_pinned_memory_pool()
# result = np.empty((N, 2000, 76, 76)) # to large to be allocated
section = 500 # Choices subject
parts = 80 # to optimization
step = N // parts
for i in range(parts): # Slice N into equal parts
begin = i*step
end = begin + step
stacked = cp.empty((step, 2000, 76, 76))
for j in range(2000 // section): # Section the 2000 arrays into equal parts
begin = j*section
end = begin + section
s = scales[begin:end]
a = arr[begin:end]
res = fnh(a, s, step)
stacked[:, begin:end] = res # Accumulate values
del a, res
# result[begin:end] = stacked # This is where we were supposed to
# accumulate values in result
del stacked
mempool.free_all_blocks()
pinned_mempool.free_all_blocks()
首先,我使用 cupy.einsum
计算数组上的 向量 缩放。
其次,我尽可能删除数组以恢复 space。具体来说,必须使用 mempool.free_all_blocks()
和 pinned_mempool.free_all_blocks()
释放 GPU 内存池中由 cupy
分配的 space,以恢复可用的 GPU 内存。阅读它 here。但是,由于 cupy
缓存分配的内存,因此以有限的方式使用此缓存可能(?)有助于某些加速。 (这是一个预感,我并没有特别了解它。)所以我对切片的瓷砖使用相同的内存,并在完成 N 切片后将其清除。
第三,在 # result[begin:end] = stacked
所在的位置,您应该以某种方式 卸载 数组;因为如前所述,您无法承受将整个数组存储在内存中的后果。卸载到您认为适合您的 应用程序的某个 bin 可能是避免内存问题的好方法。
第四,这段代码不完整。这是因为形成的阵列需要适当的处理,如前所述。但它完成了主要的繁重工作。
最后,在 Google Colab:
中使用 timeit
为这段代码计时
相比之下,N = 50
需要 ~50 毫秒(最小值),N = 2000
需要 ~7.4 秒(最小值)到 运行。
更新:更改为 parts = 40
和 section = 250
将最短时间缩短至 ~6.1 秒。
嗯,我相信会有更好的方法来编写这段代码,我很期待!
我不完全确定你计算的目的是什么,但下面的内容似乎可以在 dask
import dask.array as da
import numpy as np
# Make data
data = da.random.normal(size=(2000, 76, 76), chunks=(2000, 76, 76))
# Make scales
scales = np.arange(2000)
N = 500
out = da.repeat(data, N, axis=0).reshape((N, 2000, 76, 76)) * scales.reshape((1, 2000, 1, 1))
out = out.sum(axis=0).compute()
保持工作内存 <~5GB 并使用大部分内核。
我正在尝试使用 multiprocessing
来加速一个函数,我将 2000 个形状 (76, 76) 的数组平铺到 3D 数组中并应用比例因子。
当图块数量少于 200 个时它工作正常,但当它大于 200 个时我得到一个 Killed: 9
,我需要能够处理 1000 个图块的订单。
这是代码的简化版本:
from functools import partial
from multiprocessing.pool import ThreadPool
from multiprocessing import cpu_count
import numpy as np
def func_A(data, scale, N):
"""Tile the data N times and scale it"""
arr = np.tile(data, (N, 1, 1))
arr *= scale
return arr
def func_B(N=4):
"""Create scaled arrays"""
# Make data
data = np.random.normal(size=(2000, 76, 76))
# Make scales
scales = np.arange(2000)
# Multiprocess into tiled arrays
pool = ThreadPool(cpu_count())
func = partial(func_A, N=N)
inpt = list(zip(data, scales))
results = np.asarray(pool.starmap(func, inpt), dtype=np.float64)
pool.close()
pool.join()
return results.swapaxes(0, 1)
所以 func_B(4)
很好,但 func_B(500)
就死了。
我知道我正在使用如此大的数组来占用 Python 的内存,但是让 func_B
处理大型 N
的最佳方法是什么...最好是快速?我使用 multiprocessing
错了吗?我是否应该完全使用其他东西,例如Dask、Numba、Cython 等?
如有任何帮助,我们将不胜感激。谢谢!
我认为解决内存问题最直观的解决方案是使用 float16 数组。我尝试以更简单的方式重写所有过程(func_C)
### your method ###
def func_A(data, scale, N):
"""Tile the data N times and scale it"""
arr = np.tile(data, (N, 1, 1))
arr *= scale
return arr
def func_B(N=4):
"""Create scaled arrays"""
# Make data
data = np.random.normal(size=(2000, 76, 76)).astype(np.float16) ###### set float16
# Make scales
scales = np.arange(2000).astype(np.float16) ###### set float16
# Multiprocess into tiled arrays
pool = ThreadPool(cpu_count())
func = partial(func_A, N=N)
inpt = list(zip(data, scales))
results = np.asarray(pool.starmap(func, inpt), dtype=np.float16) ###### set float16
pool.close()
pool.join()
return results.swapaxes(0, 1)
### alternative method ###
def func_C(N=4):
scales = np.arange(2000).astype(np.float16)
data = np.random.normal(size=(2000, 76, 76)).astype(np.float16)
results = np.stack(N*[data*scales[:,None,None]])
return results
查看结果
np.random.seed(33)
a = func_B(10)
np.random.seed(33)
b = func_C(10)
(a == b).all() # ===> TRUE
检查性能
下面是我在艰苦完成任务后的观察结果:
- 您应该作为输出获得的最终数组是 4 维数组,形状为
(2000, 2000, 76, 76)
,由float64
类型值组成。粗略的计算表明这个数组的大小是:2000*2000*76*76*8 字节 = ~170 GB... 所以你绝对不能把它全部保存在内存中一次。 multiprocessing
的用法很复杂(对于没有严格研究过多处理的人来说总是如此)并且它产生的计算时间不太好。例如,在 Google Colab 上,(Tesla T4 GPU 后端,12GB RAM)N = 50
需要约 4.5 秒(最少)到 运行。可能有更好的模块实现,但我不适合它。
我的做法:
为了解决第二个问题,我使用 cupy
,它应该是 替代 用于 [=84= 中的 numpy
].通过直接替换,这意味着您可以在代码中的任何地方将 numpy
替换为 cupy
(也有例外 - 与此问题无关)。 cupy
但是在 Nvidia GPU 上使用 CUDA,因此您需要在 cupy
安装之前安装 CUDA。 (Check this guide.) 或者,如果可能的话,您可能更喜欢在线计算资源,就像我使用 Google Colab.
一样
我也把工作分成几部分。我使用函数 fnh(a, scale, N)
计算任意 N
.
的缩放平铺数组
我将预期的输出数组切成多个部分,并在这些切片上迭代 运行 fnh(...)
。可以调整切片以获得更好的优化,但我只是根据粗略的猜测使用了一些东西。
代码如下:
import cupy as cp
def fnh(a, scale, N):
arr = cp.einsum('i,ijk->ijk', scale, a)
result = cp.tile(arr, (N, 1, 1, 1))
del arr
return result
def slicer(arr, scales, N = 400):
mempool = cp.get_default_memory_pool()
pinned_mempool = cp.get_default_pinned_memory_pool()
# result = np.empty((N, 2000, 76, 76)) # to large to be allocated
section = 500 # Choices subject
parts = 80 # to optimization
step = N // parts
for i in range(parts): # Slice N into equal parts
begin = i*step
end = begin + step
stacked = cp.empty((step, 2000, 76, 76))
for j in range(2000 // section): # Section the 2000 arrays into equal parts
begin = j*section
end = begin + section
s = scales[begin:end]
a = arr[begin:end]
res = fnh(a, s, step)
stacked[:, begin:end] = res # Accumulate values
del a, res
# result[begin:end] = stacked # This is where we were supposed to
# accumulate values in result
del stacked
mempool.free_all_blocks()
pinned_mempool.free_all_blocks()
首先,我使用 cupy.einsum
计算数组上的 向量 缩放。
其次,我尽可能删除数组以恢复 space。具体来说,必须使用 mempool.free_all_blocks()
和 pinned_mempool.free_all_blocks()
释放 GPU 内存池中由 cupy
分配的 space,以恢复可用的 GPU 内存。阅读它 here。但是,由于 cupy
缓存分配的内存,因此以有限的方式使用此缓存可能(?)有助于某些加速。 (这是一个预感,我并没有特别了解它。)所以我对切片的瓷砖使用相同的内存,并在完成 N 切片后将其清除。
第三,在 # result[begin:end] = stacked
所在的位置,您应该以某种方式 卸载 数组;因为如前所述,您无法承受将整个数组存储在内存中的后果。卸载到您认为适合您的 应用程序的某个 bin 可能是避免内存问题的好方法。
第四,这段代码不完整。这是因为形成的阵列需要适当的处理,如前所述。但它完成了主要的繁重工作。
最后,在 Google Colab:
中使用 timeit
为这段代码计时
相比之下,N = 50
需要 ~50 毫秒(最小值),N = 2000
需要 ~7.4 秒(最小值)到 运行。
更新:更改为 parts = 40
和 section = 250
将最短时间缩短至 ~6.1 秒。
嗯,我相信会有更好的方法来编写这段代码,我很期待!
我不完全确定你计算的目的是什么,但下面的内容似乎可以在 dask
import dask.array as da
import numpy as np
# Make data
data = da.random.normal(size=(2000, 76, 76), chunks=(2000, 76, 76))
# Make scales
scales = np.arange(2000)
N = 500
out = da.repeat(data, N, axis=0).reshape((N, 2000, 76, 76)) * scales.reshape((1, 2000, 1, 1))
out = out.sum(axis=0).compute()
保持工作内存 <~5GB 并使用大部分内核。