为什么concurrent.futures.ProcessPoolExecutor的性能很低?
Why the performance of concurrent.futures.ProcessPoolExecutor is very low?
我正在尝试利用 Python3
中的 concurrent.futures.ProcessPoolExecutor
并行处理大型矩阵。代码的大致结构是:
class X(object):
self.matrix
def f(self, i, row_i):
<cpu-bound process>
def fetch_multiple(self, ids):
with ProcessPoolExecutor() as executor:
futures = [executor.submit(self.f, i, self.matrix.getrow(i)) for i in ids]
return [f.result() for f in as_completed(futures)]
self.matrix
是一个大scipy csr_matrix。 f
是我的并发函数,它接受一行 self.matrix
并在其上应用 CPU-bound 过程。最后,fetch_multiple
是 运行 并行 f
的多个实例和 returns 结果的函数。
问题是在 运行 执行脚本后,所有 cpu 核心都不到 50% 忙(请参见以下屏幕截图):
为什么所有核心都不忙?
我认为问题是 self.matrix
的大对象和在进程之间传递行向量。我该如何解决这个问题?
是的。
开销不应该那么大 - 但它可能是你的 CPU 出现空闲的原因(尽管它们应该忙于传递数据)。
但是尝试使用此处的方法将对象的 "pointer" 传递给使用共享内存的子进程。
http://briansimulator.org/sharing-numpy-arrays-between-processes/
从那里引用:
from multiprocessing import sharedctypes
size = S.size
shape = S.shape
S.shape = size
S_ctypes = sharedctypes.RawArray('d', S)
S = numpy.frombuffer(S_ctypes, dtype=numpy.float64, count=size)
S.shape = shape
Now we can send S_ctypes and shape to a child process in
multiprocessing, and convert it back to a numpy array in the child
process as follows:
from numpy import ctypeslib
S = ctypeslib.as_array(S_ctypes)
S.shape = shape
处理引用计数应该很棘手,但我想 numpy.ctypeslib
会处理这个 - 所以,只需以他们不这样做的方式协调将实际行号传递给子进程不处理相同的数据
我正在尝试利用 Python3
中的 concurrent.futures.ProcessPoolExecutor
并行处理大型矩阵。代码的大致结构是:
class X(object):
self.matrix
def f(self, i, row_i):
<cpu-bound process>
def fetch_multiple(self, ids):
with ProcessPoolExecutor() as executor:
futures = [executor.submit(self.f, i, self.matrix.getrow(i)) for i in ids]
return [f.result() for f in as_completed(futures)]
self.matrix
是一个大scipy csr_matrix。 f
是我的并发函数,它接受一行 self.matrix
并在其上应用 CPU-bound 过程。最后,fetch_multiple
是 运行 并行 f
的多个实例和 returns 结果的函数。
问题是在 运行 执行脚本后,所有 cpu 核心都不到 50% 忙(请参见以下屏幕截图):
为什么所有核心都不忙?
我认为问题是 self.matrix
的大对象和在进程之间传递行向量。我该如何解决这个问题?
是的。 开销不应该那么大 - 但它可能是你的 CPU 出现空闲的原因(尽管它们应该忙于传递数据)。
但是尝试使用此处的方法将对象的 "pointer" 传递给使用共享内存的子进程。
http://briansimulator.org/sharing-numpy-arrays-between-processes/
从那里引用:
from multiprocessing import sharedctypes
size = S.size
shape = S.shape
S.shape = size
S_ctypes = sharedctypes.RawArray('d', S)
S = numpy.frombuffer(S_ctypes, dtype=numpy.float64, count=size)
S.shape = shape
Now we can send S_ctypes and shape to a child process in multiprocessing, and convert it back to a numpy array in the child process as follows:
from numpy import ctypeslib
S = ctypeslib.as_array(S_ctypes)
S.shape = shape
处理引用计数应该很棘手,但我想 numpy.ctypeslib
会处理这个 - 所以,只需以他们不这样做的方式协调将实际行号传递给子进程不处理相同的数据