比多线程/多处理更快地循环保存图像

Saving images in a loop faster than multithreading / multiprocessing

这是一个定时示例,其中多个不同大小的图像数组被保存在一个循环中以及同时使用线程/进程:

import tempfile
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, as_completed
from pathlib import Path
from time import perf_counter

import numpy as np
from cv2 import cv2


def save_img(idx, image, dst):
    cv2.imwrite((Path(dst) / f'{idx}.jpg').as_posix(), image)


if __name__ == '__main__':
    l1 = np.random.randint(0, 255, (100, 50, 50, 1))
    l2 = np.random.randint(0, 255, (1000, 50, 50, 1))
    l3 = np.random.randint(0, 255, (10000, 50, 50, 1))
    temp_dir = tempfile.mkdtemp()
    workers = 4
    t1 = perf_counter()
    for ll in l1, l2, l3:
        t = perf_counter()
        for i, img in enumerate(ll):
            save_img(i, img, temp_dir)
        print(f'Time for {len(ll)}: {perf_counter() - t} seconds')
        for executor in ThreadPoolExecutor, ProcessPoolExecutor:
            with executor(workers) as ex:
                futures = [
                    ex.submit(save_img, i, img, temp_dir) for (i, img) in enumerate(ll)
                ]
                for f in as_completed(futures):
                    f.result()
            print(
                f'Time for {len(ll)} ({executor.__name__}): {perf_counter() - t} seconds'
            )

我在 i5 mbp 上得到这些持续时间:

Time for 100: 0.09495482999999982 seconds
Time for 100 (ThreadPoolExecutor): 0.14151873999999998 seconds
Time for 100 (ProcessPoolExecutor): 1.5136184309999998 seconds
Time for 1000: 0.36972280300000016 seconds
Time for 1000 (ThreadPoolExecutor): 0.619205703 seconds
Time for 1000 (ProcessPoolExecutor): 2.016624468 seconds
Time for 10000: 4.232915643999999 seconds
Time for 10000 (ThreadPoolExecutor): 7.251599262 seconds
Time for 10000 (ProcessPoolExecutor): 13.963426469999998 seconds

难道线程/进程不需要更少的时间来完成同样的事情吗?为什么不在这种情况下?

代码中的计时错误,因为计时器 t 在测试池之前未重置。然而,时间的相对顺序是正确的。带有定时器重置的可能代码是:

import tempfile
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, as_completed
from pathlib import Path
from time import perf_counter

import numpy as np
from cv2 import cv2


def save_img(idx, image, dst):
    cv2.imwrite((Path(dst) / f'{idx}.jpg').as_posix(), image)

if __name__ == '__main__':
    l1 = np.random.randint(0, 255, (100, 50, 50, 1))
    l2 = np.random.randint(0, 255, (1000, 50, 50, 1))
    l3 = np.random.randint(0, 255, (10000, 50, 50, 1))
    temp_dir = tempfile.mkdtemp()
    workers = 4

    for ll in l1, l2, l3:
        t = perf_counter()
        for i, img in enumerate(ll):
            save_img(i, img, temp_dir)
        print(f'Time for {len(ll)}: {perf_counter() - t} seconds')
        for executor in ThreadPoolExecutor, ProcessPoolExecutor:
            t = perf_counter()
            with executor(workers) as ex:
                futures = [
                    ex.submit(save_img, i, img, temp_dir) for (i, img) in enumerate(ll)
                ]
                for f in as_completed(futures):
                    f.result()
            print(
                f'Time for {len(ll)} ({executor.__name__}): {perf_counter() - t} seconds'
            )

多线程速度更快,特别是对于 I/O 绑定进程。在这种情况下,压缩图像是 cpu 密集型的,因此根据 OpenCV 和 python 包装器的实现,多线程可能会慢得多。在许多情况下,罪魁祸首是 CPython 的 GIL,但我不确定是否是这种情况(我不知道 GIL 是否在 imwrite 调用期间被释放)。在我的设置(第 8 代 i7)中,线程处理与 100 张图像的循环一样快,而对于 1000 和 10000 张图像则快一点。如果 ThreadPoolExecutor 重用线程,则将新任务分配给现有线程会产生开销。如果它不重用线程,则启动新线程会产生开销。

Multiprocessing 规避了 GIL 问题,但还有一些其他问题。首先,pickle 数据以在进程之间传递需要一些时间,对于图像,它可能 非常 昂贵。其次,在 windows 的情况下,生成新进程需要花费大量时间。查看开销(进程和线程)的一个简单测试是将 save_image 函数更改为一个什么也不做但仍需要 pickling 等的函数:

def save_img(idx, image, dst):
    if idx != idx:
        print("impossible!")

并通过类似的不带参数的方法来查看生成进程等的开销

我的设置中的时间显示仅需要 2.3 秒来生成 10000 个进程,额外需要 0.6 秒进行酸洗,这远远超过处理所需的时间。

一种提高吞吐量并将开销保持在最低限度的方法是打破块上的工作,并将每个块提交给工作人员:

import tempfile
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, as_completed
from pathlib import Path
from time import perf_counter

import numpy as np
from cv2 import cv2


def save_img(idx, image, dst):
    cv2.imwrite((Path(dst) / f'{idx}.jpg').as_posix(), image)

def multi_save_img(idx_start, images, dst):
    for idx, image in zip(range(idx_start, idx_start + len(images)), images):
        cv2.imwrite((Path(dst) / f'{idx}.jpg').as_posix(), image)


if __name__ == '__main__':
    l1 = np.random.randint(0, 255, (100, 50, 50, 1))
    l2 = np.random.randint(0, 255, (1000, 50, 50, 1))
    l3 = np.random.randint(0, 255, (10000, 50, 50, 1))
    temp_dir = tempfile.mkdtemp()
    workers = 4

    for ll in l1, l2, l3:
        t = perf_counter()
        for i, img in enumerate(ll):
            save_img(i, img, temp_dir)
        print(f'Time for {len(ll)}: {perf_counter() - t} seconds')
        chunk_size = len(ll)//workers 
        ends = [chunk_size * (_+1)  for _ in range(workers)]
        ends[-1] += len(ll) % workers
        starts = [chunk_size * _  for _ in range(workers)]
        for executor in ThreadPoolExecutor, ProcessPoolExecutor:
            t = perf_counter()
            with executor(workers) as ex:
                futures = [
                    ex.submit(multi_save_img, start, ll[start:end], temp_dir) for (start, end) in zip(starts, ends)
                ]
                for f in as_completed(futures):
                    f.result()
            print(
                f'Time for {len(ll)} ({executor.__name__}): {perf_counter() - t} seconds'
            )

对于多处理和多线程方法,与简单的 for 相比,这应该会给您带来显着的提升。