如何使用 Python 和 OpenCV 进行多处理?
How to use Python and OpenCV with multiprocessing?
我正在使用 Python 3.4.3 和 OpenCV 3.0.0 处理(应用各种过滤器)内存中的非常大的图像 (80,000 x 60,000),我想使用多个 CPU 内核以提高性能。经过一些阅读,我得出了两种可能的方法:1)使用 python 的 multiprocessing
模块,让每个进程处理大图像的一部分并在处理完成后加入结果(和这个可能应该在 POSIX 系统上执行?) 2)由于 NumPy 支持 OpenMP 而 OpenCV 使用 NumPy,我可以将多处理留给 NumPy 吗?
所以我的问题是:
哪个是更好的解决方案? (如果他们看起来不合理,可能的方法是什么?)
如果选项 2 不错,我是否应该使用 OpenMP 构建 NumPy 和 OpenCV?我将如何实际进行多处理? (我真的找不到有用的说明..)
我不知道你需要什么类型的过滤器,但如果它相当简单,你可以考虑libvips。它是一个用于非常大图像(大于您拥有的内存量)的图像处理系统。它来自一系列欧盟资助的科学艺术成像项目,因此重点放在图像捕获和比较所需的操作类型上:卷积、等级、形态学、算术、颜色分析、重采样、直方图等.
是fast (faster than OpenCV, on some benchmarks at least), needs little memory, and there's a high-level Python binding。它适用于 Linux、OS X 和 Windows。它会自动为您处理所有的多重处理。
在阅读了一些 SO 帖子后,我想出了一种在 Python3 中使用 OpenCV
和 multiprocessing
的方法。我建议在 linux 上执行此操作,因为根据 this post,只要内容未更改,派生的进程就会与其父进程共享内存。这是一个最小的例子:
import cv2
import multiprocessing as mp
import numpy as np
import psutil
img = cv2.imread('test.tiff', cv2.IMREAD_ANYDEPTH) # here I'm using a indexed 16-bit tiff as an example.
num_processes = 4
kernel_size = 11
tile_size = img.shape[0]/num_processes # Assuming img.shape[0] is divisible by 4 in this case
output = mp.Queue()
def mp_filter(x, output):
print(psutil.virtual_memory()) # monitor memory usage
output.put(x, cv2.GaussianBlur(img[img.shape[0]/num_processes*x:img.shape[0]/num_processes*(x+1), :],
(kernel_size, kernel_size), kernel_size/5))
# note that you actually have to process a slightly larger block and leave out the border.
if __name__ == 'main':
processes = [mp.Process(target=mp_filter, args=(x, output)) for x in range(num_processes)]
for p in processes:
p.start()
result = []
for ii in range(num_processes):
result.append(output.get(True))
for p in processes:
p.join()
不使用Queue
,另一种从进程收集结果的方法是通过multiprocessing
模块创建一个共享数组。 (必须导入 ctypes
)
result = mp.Array(ctypes.c_uint16, img.shape[0]*img.shape[1], lock = False)
假设没有重叠,那么每个进程都可以写入数组的不同部分。然而,创建一个大的 mp.Array
出奇地慢。这实际上违背了加速运行的目的。因此,仅当与总计算时间相比增加的时间不多时才使用它。这个数组可以通过 :
变成一个 numpy 数组
result_np = np.frombuffer(result, dtypye=ctypes.c_uint16)
这可以用 Ray 干净地完成,它是一个用于并行和分布式的库 Python。 Ray 关于 "tasks" 而不是使用 fork-join 模型的原因,这提供了一些额外的灵活性(例如,即使在 fork worker 进程之后,您也将值放在共享内存中),相同的代码在多台机器上运行,您可以编写一起任务等等
import cv2
import numpy as np
import ray
num_tasks = 4
kernel_size = 11
@ray.remote
def mp_filter(image, i):
lower = image.shape[0] // num_tasks * i
upper = image.shape[0] // num_tasks * (i + 1)
return cv2.GaussianBlur(image[lower:upper, :],
(kernel_size, kernel_size), kernel_size // 5)
if __name__ == '__main__':
ray.init()
# Load the image and store it once in shared memory.
image = np.random.normal(size=(1000, 1000))
image_id = ray.put(image)
result_ids = [mp_filter.remote(image_id, i) for i in range(num_tasks)]
results = ray.get(result_ids)
请注意,您不仅可以在共享内存中存储 numpy 数组,如果您有 Python 个包含 numpy 数组的对象(如包含 numpy 数组的字典),您也可以从中受益。在引擎盖下,这使用 Plasma shared-memory object store and the Apache Arrow data layout.
您可以在 Ray documentation 中阅读更多内容。请注意,我是 Ray 开发人员之一。
我正在使用 Python 3.4.3 和 OpenCV 3.0.0 处理(应用各种过滤器)内存中的非常大的图像 (80,000 x 60,000),我想使用多个 CPU 内核以提高性能。经过一些阅读,我得出了两种可能的方法:1)使用 python 的 multiprocessing
模块,让每个进程处理大图像的一部分并在处理完成后加入结果(和这个可能应该在 POSIX 系统上执行?) 2)由于 NumPy 支持 OpenMP 而 OpenCV 使用 NumPy,我可以将多处理留给 NumPy 吗?
所以我的问题是:
哪个是更好的解决方案? (如果他们看起来不合理,可能的方法是什么?)
如果选项 2 不错,我是否应该使用 OpenMP 构建 NumPy 和 OpenCV?我将如何实际进行多处理? (我真的找不到有用的说明..)
我不知道你需要什么类型的过滤器,但如果它相当简单,你可以考虑libvips。它是一个用于非常大图像(大于您拥有的内存量)的图像处理系统。它来自一系列欧盟资助的科学艺术成像项目,因此重点放在图像捕获和比较所需的操作类型上:卷积、等级、形态学、算术、颜色分析、重采样、直方图等.
是fast (faster than OpenCV, on some benchmarks at least), needs little memory, and there's a high-level Python binding。它适用于 Linux、OS X 和 Windows。它会自动为您处理所有的多重处理。
在阅读了一些 SO 帖子后,我想出了一种在 Python3 中使用 OpenCV
和 multiprocessing
的方法。我建议在 linux 上执行此操作,因为根据 this post,只要内容未更改,派生的进程就会与其父进程共享内存。这是一个最小的例子:
import cv2
import multiprocessing as mp
import numpy as np
import psutil
img = cv2.imread('test.tiff', cv2.IMREAD_ANYDEPTH) # here I'm using a indexed 16-bit tiff as an example.
num_processes = 4
kernel_size = 11
tile_size = img.shape[0]/num_processes # Assuming img.shape[0] is divisible by 4 in this case
output = mp.Queue()
def mp_filter(x, output):
print(psutil.virtual_memory()) # monitor memory usage
output.put(x, cv2.GaussianBlur(img[img.shape[0]/num_processes*x:img.shape[0]/num_processes*(x+1), :],
(kernel_size, kernel_size), kernel_size/5))
# note that you actually have to process a slightly larger block and leave out the border.
if __name__ == 'main':
processes = [mp.Process(target=mp_filter, args=(x, output)) for x in range(num_processes)]
for p in processes:
p.start()
result = []
for ii in range(num_processes):
result.append(output.get(True))
for p in processes:
p.join()
不使用Queue
,另一种从进程收集结果的方法是通过multiprocessing
模块创建一个共享数组。 (必须导入 ctypes
)
result = mp.Array(ctypes.c_uint16, img.shape[0]*img.shape[1], lock = False)
假设没有重叠,那么每个进程都可以写入数组的不同部分。然而,创建一个大的 mp.Array
出奇地慢。这实际上违背了加速运行的目的。因此,仅当与总计算时间相比增加的时间不多时才使用它。这个数组可以通过 :
result_np = np.frombuffer(result, dtypye=ctypes.c_uint16)
这可以用 Ray 干净地完成,它是一个用于并行和分布式的库 Python。 Ray 关于 "tasks" 而不是使用 fork-join 模型的原因,这提供了一些额外的灵活性(例如,即使在 fork worker 进程之后,您也将值放在共享内存中),相同的代码在多台机器上运行,您可以编写一起任务等等
import cv2
import numpy as np
import ray
num_tasks = 4
kernel_size = 11
@ray.remote
def mp_filter(image, i):
lower = image.shape[0] // num_tasks * i
upper = image.shape[0] // num_tasks * (i + 1)
return cv2.GaussianBlur(image[lower:upper, :],
(kernel_size, kernel_size), kernel_size // 5)
if __name__ == '__main__':
ray.init()
# Load the image and store it once in shared memory.
image = np.random.normal(size=(1000, 1000))
image_id = ray.put(image)
result_ids = [mp_filter.remote(image_id, i) for i in range(num_tasks)]
results = ray.get(result_ids)
请注意,您不仅可以在共享内存中存储 numpy 数组,如果您有 Python 个包含 numpy 数组的对象(如包含 numpy 数组的字典),您也可以从中受益。在引擎盖下,这使用 Plasma shared-memory object store and the Apache Arrow data layout.
您可以在 Ray documentation 中阅读更多内容。请注意,我是 Ray 开发人员之一。