如何提高 Python 中并行循环的效率

How to improve efficiency on parallel loops in Python

我很好奇 Python 中的并行循环与 Matlab 中的 parloop 相比效率有多低。 在这里,我提出了一个简单的寻根问题,即在 ab.

之间强制进行初始 10^6 初始猜测
import numpy as np
from scipy.optimize import root
import matplotlib.pyplot as plt
import multiprocessing

# define the function to find the roots
func = lambda x: np.sin(3*np.pi*np.cos(np.pi*x)*np.sin(np.pi*x))

def forfunc(x0):
    q = [root(func, xi).x for xi in x0]
    q = np.array(q).T[0]
    return q

# variables os the problem
a = -3
b = 5
n = int(1e6)
x0 = np.linspace(a,b,n) # list of initial guesses

# the single-process loop
q = forfunc(x0)

# parallel loop
nc = 4
pool = multiprocessing.Pool(processes=nc)
q = np.hstack(pool.map(forfunc,np.split(x0,nc)))
pool.close()

单进程循环耗时1分26秒,并行循环耗时1分7秒。我看到一些改进,因为加速是 1.28,但效率 (timeloop/timeparallel/n_process) 在这种情况下是 0.32。

这里发生了什么以及如何提高这种效率? 我做错了什么吗?

我也尝试过以两种方式使用 dask.delayed

import dask

# Every call is a delayed object
q = dask.compute(*[dask.delayed(func)(xi) for xi in x0])

# Every chunk is a delayed object
q = dask.compute(*[dask.delayed(forfunc)(x0i) for x0i in np.split(x0,nc)])

而且这里都比单进程循环花费更多的时间。 第一次尝试的墙时间是 3 分钟,第二次尝试花了 1 分钟 27 秒。

Dask(或 Spark)发生了什么

根据您的 single-process 测试,您的循环在 90 秒内执行了一百万个任务。因此,在平均情况下,每个任务需要您 CPU 大约 90 微秒。

在提供灵活性和弹性的分布式计算框架(如 Dask 或 Spark)中,任务的相关开销很小。 Dask 的开销低至 200 microseconds per task. The Spark 3.0 documentation 这表明 Spark 可以支持短至 200 毫秒 的任务,这也许意味着 Dask 实际上的开销比 Spark 少 1000 倍。听起来 Dask 实际上在这里做得很好!

如果您的任务比框架的 per-task 开销更快,与在相同数量的 machines/cores 上手动分配您的工作相比,您只会看到使用它的性能更差。在这种情况下,您 运行 进入了那个场景。

在您的分块数据 Dask 示例中,您只有几个任务,因此您可以通过减少开销获得更好的性能。但是,相对于原始多处理,您可能会因 Dask 的开销而对性能造成很小的影响,或者您没有使用 Dask 集群并且 运行 任务是单个进程。

Multiprocessing(和 Dask)应该有所帮助

对于这种令人尴尬的并行问题,您的多处理结果通常出乎意料。您可能需要确认您机器上的物理内核数量,尤其要确保没有其他任何东西正在积极利用您的 CPU 内核。在不知道其他任何事情的情况下,我猜这就是罪魁祸首。

在我有两个物理内核的笔记本电脑上,您的示例采用:

  • 单进程循环2min 1s
  • 两个进程1分2秒
  • 四个进程1分钟
  • 1 分钟 5 秒,用于分块的 Dask 示例,nc=2 分成两个块和一个由两个工作人员组成的 LocalCluster,每个工作人员一个线程。可能值得仔细检查您是否在集群上 运行。

使用两个进程获得大约 2 倍的加速符合我对笔记本电脑的预期,因为对于此 CPU 绑定任务,更多进程带来的好处很少或没有。 Dask 还增加了一些相对于原始多处理的开销。

%%time
​
# the single-process loop
q = forfunc(x0)
CPU times: user 1min 55s, sys: 1.68 s, total: 1min 57s
Wall time: 2min 1s
%%time
​
# parallel loop
nc = 2
pool = multiprocessing.Pool(processes=nc)
q = np.hstack(pool.map(forfunc,np.split(x0,nc)))
pool.close()
CPU times: user 92.6 ms, sys: 70.8 ms, total: 163 ms
Wall time: 1min 2s
%%time
​
# parallel loop
nc = 4
pool = multiprocessing.Pool(processes=nc)
q = np.hstack(pool.map(forfunc,np.split(x0,nc)))
pool.close()
CPU times: user 118 ms, sys: 94.6 ms, total: 212 ms
Wall time: 1min
from dask.distributed import Client, LocalCluster, wait
client = Client(n_workers=2, threads_per_worker=1)

%%time
​
nc = 2
chunks = np.split(x0,nc)
client.scatter(chunks, broadcast=True)
q = client.compute([dask.delayed(forfunc)(x0i) for x0i in chunks])
wait(q)
/Users/nickbecker/miniconda3/envs/prophet/lib/python3.7/site-packages/distributed/worker.py:3382: UserWarning: Large object of size 4.00 MB detected in task graph: 
  (array([1.000004, 1.000012, 1.00002 , ..., 4.99998 ... 2, 5.      ]),)
Consider scattering large objects ahead of time
with client.scatter to reduce scheduler burden and 
keep data on workers

    future = client.submit(func, big_data)    # bad

    big_future = client.scatter(big_data)     # good
    future = client.submit(func, big_future)  # good
  % (format_bytes(len(b)), s)
CPU times: user 3.67 s, sys: 324 ms, total: 4 s
Wall time: 1min 5s