How to parallelize scipy fftconvolve using joblib?
I am using scipy's fftconvolve to filter large images, and I want to parallelize the different filterings I apply to a single image. For the parallelization I wanted to use joblib.
However, I am puzzled by two results:
- with the multiprocessing backend, the task is much slower (1.5x slower)
- with the threading backend, the task is faster (25% faster)
I was surprised by these two results, as I was sure the convolution is CPU-bound.
Here is the code I used in a Jupyter notebook to measure the runtimes:
from joblib import Parallel, delayed
import numpy as np
from scipy.signal import fftconvolve
im_size = (512, 512)
filter_size = tuple(s-1 for s in im_size)
n_filters = 3
image = np.random.rand(*im_size)
filters = [np.random.rand(*filter_size) for i in range(n_filters)]
%%timeit
s = np.sum(
    Parallel(n_jobs=n_filters, backend='multiprocessing')(
        delayed(fftconvolve)(image, f) for f in filters
    )
)
283 ms ± 12.8 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

%%timeit
s = np.sum(
    Parallel(n_jobs=n_filters, backend='threading')(
        delayed(fftconvolve)(image, f) for f in filters
    )
)
142 ms ± 15.9 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)

%%timeit
s = np.sum([fftconvolve(image, f) for f in filters])
198 ms ± 2.69 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
I also tried different approaches, such as putting the image in a memmap or reducing the number of pre-dispatched jobs, but nothing fundamentally changed the results.
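For reference, the memmap variant looked roughly like this (a sketch only; the temporary dump path is illustrative, and `image`, `filters`, and `n_filters` are as defined above):

import os
import tempfile

import joblib

# Dump the image to disk and reload it memory-mapped, so the worker
# processes can read it instead of receiving the full array by pickling.
tmp_dir = tempfile.mkdtemp()
image_path = os.path.join(tmp_dir, 'image.joblib')
joblib.dump(image, image_path)
image_mm = joblib.load(image_path, mmap_mode='r')

s = np.sum(
    Parallel(n_jobs=n_filters, backend='multiprocessing')(
        delayed(fftconvolve)(image_mm, f) for f in filters
    )
)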
Why is multiprocessing not speeding up the computation?
The problem with benchmarking parallel processing is that you have to account properly for the overhead incurred by your code in order to draw the right conclusions. There are three sources of overhead when using parallel processing:
- Spawning the threads or processes: this happens every time you call `Parallel`, unless you rely on a managed `Parallel` object (created with a `with` context) or when you use the `loky` backend. See here for more details. (A minimal illustration follows this list.)
- Importing modules in fresh interpreters: for backends that rely on fresh processes (when the start method is not `fork`), all the modules need to be re-imported, which can cause overhead.
- Communication between the processes: when using processes (so not with `backend='threading'`), you need to communicate the arrays to each worker. The communication slows down the computation, especially for short tasks with large inputs such as `fftconvolve`.
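As a minimal sketch of the first point (not part of the benchmark below; `square` is just a toy task), a managed `Parallel` object keeps its workers alive across calls, while repeated bare calls respawn them each time:

from joblib import Parallel, delayed

def square(x):
    return x * x

# Managed object: the workers are spawned once when entering the context
# and reused for every call inside it.
with Parallel(n_jobs=2, backend='threading') as parallel:
    for _ in range(5):
        results = parallel(delayed(square)(i) for i in range(4))

# Bare calls: each call creates (and tears down) its own workers, paying
# the spawning overhead every time. The exception is the default `loky`
# backend, which reuses its executor between calls.
for _ in range(5):
    results = Parallel(n_jobs=2, backend='threading')(
        delayed(square)(i) for i in range(4)
    )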
If your goal is to call this function many times, you should modify your benchmark to actually remove the cost of spawning the workers for the `Parallel` object, either by using a managed `Parallel` object or by relying on this feature of `backend='loky'`, and to avoid the overhead due to the loading of the modules:
from joblib import Parallel, delayed
import numpy as np
from scipy.signal import fftconvolve
from time import time, sleep


def start_processes(im, filter, mode=None, delay=0):
    sleep(delay)
    return im if im is not None else 0


def time_parallel(name, parallel, image, filters, n_rep=50):
    print(80*"=" + "\n" + name + "\n" + 80*"=")

    # Time to start the pool of workers and initialize the processes.
    # With this first call, the processes/threads are actually started
    # and further calls will not incur this overhead anymore.
    t0 = time()
    np.sum(parallel(
        delayed(start_processes)(image, f, mode='valid') for f in filters)
    )
    print(f"Pool init overhead: {(time() - t0) / 1e-3:.3f}ms")

    # Time the overhead due to the loading of the scipy module.
    # With this call, the scipy.signal module is loaded in the child
    # processes. This import can take up to 200ms for a fresh interpreter.
    # This overhead is only present for the `loky` backend. For the
    # `multiprocessing` backend, as the processes are started with `fork`,
    # they already have a loaded scipy module. For the `threading` backend
    # and the iterative run, there is no need to re-import the module so
    # this overhead is non-existent.
    t0 = time()
    np.sum(parallel(
        delayed(fftconvolve)(image, f, mode='valid') for f in filters)
    )
    print(f"Library load overhead: {(time() - t0) / 1e-3:.3f}ms")

    # Average the runtime over multiple runs, once the external overheads
    # have been taken into account.
    times = []
    for _ in range(n_rep):
        t0 = time()
        np.sum(parallel(
            delayed(fftconvolve)(image, f, mode='valid') for f in filters
        ))
        times.append(time() - t0)
    print(f"Runtime without init overhead: {np.mean(times) / 1e-3:.3f}ms"
          f" (+-{np.std(times) / 1e-3:.3f}ms)\n")


# Set up the problem size
im_size = (512, 512)
filter_size = tuple(5 for s in im_size)
n_filters = 3
n_jobs = 3
n_rep = 50

# Generate random data
image = np.random.rand(*im_size)
filters = np.random.rand(n_filters, *filter_size)

# Time the `backend='multiprocessing'`
with Parallel(n_jobs=n_jobs, backend='multiprocessing') as parallel:
    time_parallel("Multiprocessing", parallel, image, filters, n_rep=n_rep)
sleep(.5)

# Time the `backend='threading'`
with Parallel(n_jobs=n_jobs, backend='threading') as parallel:
    time_parallel("Threading", parallel, image, filters, n_rep=n_rep)
sleep(.5)

# Time the `backend='loky'`.
# For this backend, there is no need to rely on a managed `Parallel` object
# as loky reuses the previously created pool by default. We thus mimic
# the creation of a new `Parallel` object for each repetition.
def parallel_loky(it):
    return Parallel(n_jobs=n_jobs)(it)

time_parallel("Loky", parallel_loky, image, filters, n_rep=n_rep)
sleep(.5)

# Time the iterative run.
# We rely on the SequentialBackend of joblib, which is used whenever
# `n_jobs=1`, to allow using the same function. This should not change
# the computation much.
def parallel_iterative(it):
    return Parallel(n_jobs=1)(it)

time_parallel("Iterative", parallel_iterative, image, filters, n_rep=n_rep)
$ python main.py
================================================================================
Multiprocessing
================================================================================
Pool init overhead: 12.112ms
Library load overhead: 96.520ms
Runtime without init overhead: 77.548ms (+-16.119ms)
================================================================================
Threading
================================================================================
Pool init overhead: 11.887ms
Library load overhead: 76.858ms
Runtime without init overhead: 31.931ms (+-3.569ms)
================================================================================
Loky
================================================================================
Pool init overhead: 502.369ms
Library load overhead: 245.368ms
Runtime without init overhead: 44.808ms (+-4.074ms)
================================================================================
Iterative
================================================================================
Pool init overhead: 1.048ms
Library load overhead: 92.595ms
Runtime without init overhead: 47.749ms (+-4.081ms)
With this benchmark, you can see that once it has been started, the `loky` backend is actually faster. But if you do not use it multiple times, the overhead is too large.
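As a usage sketch of that recommendation (reusing `image`, `filters`, and `n_jobs` from the benchmark above), only the first call pays the startup cost with the default `loky` backend, since its executor is reused across `Parallel` objects:

# The first iteration pays the worker startup and scipy import cost;
# the following ones reuse loky's executor and run at full speed.
for _ in range(10):
    s = np.sum(Parallel(n_jobs=n_jobs)(
        delayed(fftconvolve)(image, f, mode='valid') for f in filters
    ))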