Python 多处理抛出杀死:9

Python multiprocessing throws Killed: 9

我正在尝试使用 multiprocessing 来加速一个函数,我将 2000 个形状 (76, 76) 的数组平铺到 3D 数组中并应用比例因子。

当图块数量少于 200 个时它工作正常,但当它大于 200 个时我得到一个 Killed: 9,我需要能够处理 1000 个图块的订单。

这是代码的简化版本:

from functools import partial
from multiprocessing.pool import ThreadPool
from multiprocessing import cpu_count
import numpy as np

def func_A(data, scale, N):
    """Tile the data N times and scale it"""
    arr = np.tile(data, (N, 1, 1))
    arr *= scale
    return arr

def func_B(N=4):
    """Create scaled arrays"""
    # Make data
    data = np.random.normal(size=(2000, 76, 76))

    # Make scales
    scales = np.arange(2000)

    # Multiprocess into tiled arrays
    pool = ThreadPool(cpu_count())
    func = partial(func_A, N=N)
    inpt = list(zip(data, scales))
    results = np.asarray(pool.starmap(func, inpt), dtype=np.float64)
    pool.close()
    pool.join()

    return results.swapaxes(0, 1)

所以 func_B(4) 很好,但 func_B(500) 就死了。

我知道我正在使用如此大的数组来占用 Python 的内存,但是让 func_B 处理大型 N 的最佳方法是什么...最好是快速?我使用 multiprocessing 错了吗?我是否应该完全使用其他东西,例如Dask、Numba、Cython 等?

如有任何帮助,我们将不胜感激。谢谢!

我认为解决内存问题最直观的解决方案是使用 float16 数组。我尝试以更简单的方式重写所有过程(func_C)

### your method ###

def func_A(data, scale, N):
    """Tile the data N times and scale it"""
    arr = np.tile(data, (N, 1, 1))
    arr *= scale
    return arr

def func_B(N=4):
    """Create scaled arrays"""
    # Make data
    data = np.random.normal(size=(2000, 76, 76)).astype(np.float16) ###### set float16

    # Make scales
    scales = np.arange(2000).astype(np.float16) ###### set float16

    # Multiprocess into tiled arrays
    pool = ThreadPool(cpu_count())
    func = partial(func_A, N=N)
    inpt = list(zip(data, scales))
    results = np.asarray(pool.starmap(func, inpt), dtype=np.float16) ###### set float16
    pool.close()
    pool.join()

    return results.swapaxes(0, 1)

### alternative method ###

def func_C(N=4):

    scales = np.arange(2000).astype(np.float16)
    data = np.random.normal(size=(2000, 76, 76)).astype(np.float16)
    results = np.stack(N*[data*scales[:,None,None]])

    return results

查看结果

np.random.seed(33)
a = func_B(10)
np.random.seed(33)
b = func_C(10)
(a == b).all() # ===> TRUE

检查性能

下面是我在艰苦完成任务后的观察结果:

  • 您应该作为输出获得的最终数组是 4 维数组,形状为 (2000, 2000, 76, 76),由 float64 类型值组成。粗略的计算表明这个数组的大小是:2000*2000*76*76*8 字节 = ~170 GB... 所以你绝对不能把它全部保存在内存中一次。
  • multiprocessing 的用法很复杂(对于没有严格研究过多处理的人来说总是如此)并且它产生的计算时间不太好。例如,在 Google Colab 上,(Tesla T4 GPU 后端,12GB RAM)N = 50 需要约 4.5 秒(最少)到 运行。可能有更好的模块实现,但我不适合它。

我的做法:

为了解决第二个问题,我使用 cupy,它应该是 替代 用于 [=84= 中的 numpy ].通过直接替换,这意味着您可以在代码中的任何地方将 numpy 替换为 cupy(也有例外 - 与此问题无关)。 cupy 但是在 Nvidia GPU 上使用 CUDA,因此您需要在 cupy 安装之前安装 CUDA。 (Check this guide.) 或者,如果可能的话,您可能更喜欢在线计算资源,就像我使用 Google Colab.
一样 我也把工作分成几部分。我使用函数 fnh(a, scale, N) 计算任意 N.
的缩放平铺数组 我将预期的输出数组切成多个部分,并在这些切片上迭代 运行 fnh(...)。可以调整切片以获得更好的优化,但我只是根据粗略的猜测使用了一些东西。

代码如下:

import cupy as cp


def fnh(a, scale, N):
    arr = cp.einsum('i,ijk->ijk', scale, a)
    result = cp.tile(arr, (N, 1, 1, 1))

    del arr
    return result


def slicer(arr, scales, N = 400):
    mempool = cp.get_default_memory_pool()
    pinned_mempool = cp.get_default_pinned_memory_pool()
    # result = np.empty((N, 2000, 76, 76))    # to large to be allocated

    section = 500                             # Choices subject
    parts = 80                                # to optimization
    step = N // parts

    for i in range(parts):                    # Slice N into equal parts
        begin = i*step
        end = begin + step

        stacked = cp.empty((step, 2000, 76, 76))

        for j in range(2000 // section):      # Section the 2000 arrays into equal parts
            begin = j*section
            end = begin + section

            s = scales[begin:end]
            a = arr[begin:end]
            res = fnh(a, s, step)
            stacked[:, begin:end] = res       # Accumulate values

            del a, res

        # result[begin:end] = stacked         # This is where we were supposed to 
                                              # accumulate values in result
        del stacked
        mempool.free_all_blocks()
        pinned_mempool.free_all_blocks()

首先,我使用 cupy.einsum 计算数组上的 向量 缩放。

其次,我尽可能删除数组以恢复 space。具体来说,必须使用 mempool.free_all_blocks()pinned_mempool.free_all_blocks() 释放 GPU 内存池中由 cupy 分配的 space,以恢复可用的 GPU 内存。阅读它 here。但是,由于 cupy 缓存分配的内存,因此以有限的方式使用此缓存可能(?)有助于某些加速。 (这是一个预感,我并没有特别了解它。)所以我对切片的瓷砖使用相同的内存,并在完成 N 切片后将其清除。

第三,在 # result[begin:end] = stacked 所在的位置,您应该以某种方式 卸载 数组;因为如前所述,您无法承受将整个数组存储在内存中的后果。卸载到您认为适合您的 应用程序的某个 bin 可能是避免内存问题的好方法。

第四,这段代码不完整。这是因为形成的阵列需要适当的处理,如前所述。但它完成了主要的繁重工作。

最后,在 Google Colab:
中使用 timeit 为这段代码计时 相比之下,N = 50 需要 ~50 毫秒(最小值),N = 2000 需要 ~7.4 秒(最小值)到 运行。

更新:更改为 parts = 40section = 250 将最短时间缩短至 ~6.1 秒。

嗯,我相信会有更好的方法来编写这段代码,我很期待!

我不完全确定你计算的目的是什么,但下面的内容似乎可以在 dask

import dask.array as da
import numpy as np

# Make data
data = da.random.normal(size=(2000, 76, 76), chunks=(2000, 76, 76))

# Make scales
scales = np.arange(2000)
N = 500
out = da.repeat(data, N, axis=0).reshape((N, 2000, 76, 76)) * scales.reshape((1, 2000, 1, 1))
out = out.sum(axis=0).compute()

保持工作内存 <~5GB 并使用大部分内核。