使用 numpy/scipy 最大限度地减少 Python multiprocessing.Pool 中的开销
Minimize overhead in Python multiprocessing.Pool with numpy/scipy
我花了几个小时尝试并行化我的 number-c运行ching 代码,但这样做只会让速度变慢。不幸的是,当我尝试将其简化为下面的示例并且我真的不想 post 整个程序时,问题就消失了。那么问题来了:在这种类型的程序中我应该避免哪些陷阱?
(注:Unutbu回答后的后续在最下方)
情况如下:
- 它是关于一个模块,它定义了一个包含大量内部数据的 class
BigData
。在示例中,有一个插值函数列表 ff
;在实际程序中,还有更多,例如ffA[k]
、ffB[k]
、ffC[k]
.
- 计算将class化为"embarrassingly parallel":一次可以处理较小的数据块。在示例中,即
do_chunk()
.
- 在我的实际程序中,示例中显示的方法会导致最差的性能:每个块大约 1 秒(在单线程中完成时实际计算时间为 0.1 秒左右)。因此,对于 n=50,
do_single()
会在 5 秒内 运行 而 do_multi()
会在 55 秒内 运行。
- 我还尝试通过将
xi
和 yi
数组分割成连续的块并遍历每个块中的所有 k
值来拆分工作。这样效果好一些。现在,无论我使用 1、2、3 还是 4 个线程,总执行时间都没有差异。但是,当然,我想看到实际的加速!
- 这可能是相关的:Multiprocessing.Pool makes Numpy matrix multiplication slower。然而,在程序的其他地方,我使用了一个多处理池来进行更加孤立的计算:一个看起来像
def do_chunk(array1, array2, array3)
的函数(未绑定到 class)并且只对它进行 numpy 计算大批。在那里,速度有了显着提升。
- CPU 使用率与预期的并行进程数成比例(三个线程的 300% CPU 使用率)。
#!/usr/bin/python2.7
import numpy as np, time, sys
from multiprocessing import Pool
from scipy.interpolate import RectBivariateSpline
_tm=0
def stopwatch(msg=''):
tm = time.time()
global _tm
if _tm==0: _tm = tm; return
print("%s: %.2f seconds" % (msg, tm-_tm))
_tm = tm
class BigData:
def __init__(self, n):
z = np.random.uniform(size=n*n*n).reshape((n,n,n))
self.ff = []
for i in range(n):
f = RectBivariateSpline(np.arange(n), np.arange(n), z[i], kx=1, ky=1)
self.ff.append(f)
self.n = n
def do_chunk(self, k, xi, yi):
s = np.sum(np.exp(self.ff[k].ev(xi, yi)))
sys.stderr.write(".")
return s
def do_multi(self, numproc, xi, yi):
procs = []
pool = Pool(numproc)
stopwatch('Pool setup')
for k in range(self.n):
p = pool.apply_async( _do_chunk_wrapper, (self, k, xi, yi))
procs.append(p)
stopwatch('Jobs queued (%d processes)' % numproc)
sum = 0.0
for k in range(self.n):
# Edit/bugfix: replaced p.get by procs[k].get
sum += np.sum(procs[k].get(timeout=30)) # timeout allows ctrl-C interrupt
if k == 0: stopwatch("\nFirst get() done")
stopwatch('Jobs done')
pool.close()
pool.join()
return sum
def do_single(self, xi, yi):
sum = 0.0
for k in range(self.n):
sum += self.do_chunk(k, xi, yi)
stopwatch('\nAll in single process')
return sum
def _do_chunk_wrapper(bd, k, xi, yi): # must be outside class for apply_async to chunk
return bd.do_chunk(k, xi, yi)
if __name__ == "__main__":
stopwatch()
n = 50
bd = BigData(n)
m = 1000*1000
xi, yi = np.random.uniform(0, n, size=m*2).reshape((2,m))
stopwatch('Initialized')
bd.do_multi(2, xi, yi)
bd.do_multi(3, xi, yi)
bd.do_single(xi, yi)
输出:
Initialized: 0.06 seconds
Pool setup: 0.01 seconds
Jobs queued (2 processes): 0.03 seconds
..
First get() done: 0.34 seconds
................................................Jobs done: 7.89 seconds
Pool setup: 0.05 seconds
Jobs queued (3 processes): 0.03 seconds
..
First get() done: 0.50 seconds
................................................Jobs done: 6.19 seconds
..................................................
All in single process: 11.41 seconds
时间安排在 Intel Core i3-3227 CPU 上,具有 2 个内核、4 个线程、运行ning 64 位 Linux。实际程序中,多进程版本(池机制,即使只用一个核)比单进程版本慢了10倍。
跟进
Unutbu 的回答让我走上了正轨。在实际程序中,self
被 pickle 成一个 37 到 140 MB 的对象,需要传递给工作进程。更糟糕的是,Python 酸洗非常慢;酸洗本身需要几秒钟,每块工作都会传递给工作进程。 Linux 中的 apply_async
除了 pickle 和传递大数据对象外,开销很小;对于一个小函数(添加一些整数参数),每个 apply_async
/get
对只需要 0.2 毫秒。因此,将工作分成非常小的块本身并不是问题。因此,我将所有大数组参数作为索引传递给全局变量。为了 CPU 缓存优化,我保持小块大小。
全局变量存储在全局dict
;设置工作池后,父进程中的条目将立即删除。只有 dict
的键被传输到工作进程。 pickling/IPC 唯一的大数据是工人创建的新数据。
#!/usr/bin/python2.7
import numpy as np, sys
from multiprocessing import Pool
_mproc_data = {} # global storage for objects during multiprocessing.
class BigData:
def __init__(self, size):
self.blah = np.random.uniform(0, 1, size=size)
def do_chunk(self, k, xi, yi):
# do the work and return an array of the same shape as xi, yi
zi = k*np.ones_like(xi)
return zi
def do_all_work(self, xi, yi, num_proc):
global _mproc_data
mp_key = str(id(self))
_mproc_data['bd'+mp_key] = self # BigData
_mproc_data['xi'+mp_key] = xi
_mproc_data['yi'+mp_key] = yi
pool = Pool(processes=num_proc)
# processes have now inherited the global variabele; clean up in the parent process
for v in ['bd', 'xi', 'yi']:
del _mproc_data[v+mp_key]
# setup indices for the worker processes (placeholder)
n_chunks = 45
n = len(xi)
chunk_len = n//n_chunks
i1list = np.arange(0,n,chunk_len)
i2list = i1list + chunk_len
i2list[-1] = n
klist = range(n_chunks) # placeholder
procs = []
for i in range(n_chunks):
p = pool.apply_async( _do_chunk_wrapper, (mp_key, i1list[i], i2list[i], klist[i]) )
sys.stderr.write(".")
procs.append(p)
sys.stderr.write("\n")
# allocate space for combined results
zi = np.zeros_like(xi)
# get data from workers and finish
for i, p in enumerate(procs):
zi[i1list[i]:i2list[i]] = p.get(timeout=30) # timeout allows ctrl-C handling
pool.close()
pool.join()
return zi
def _do_chunk_wrapper(key, i1, i2, k):
"""All arguments are small objects."""
global _mproc_data
bd = _mproc_data['bd'+key]
xi = _mproc_data['xi'+key][i1:i2]
yi = _mproc_data['yi'+key][i1:i2]
return bd.do_chunk(k, xi, yi)
if __name__ == "__main__":
xi, yi = np.linspace(1, 100, 100001), np.linspace(1, 100, 100001)
bd = BigData(int(1e7))
bd.do_all_work(xi, yi, 4)
这里是速度测试的结果(同样,2 个内核,4 个线程),改变工作进程的数量和块中的内存量(xi
、yi
, zi
数组切片)。数字在 "million result values per second" 中,但这对于比较来说并不重要。 “1 个流程”行是使用完整输入数据直接调用 do_chunk
,没有任何子流程。
#Proc 125K 250K 500K 1000K unlimited
1 0.82
2 4.28 1.96 1.3 1.31
3 2.69 1.06 1.06 1.07
4 2.17 1.27 1.23 1.28
内存中数据大小的影响相当大。 CPU 有 3 MB 共享 L3 缓存,外加每个内核 256 KB L2 缓存。请注意,该计算还需要访问 BigData
对象的数 MB 内部数据。因此,我们从中学到的是,进行这种速度测试是有用的。对于这个程序,2个进程最快,4个次之,3个最慢。
尽量减少进程间通信。
在 multiprocessing
模块中,所有(单台计算机)进程间通信都通过队列完成。对象通过队列
被腌制。因此,尝试通过队列发送更少 and/or 个较小的对象。
不通过队列发送 self
,BigData
的实例。它相当大,并且随着 self
中数据量的增加而变大:
In [6]: import pickle
In [14]: len(pickle.dumps(BigData(50)))
Out[14]: 1052187
每个
调用时间 pool.apply_async( _do_chunk_wrapper, (self, k, xi, yi))
,
self
在主进程中 pickled,在 worker 进程中 unpickled。这
len(pickle.dumps(BigData(N)))
的大小增加了 N
。
让数据从全局变量中读取。在 Linux,您可以利用写时复制。作为 Jan-Philip Gehrcke explains:
After fork(), parent and child are in an equivalent state. It would be stupid to copy the entire memory of the parent to another place in the RAM. That's [where] the copy-on-write principle [comes] in. As long as the child does not change its memory state, it actually accesses the parent's memory. Only upon modification, the corresponding bits and pieces are copied into the memory space of the child.
因此,您可以避免通过队列传递 BigData
的实例
通过简单地将实例定义为全局实例,bd = BigData(n)
,(正如您已经在做的那样)并在工作进程中引用它的值(例如 _do_chunk_wrapper
)。它基本上相当于从对 pool.apply_async
:
的调用中删除 self
p = pool.apply_async(_do_chunk_wrapper, (k_start, k_end, xi, yi))
并以全局方式访问 bd
,并对 do_chunk_wrapper
的调用签名进行必要的伴随更改。
尝试将更长的 运行 函数 func
传递给 pool.apply_async
。
如果您有许多对 pool.apply_async
的快速完成调用,那么通过队列传递参数和 return 值的开销将成为总时间的重要部分。相反,如果您减少对 pool.apply_async
的调用并在 return 获得结果之前给每个 func
更多的工作要做,那么进程间通信将成为总时间的一小部分。
下面,我修改了 _do_chunk_wrapper
以接受 k_start
和 k_end
参数,这样每次调用 pool.apply_async
都会计算 [=39 的许多值的总和=] 在 return 结果之前。
import math
import numpy as np
import time
import sys
import multiprocessing as mp
import scipy.interpolate as interpolate
_tm=0
def stopwatch(msg=''):
tm = time.time()
global _tm
if _tm==0: _tm = tm; return
print("%s: %.2f seconds" % (msg, tm-_tm))
_tm = tm
class BigData:
def __init__(self, n):
z = np.random.uniform(size=n*n*n).reshape((n,n,n))
self.ff = []
for i in range(n):
f = interpolate.RectBivariateSpline(
np.arange(n), np.arange(n), z[i], kx=1, ky=1)
self.ff.append(f)
self.n = n
def do_chunk(self, k, xi, yi):
n = self.n
s = np.sum(np.exp(self.ff[k].ev(xi, yi)))
sys.stderr.write(".")
return s
def do_chunk_of_chunks(self, k_start, k_end, xi, yi):
s = sum(np.sum(np.exp(self.ff[k].ev(xi, yi)))
for k in range(k_start, k_end))
sys.stderr.write(".")
return s
def do_multi(self, numproc, xi, yi):
procs = []
pool = mp.Pool(numproc)
stopwatch('\nPool setup')
ks = list(map(int, np.linspace(0, self.n, numproc+1)))
for i in range(len(ks)-1):
k_start, k_end = ks[i:i+2]
p = pool.apply_async(_do_chunk_wrapper, (k_start, k_end, xi, yi))
procs.append(p)
stopwatch('Jobs queued (%d processes)' % numproc)
total = 0.0
for k, p in enumerate(procs):
total += np.sum(p.get(timeout=30)) # timeout allows ctrl-C interrupt
if k == 0: stopwatch("\nFirst get() done")
print(total)
stopwatch('Jobs done')
pool.close()
pool.join()
return total
def do_single(self, xi, yi):
total = 0.0
for k in range(self.n):
total += self.do_chunk(k, xi, yi)
stopwatch('\nAll in single process')
return total
def _do_chunk_wrapper(k_start, k_end, xi, yi):
return bd.do_chunk_of_chunks(k_start, k_end, xi, yi)
if __name__ == "__main__":
stopwatch()
n = 50
bd = BigData(n)
m = 1000*1000
xi, yi = np.random.uniform(0, n, size=m*2).reshape((2,m))
stopwatch('Initialized')
bd.do_multi(2, xi, yi)
bd.do_multi(3, xi, yi)
bd.do_single(xi, yi)
产量
Initialized: 0.15 seconds
Pool setup: 0.06 seconds
Jobs queued (2 processes): 0.00 seconds
First get() done: 6.56 seconds
83963796.0404
Jobs done: 0.55 seconds
..
Pool setup: 0.08 seconds
Jobs queued (3 processes): 0.00 seconds
First get() done: 5.19 seconds
83963796.0404
Jobs done: 1.57 seconds
...
All in single process: 12.13 seconds
与原代码相比:
Initialized: 0.10 seconds
Pool setup: 0.03 seconds
Jobs queued (2 processes): 0.00 seconds
First get() done: 10.47 seconds
Jobs done: 0.00 seconds
..................................................
Pool setup: 0.12 seconds
Jobs queued (3 processes): 0.00 seconds
First get() done: 9.21 seconds
Jobs done: 0.00 seconds
..................................................
All in single process: 12.12 seconds
我花了几个小时尝试并行化我的 number-c运行ching 代码,但这样做只会让速度变慢。不幸的是,当我尝试将其简化为下面的示例并且我真的不想 post 整个程序时,问题就消失了。那么问题来了:在这种类型的程序中我应该避免哪些陷阱?
(注:Unutbu回答后的后续在最下方)
情况如下:
- 它是关于一个模块,它定义了一个包含大量内部数据的 class
BigData
。在示例中,有一个插值函数列表ff
;在实际程序中,还有更多,例如ffA[k]
、ffB[k]
、ffC[k]
. - 计算将class化为"embarrassingly parallel":一次可以处理较小的数据块。在示例中,即
do_chunk()
. - 在我的实际程序中,示例中显示的方法会导致最差的性能:每个块大约 1 秒(在单线程中完成时实际计算时间为 0.1 秒左右)。因此,对于 n=50,
do_single()
会在 5 秒内 运行 而do_multi()
会在 55 秒内 运行。 - 我还尝试通过将
xi
和yi
数组分割成连续的块并遍历每个块中的所有k
值来拆分工作。这样效果好一些。现在,无论我使用 1、2、3 还是 4 个线程,总执行时间都没有差异。但是,当然,我想看到实际的加速! - 这可能是相关的:Multiprocessing.Pool makes Numpy matrix multiplication slower。然而,在程序的其他地方,我使用了一个多处理池来进行更加孤立的计算:一个看起来像
def do_chunk(array1, array2, array3)
的函数(未绑定到 class)并且只对它进行 numpy 计算大批。在那里,速度有了显着提升。 - CPU 使用率与预期的并行进程数成比例(三个线程的 300% CPU 使用率)。
#!/usr/bin/python2.7
import numpy as np, time, sys
from multiprocessing import Pool
from scipy.interpolate import RectBivariateSpline
_tm=0
def stopwatch(msg=''):
tm = time.time()
global _tm
if _tm==0: _tm = tm; return
print("%s: %.2f seconds" % (msg, tm-_tm))
_tm = tm
class BigData:
def __init__(self, n):
z = np.random.uniform(size=n*n*n).reshape((n,n,n))
self.ff = []
for i in range(n):
f = RectBivariateSpline(np.arange(n), np.arange(n), z[i], kx=1, ky=1)
self.ff.append(f)
self.n = n
def do_chunk(self, k, xi, yi):
s = np.sum(np.exp(self.ff[k].ev(xi, yi)))
sys.stderr.write(".")
return s
def do_multi(self, numproc, xi, yi):
procs = []
pool = Pool(numproc)
stopwatch('Pool setup')
for k in range(self.n):
p = pool.apply_async( _do_chunk_wrapper, (self, k, xi, yi))
procs.append(p)
stopwatch('Jobs queued (%d processes)' % numproc)
sum = 0.0
for k in range(self.n):
# Edit/bugfix: replaced p.get by procs[k].get
sum += np.sum(procs[k].get(timeout=30)) # timeout allows ctrl-C interrupt
if k == 0: stopwatch("\nFirst get() done")
stopwatch('Jobs done')
pool.close()
pool.join()
return sum
def do_single(self, xi, yi):
sum = 0.0
for k in range(self.n):
sum += self.do_chunk(k, xi, yi)
stopwatch('\nAll in single process')
return sum
def _do_chunk_wrapper(bd, k, xi, yi): # must be outside class for apply_async to chunk
return bd.do_chunk(k, xi, yi)
if __name__ == "__main__":
stopwatch()
n = 50
bd = BigData(n)
m = 1000*1000
xi, yi = np.random.uniform(0, n, size=m*2).reshape((2,m))
stopwatch('Initialized')
bd.do_multi(2, xi, yi)
bd.do_multi(3, xi, yi)
bd.do_single(xi, yi)
输出:
Initialized: 0.06 seconds
Pool setup: 0.01 seconds
Jobs queued (2 processes): 0.03 seconds
..
First get() done: 0.34 seconds
................................................Jobs done: 7.89 seconds
Pool setup: 0.05 seconds
Jobs queued (3 processes): 0.03 seconds
..
First get() done: 0.50 seconds
................................................Jobs done: 6.19 seconds
..................................................
All in single process: 11.41 seconds
时间安排在 Intel Core i3-3227 CPU 上,具有 2 个内核、4 个线程、运行ning 64 位 Linux。实际程序中,多进程版本(池机制,即使只用一个核)比单进程版本慢了10倍。
跟进
Unutbu 的回答让我走上了正轨。在实际程序中,self
被 pickle 成一个 37 到 140 MB 的对象,需要传递给工作进程。更糟糕的是,Python 酸洗非常慢;酸洗本身需要几秒钟,每块工作都会传递给工作进程。 Linux 中的 apply_async
除了 pickle 和传递大数据对象外,开销很小;对于一个小函数(添加一些整数参数),每个 apply_async
/get
对只需要 0.2 毫秒。因此,将工作分成非常小的块本身并不是问题。因此,我将所有大数组参数作为索引传递给全局变量。为了 CPU 缓存优化,我保持小块大小。
全局变量存储在全局dict
;设置工作池后,父进程中的条目将立即删除。只有 dict
的键被传输到工作进程。 pickling/IPC 唯一的大数据是工人创建的新数据。
#!/usr/bin/python2.7
import numpy as np, sys
from multiprocessing import Pool
_mproc_data = {} # global storage for objects during multiprocessing.
class BigData:
def __init__(self, size):
self.blah = np.random.uniform(0, 1, size=size)
def do_chunk(self, k, xi, yi):
# do the work and return an array of the same shape as xi, yi
zi = k*np.ones_like(xi)
return zi
def do_all_work(self, xi, yi, num_proc):
global _mproc_data
mp_key = str(id(self))
_mproc_data['bd'+mp_key] = self # BigData
_mproc_data['xi'+mp_key] = xi
_mproc_data['yi'+mp_key] = yi
pool = Pool(processes=num_proc)
# processes have now inherited the global variabele; clean up in the parent process
for v in ['bd', 'xi', 'yi']:
del _mproc_data[v+mp_key]
# setup indices for the worker processes (placeholder)
n_chunks = 45
n = len(xi)
chunk_len = n//n_chunks
i1list = np.arange(0,n,chunk_len)
i2list = i1list + chunk_len
i2list[-1] = n
klist = range(n_chunks) # placeholder
procs = []
for i in range(n_chunks):
p = pool.apply_async( _do_chunk_wrapper, (mp_key, i1list[i], i2list[i], klist[i]) )
sys.stderr.write(".")
procs.append(p)
sys.stderr.write("\n")
# allocate space for combined results
zi = np.zeros_like(xi)
# get data from workers and finish
for i, p in enumerate(procs):
zi[i1list[i]:i2list[i]] = p.get(timeout=30) # timeout allows ctrl-C handling
pool.close()
pool.join()
return zi
def _do_chunk_wrapper(key, i1, i2, k):
"""All arguments are small objects."""
global _mproc_data
bd = _mproc_data['bd'+key]
xi = _mproc_data['xi'+key][i1:i2]
yi = _mproc_data['yi'+key][i1:i2]
return bd.do_chunk(k, xi, yi)
if __name__ == "__main__":
xi, yi = np.linspace(1, 100, 100001), np.linspace(1, 100, 100001)
bd = BigData(int(1e7))
bd.do_all_work(xi, yi, 4)
这里是速度测试的结果(同样,2 个内核,4 个线程),改变工作进程的数量和块中的内存量(xi
、yi
, zi
数组切片)。数字在 "million result values per second" 中,但这对于比较来说并不重要。 “1 个流程”行是使用完整输入数据直接调用 do_chunk
,没有任何子流程。
#Proc 125K 250K 500K 1000K unlimited
1 0.82
2 4.28 1.96 1.3 1.31
3 2.69 1.06 1.06 1.07
4 2.17 1.27 1.23 1.28
内存中数据大小的影响相当大。 CPU 有 3 MB 共享 L3 缓存,外加每个内核 256 KB L2 缓存。请注意,该计算还需要访问 BigData
对象的数 MB 内部数据。因此,我们从中学到的是,进行这种速度测试是有用的。对于这个程序,2个进程最快,4个次之,3个最慢。
尽量减少进程间通信。
在 multiprocessing
模块中,所有(单台计算机)进程间通信都通过队列完成。对象通过队列
被腌制。因此,尝试通过队列发送更少 and/or 个较小的对象。
不通过队列发送
self
,BigData
的实例。它相当大,并且随着self
中数据量的增加而变大:In [6]: import pickle In [14]: len(pickle.dumps(BigData(50))) Out[14]: 1052187
每个 调用时间
pool.apply_async( _do_chunk_wrapper, (self, k, xi, yi))
,self
在主进程中 pickled,在 worker 进程中 unpickled。这len(pickle.dumps(BigData(N)))
的大小增加了N
。让数据从全局变量中读取。在 Linux,您可以利用写时复制。作为 Jan-Philip Gehrcke explains:
After fork(), parent and child are in an equivalent state. It would be stupid to copy the entire memory of the parent to another place in the RAM. That's [where] the copy-on-write principle [comes] in. As long as the child does not change its memory state, it actually accesses the parent's memory. Only upon modification, the corresponding bits and pieces are copied into the memory space of the child.
因此,您可以避免通过队列传递
的调用中删除BigData
的实例 通过简单地将实例定义为全局实例,bd = BigData(n)
,(正如您已经在做的那样)并在工作进程中引用它的值(例如_do_chunk_wrapper
)。它基本上相当于从对pool.apply_async
:self
p = pool.apply_async(_do_chunk_wrapper, (k_start, k_end, xi, yi))
并以全局方式访问
bd
,并对do_chunk_wrapper
的调用签名进行必要的伴随更改。尝试将更长的 运行 函数
func
传递给pool.apply_async
。 如果您有许多对pool.apply_async
的快速完成调用,那么通过队列传递参数和 return 值的开销将成为总时间的重要部分。相反,如果您减少对pool.apply_async
的调用并在 return 获得结果之前给每个func
更多的工作要做,那么进程间通信将成为总时间的一小部分。下面,我修改了
_do_chunk_wrapper
以接受k_start
和k_end
参数,这样每次调用pool.apply_async
都会计算 [=39 的许多值的总和=] 在 return 结果之前。
import math
import numpy as np
import time
import sys
import multiprocessing as mp
import scipy.interpolate as interpolate
_tm=0
def stopwatch(msg=''):
tm = time.time()
global _tm
if _tm==0: _tm = tm; return
print("%s: %.2f seconds" % (msg, tm-_tm))
_tm = tm
class BigData:
def __init__(self, n):
z = np.random.uniform(size=n*n*n).reshape((n,n,n))
self.ff = []
for i in range(n):
f = interpolate.RectBivariateSpline(
np.arange(n), np.arange(n), z[i], kx=1, ky=1)
self.ff.append(f)
self.n = n
def do_chunk(self, k, xi, yi):
n = self.n
s = np.sum(np.exp(self.ff[k].ev(xi, yi)))
sys.stderr.write(".")
return s
def do_chunk_of_chunks(self, k_start, k_end, xi, yi):
s = sum(np.sum(np.exp(self.ff[k].ev(xi, yi)))
for k in range(k_start, k_end))
sys.stderr.write(".")
return s
def do_multi(self, numproc, xi, yi):
procs = []
pool = mp.Pool(numproc)
stopwatch('\nPool setup')
ks = list(map(int, np.linspace(0, self.n, numproc+1)))
for i in range(len(ks)-1):
k_start, k_end = ks[i:i+2]
p = pool.apply_async(_do_chunk_wrapper, (k_start, k_end, xi, yi))
procs.append(p)
stopwatch('Jobs queued (%d processes)' % numproc)
total = 0.0
for k, p in enumerate(procs):
total += np.sum(p.get(timeout=30)) # timeout allows ctrl-C interrupt
if k == 0: stopwatch("\nFirst get() done")
print(total)
stopwatch('Jobs done')
pool.close()
pool.join()
return total
def do_single(self, xi, yi):
total = 0.0
for k in range(self.n):
total += self.do_chunk(k, xi, yi)
stopwatch('\nAll in single process')
return total
def _do_chunk_wrapper(k_start, k_end, xi, yi):
return bd.do_chunk_of_chunks(k_start, k_end, xi, yi)
if __name__ == "__main__":
stopwatch()
n = 50
bd = BigData(n)
m = 1000*1000
xi, yi = np.random.uniform(0, n, size=m*2).reshape((2,m))
stopwatch('Initialized')
bd.do_multi(2, xi, yi)
bd.do_multi(3, xi, yi)
bd.do_single(xi, yi)
产量
Initialized: 0.15 seconds
Pool setup: 0.06 seconds
Jobs queued (2 processes): 0.00 seconds
First get() done: 6.56 seconds
83963796.0404
Jobs done: 0.55 seconds
..
Pool setup: 0.08 seconds
Jobs queued (3 processes): 0.00 seconds
First get() done: 5.19 seconds
83963796.0404
Jobs done: 1.57 seconds
...
All in single process: 12.13 seconds
与原代码相比:
Initialized: 0.10 seconds
Pool setup: 0.03 seconds
Jobs queued (2 processes): 0.00 seconds
First get() done: 10.47 seconds
Jobs done: 0.00 seconds
..................................................
Pool setup: 0.12 seconds
Jobs queued (3 processes): 0.00 seconds
First get() done: 9.21 seconds
Jobs done: 0.00 seconds
..................................................
All in single process: 12.12 seconds