如何提高 Python 中并行循环的效率
How to improve efficiency on parallel loops in Python
我很好奇 Python 中的并行循环与 Matlab 中的 parloop
相比效率有多低。
在这里,我提出了一个简单的寻根问题,即在 a
和 b
.
之间强制进行初始 10^6 初始猜测
import numpy as np
from scipy.optimize import root
import matplotlib.pyplot as plt
import multiprocessing
# define the function to find the roots
func = lambda x: np.sin(3*np.pi*np.cos(np.pi*x)*np.sin(np.pi*x))
def forfunc(x0):
q = [root(func, xi).x for xi in x0]
q = np.array(q).T[0]
return q
# variables os the problem
a = -3
b = 5
n = int(1e6)
x0 = np.linspace(a,b,n) # list of initial guesses
# the single-process loop
q = forfunc(x0)
# parallel loop
nc = 4
pool = multiprocessing.Pool(processes=nc)
q = np.hstack(pool.map(forfunc,np.split(x0,nc)))
pool.close()
单进程循环耗时1分26秒,并行循环耗时1分7秒。我看到一些改进,因为加速是 1.28,但效率 (timeloop/timeparallel/n_process)
在这种情况下是 0.32。
这里发生了什么以及如何提高这种效率?
我做错了什么吗?
我也尝试过以两种方式使用 dask.delayed
:
import dask
# Every call is a delayed object
q = dask.compute(*[dask.delayed(func)(xi) for xi in x0])
# Every chunk is a delayed object
q = dask.compute(*[dask.delayed(forfunc)(x0i) for x0i in np.split(x0,nc)])
而且这里都比单进程循环花费更多的时间。
第一次尝试的墙时间是 3 分钟,第二次尝试花了 1 分钟 27 秒。
Dask(或 Spark)发生了什么
根据您的 single-process 测试,您的循环在 90 秒内执行了一百万个任务。因此,在平均情况下,每个任务需要您 CPU 大约 90 微秒。
在提供灵活性和弹性的分布式计算框架(如 Dask 或 Spark)中,任务的相关开销很小。 Dask 的开销低至 200 microseconds per task. The Spark 3.0 documentation 这表明 Spark 可以支持短至 200 毫秒 的任务,这也许意味着 Dask 实际上的开销比 Spark 少 1000 倍。听起来 Dask 实际上在这里做得很好!
如果您的任务比框架的 per-task 开销更快,与在相同数量的 machines/cores 上手动分配您的工作相比,您只会看到使用它的性能更差。在这种情况下,您 运行 进入了那个场景。
在您的分块数据 Dask 示例中,您只有几个任务,因此您可以通过减少开销获得更好的性能。但是,相对于原始多处理,您可能会因 Dask 的开销而对性能造成很小的影响,或者您没有使用 Dask 集群并且 运行 任务是单个进程。
Multiprocessing(和 Dask)应该有所帮助
对于这种令人尴尬的并行问题,您的多处理结果通常出乎意料。您可能需要确认您机器上的物理内核数量,尤其要确保没有其他任何东西正在积极利用您的 CPU 内核。在不知道其他任何事情的情况下,我猜这就是罪魁祸首。
在我有两个物理内核的笔记本电脑上,您的示例采用:
- 单进程循环2min 1s
- 两个进程1分2秒
- 四个进程1分钟
- 1 分钟 5 秒,用于分块的 Dask 示例,
nc=2
分成两个块和一个由两个工作人员组成的 LocalCluster,每个工作人员一个线程。可能值得仔细检查您是否在集群上 运行。
使用两个进程获得大约 2 倍的加速符合我对笔记本电脑的预期,因为对于此 CPU 绑定任务,更多进程带来的好处很少或没有。 Dask 还增加了一些相对于原始多处理的开销。
%%time
# the single-process loop
q = forfunc(x0)
CPU times: user 1min 55s, sys: 1.68 s, total: 1min 57s
Wall time: 2min 1s
%%time
# parallel loop
nc = 2
pool = multiprocessing.Pool(processes=nc)
q = np.hstack(pool.map(forfunc,np.split(x0,nc)))
pool.close()
CPU times: user 92.6 ms, sys: 70.8 ms, total: 163 ms
Wall time: 1min 2s
%%time
# parallel loop
nc = 4
pool = multiprocessing.Pool(processes=nc)
q = np.hstack(pool.map(forfunc,np.split(x0,nc)))
pool.close()
CPU times: user 118 ms, sys: 94.6 ms, total: 212 ms
Wall time: 1min
from dask.distributed import Client, LocalCluster, wait
client = Client(n_workers=2, threads_per_worker=1)
%%time
nc = 2
chunks = np.split(x0,nc)
client.scatter(chunks, broadcast=True)
q = client.compute([dask.delayed(forfunc)(x0i) for x0i in chunks])
wait(q)
/Users/nickbecker/miniconda3/envs/prophet/lib/python3.7/site-packages/distributed/worker.py:3382: UserWarning: Large object of size 4.00 MB detected in task graph:
(array([1.000004, 1.000012, 1.00002 , ..., 4.99998 ... 2, 5. ]),)
Consider scattering large objects ahead of time
with client.scatter to reduce scheduler burden and
keep data on workers
future = client.submit(func, big_data) # bad
big_future = client.scatter(big_data) # good
future = client.submit(func, big_future) # good
% (format_bytes(len(b)), s)
CPU times: user 3.67 s, sys: 324 ms, total: 4 s
Wall time: 1min 5s
我很好奇 Python 中的并行循环与 Matlab 中的 parloop
相比效率有多低。
在这里,我提出了一个简单的寻根问题,即在 a
和 b
.
import numpy as np
from scipy.optimize import root
import matplotlib.pyplot as plt
import multiprocessing
# define the function to find the roots
func = lambda x: np.sin(3*np.pi*np.cos(np.pi*x)*np.sin(np.pi*x))
def forfunc(x0):
q = [root(func, xi).x for xi in x0]
q = np.array(q).T[0]
return q
# variables os the problem
a = -3
b = 5
n = int(1e6)
x0 = np.linspace(a,b,n) # list of initial guesses
# the single-process loop
q = forfunc(x0)
# parallel loop
nc = 4
pool = multiprocessing.Pool(processes=nc)
q = np.hstack(pool.map(forfunc,np.split(x0,nc)))
pool.close()
单进程循环耗时1分26秒,并行循环耗时1分7秒。我看到一些改进,因为加速是 1.28,但效率 (timeloop/timeparallel/n_process)
在这种情况下是 0.32。
这里发生了什么以及如何提高这种效率? 我做错了什么吗?
我也尝试过以两种方式使用 dask.delayed
:
import dask
# Every call is a delayed object
q = dask.compute(*[dask.delayed(func)(xi) for xi in x0])
# Every chunk is a delayed object
q = dask.compute(*[dask.delayed(forfunc)(x0i) for x0i in np.split(x0,nc)])
而且这里都比单进程循环花费更多的时间。 第一次尝试的墙时间是 3 分钟,第二次尝试花了 1 分钟 27 秒。
Dask(或 Spark)发生了什么
根据您的 single-process 测试,您的循环在 90 秒内执行了一百万个任务。因此,在平均情况下,每个任务需要您 CPU 大约 90 微秒。
在提供灵活性和弹性的分布式计算框架(如 Dask 或 Spark)中,任务的相关开销很小。 Dask 的开销低至 200 microseconds per task. The Spark 3.0 documentation 这表明 Spark 可以支持短至 200 毫秒 的任务,这也许意味着 Dask 实际上的开销比 Spark 少 1000 倍。听起来 Dask 实际上在这里做得很好!
如果您的任务比框架的 per-task 开销更快,与在相同数量的 machines/cores 上手动分配您的工作相比,您只会看到使用它的性能更差。在这种情况下,您 运行 进入了那个场景。
在您的分块数据 Dask 示例中,您只有几个任务,因此您可以通过减少开销获得更好的性能。但是,相对于原始多处理,您可能会因 Dask 的开销而对性能造成很小的影响,或者您没有使用 Dask 集群并且 运行 任务是单个进程。
Multiprocessing(和 Dask)应该有所帮助
对于这种令人尴尬的并行问题,您的多处理结果通常出乎意料。您可能需要确认您机器上的物理内核数量,尤其要确保没有其他任何东西正在积极利用您的 CPU 内核。在不知道其他任何事情的情况下,我猜这就是罪魁祸首。
在我有两个物理内核的笔记本电脑上,您的示例采用:
- 单进程循环2min 1s
- 两个进程1分2秒
- 四个进程1分钟
- 1 分钟 5 秒,用于分块的 Dask 示例,
nc=2
分成两个块和一个由两个工作人员组成的 LocalCluster,每个工作人员一个线程。可能值得仔细检查您是否在集群上 运行。
使用两个进程获得大约 2 倍的加速符合我对笔记本电脑的预期,因为对于此 CPU 绑定任务,更多进程带来的好处很少或没有。 Dask 还增加了一些相对于原始多处理的开销。
%%time
# the single-process loop
q = forfunc(x0)
CPU times: user 1min 55s, sys: 1.68 s, total: 1min 57s
Wall time: 2min 1s
%%time
# parallel loop
nc = 2
pool = multiprocessing.Pool(processes=nc)
q = np.hstack(pool.map(forfunc,np.split(x0,nc)))
pool.close()
CPU times: user 92.6 ms, sys: 70.8 ms, total: 163 ms
Wall time: 1min 2s
%%time
# parallel loop
nc = 4
pool = multiprocessing.Pool(processes=nc)
q = np.hstack(pool.map(forfunc,np.split(x0,nc)))
pool.close()
CPU times: user 118 ms, sys: 94.6 ms, total: 212 ms
Wall time: 1min
from dask.distributed import Client, LocalCluster, wait
client = Client(n_workers=2, threads_per_worker=1)
%%time
nc = 2
chunks = np.split(x0,nc)
client.scatter(chunks, broadcast=True)
q = client.compute([dask.delayed(forfunc)(x0i) for x0i in chunks])
wait(q)
/Users/nickbecker/miniconda3/envs/prophet/lib/python3.7/site-packages/distributed/worker.py:3382: UserWarning: Large object of size 4.00 MB detected in task graph:
(array([1.000004, 1.000012, 1.00002 , ..., 4.99998 ... 2, 5. ]),)
Consider scattering large objects ahead of time
with client.scatter to reduce scheduler burden and
keep data on workers
future = client.submit(func, big_data) # bad
big_future = client.scatter(big_data) # good
future = client.submit(func, big_future) # good
% (format_bytes(len(b)), s)
CPU times: user 3.67 s, sys: 324 ms, total: 4 s
Wall time: 1min 5s