如何使用 Python 和 OpenCV 进行多处理?

How to use Python and OpenCV with multiprocessing?

我正在使用 Python 3.4.3 和 OpenCV 3.0.0 处理(应用各种过滤器)内存中的非常大的图像 (80,000 x 60,000),我想使用多个 CPU 内核以提高性能。经过一些阅读,我得出了两种可能的方法:1)使用 python 的 multiprocessing 模块,让每个进程处理大图像的一部分并在处理完成后加入结果(和这个可能应该在 POSIX 系统上执行?) 2)由于 NumPy 支持 OpenMP 而 OpenCV 使用 NumPy,我可以将多处理留给 NumPy 吗?

所以我的问题是:

哪个是更好的解决方案? (如果他们看起来不合理,可能的方法是什么?)

如果选项 2 不错,我是否应该使用 OpenMP 构建 NumPy 和 OpenCV?我将如何实际进行多处理? (我真的找不到有用的说明..)

我不知道你需要什么类型的过滤器,但如果它相当简单,你可以考虑libvips。它是一个用于非常大图像(大于您拥有的内存量)的图像处理系统。它来自一系列欧盟资助的科学艺术成像项目,因此重点放在图像捕获和比较所需的操作类型上:卷积、等级、形态学、算术、颜色分析、重采样、直方图等.

fast (faster than OpenCV, on some benchmarks at least), needs little memory, and there's a high-level Python binding。它适用于 Linux、OS X 和 Windows。它会自动为您处理所有的多重处理。

在阅读了一些 SO 帖子后,我想出了一种在 Python3 中使用 OpenCVmultiprocessing 的方法。我建议在 linux 上执行此操作,因为根据 this post,只要内容未更改,派生的进程就会与其父进程共享内存。这是一个最小的例子:

import cv2
import multiprocessing as mp
import numpy as np
import psutil

img = cv2.imread('test.tiff', cv2.IMREAD_ANYDEPTH) # here I'm using a indexed 16-bit tiff as an example.
num_processes = 4
kernel_size = 11
tile_size = img.shape[0]/num_processes  # Assuming img.shape[0] is divisible by 4 in this case

output = mp.Queue()

def mp_filter(x, output):
    print(psutil.virtual_memory())  # monitor memory usage
    output.put(x, cv2.GaussianBlur(img[img.shape[0]/num_processes*x:img.shape[0]/num_processes*(x+1), :], 
               (kernel_size, kernel_size), kernel_size/5))
    # note that you actually have to process a slightly larger block and leave out the border.

if __name__ == 'main':
    processes = [mp.Process(target=mp_filter, args=(x, output)) for x in range(num_processes)]

    for p in processes:
        p.start()

    result = []
    for ii in range(num_processes):
        result.append(output.get(True))

    for p in processes:
        p.join()

不使用Queue,另一种从进程收集结果的方法是通过multiprocessing模块创建一个共享数组。 (必须导入 ctypes

result = mp.Array(ctypes.c_uint16, img.shape[0]*img.shape[1], lock = False)

假设没有重叠,那么每个进程都可以写入数组的不同部分。然而,创建一个大的 mp.Array 出奇地慢。这实际上违背了加速运行的目的。因此,仅当与总计算时间相比增加的时间不多时才使用它。这个数组可以通过 :

变成一个 numpy 数组
result_np = np.frombuffer(result, dtypye=ctypes.c_uint16)

这可以用 Ray 干净地完成,它是一个用于并行和分布式的库 Python。 Ray 关于 "tasks" 而不是使用 fork-join 模型的原因,这提供了一些额外的灵活性(例如,即使在 fork worker 进程之后,您也将值放在共享内存中),相同的代码在多台机器上运行,您可以编写一起任务等等

import cv2
import numpy as np
import ray

num_tasks = 4
kernel_size = 11


@ray.remote
def mp_filter(image, i):
    lower = image.shape[0] // num_tasks * i
    upper = image.shape[0] // num_tasks * (i + 1)
    return cv2.GaussianBlur(image[lower:upper, :],
                            (kernel_size, kernel_size), kernel_size // 5)


if __name__ == '__main__':
    ray.init()

    # Load the image and store it once in shared memory.
    image = np.random.normal(size=(1000, 1000))
    image_id = ray.put(image)

    result_ids = [mp_filter.remote(image_id, i) for i in range(num_tasks)]
    results = ray.get(result_ids)

请注意,您不仅可以在共享内存中存储 numpy 数组,如果您有 Python 个包含 numpy 数组的对象(如包含 numpy 数组的字典),您也可以从中受益。在引擎盖下,这使用 Plasma shared-memory object store and the Apache Arrow data layout.

您可以在 Ray documentation 中阅读更多内容。请注意,我是 Ray 开发人员之一。