为什么这个多处理代码比串行代码慢?你会如何做得更好?

Why is this multiprocessing code slower than serial? How would you do this better?

我显然在 multiprocessing 上做错了什么,但我不确定是什么 -- 我希望看到这项任务的速度有所加快,但是 运行 这花费的时间分叉进程中的测试函数比主进程中花费的时间多 2 个数量级。这是一项不平凡的任务,所以我不认为这是工作负载太小而无法从多处理中受益的情况,如 this question 和基本上所有其他关于 multiprocessing 的 SO 问题。而且我知道启动新进程会产生开销,但是我的函数 returns 花费在实际计算上的时间,我认为这会在分叉开销完成后发生。

我查看了一堆文档和示例,尝试使用 mapmap_asyncapplyapply_async 而不是 imap_unordered,但我在所有情况下都得到了可比较的结果。我在这一点上完全迷惑不解......任何帮助理解我做错了什么都会很棒,就像修改后的代码片段一样,它提供了一个示例,说明如何通过并行化此任务来获得性能提升。谢谢!

import time

t_start = time.time()
from multiprocessing import Pool
from numpy.random import rand, randint
import numpy.linalg as la
import numpy as np


def f(sp):
    (M, i) = sp
    t0 = time.time()
    M = (M @ M.T) / 1000 + np.eye(M.shape[0])
    M_inv = la.inv(M)
    t_elapsed = time.time() - t0
    return i, M.shape[0], la.det(M_inv), t_elapsed


randmat = lambda m: rand(m, m)

N = 20
n_m = 1500
specs = list(zip([randmat(n_m) for _ in range(N)], range(N)))

t0 = time.time()
for result in [f(sp) for sp in specs]:
    print(result)

print(f"\n--- serial time: {time.time()-t0}; total elapsed: {time.time()-t_start }\n")

t0 = time.time()
with Pool(processes=10) as pool:
    multiple_results = pool.imap_unordered(f, specs)
    for result in multiple_results:
        print(result)

print(f"\n--- parallel time: {time.time()-t0}\n")

输出:

(0, 1500, 2.613708465497732e-76, 0.17858004570007324)
(1, 1500, 2.3314319199405457e-76, 0.18518280982971191)
(2, 1500, 2.4510533542449015e-76, 0.18424344062805176)
...(snip)...
(17, 1500, 2.0972534465354807e-76, 0.18465876579284668)
(18, 1500, 2.4890185099760677e-76, 0.18526124954223633)
(19, 1500, 3.0716539033944427e-76, 0.17455506324768066)

--- serial time: 5.365333557128906; total elapsed: 5.747828006744385

(0, 1500, 2.613708465497732e-76, 9.31627368927002)
(1, 1500, 2.3314319199405457e-76, 9.709473848342896)
(5, 1500, 2.6716145027956763e-76, 10.101540327072144)
...(snip)...
(19, 1500, 3.0716539033944427e-76, 10.48097825050354)
(18, 1500, 2.4890185099760677e-76, 10.82164478302002)
(17, 1500, 2.0972534465354807e-76, 10.97563886642456)

--- parallel time: 40.98197340965271

(系统信息:Mint 20.1,AMD Ryzen 5 2600(6 核,12 线程),Python 3.8)

更新

我相信下面@Paul 的回答可能是正确的。进一步研究这个问题,我想出了一个更病态的测试用例来解决@Charles Duffy 对序列化成本高昂的担忧——在下面的例子中,我只向每个进程发送一个 100 元素的 numpy 向量,而内部每个函数调用的时间从 ~0.03 秒变为 大约 100 秒 !!差了三个数量级以上!疯子!我只能想象在 multiprocessing 无法处理的后台发生了某种关于 CPU 访问的灾难性争用。

...但这似乎也是一个问题,特别是与 multiprocessingnumpy 之间的交互有关,因为我尝试了 ray,我得到了那种我期望通过并行化实现性能提升。

新结果 tl;dr

代码 v2

import time

t_start = time.time()

import multiprocessing as mp
from numpy.random import randn
import numpy.linalg as la
import numpy as np
import ray


num_vecs = 20
vec_size = 100
inputs = [(randn(vec_size, 1), i, t_start) for i in range(num_vecs)]


def f(input):
    (v, i, t_start) = input
    t0 = time.time()
    det_sum = 0
    M = (v @ v.T) + np.diag(v[:, 0])
    for _ in range(50):
        M = M @ (M.T)
        M = M @ (la.inv(M + np.eye(M.shape[0])) / 2)
        det_sum += la.det(M)
    t_inner = time.time() - t0
    t_since_start = time.time() - t_start
    return i, det_sum, t_inner, t_since_start


def print_result(r):
    print(
        f"id: {r[0]:2}, det_sum: {r[1]:.3e}, inner time: {r[2]:.4f}, time since start: {r[3]:.4f}"
    )


t0 = time.time()
for result in [f(sp) for sp in inputs]:
    print_result(result)
print(f"\n--- serial time: {time.time()-t0}; total elapsed: {time.time()-t_start }\n")

ray.init(num_cpus=10)
g = ray.remote(f)
t0 = time.time()
results = ray.get([g.remote(s) for s in inputs])
for result in results:
    print(result)
print(f"\n--- parallel time: {time.time()-t0}\n")

t0 = time.time()
with mp.Pool(processes=10) as pool:
    multiple_results = pool.imap_unordered(f, inputs)
    for result in multiple_results:
        print(result)
print(f"\n--- parallel time: {time.time()-t0}\n")

输出v2

id:  0, det_sum: 1.427e-133, inner time: 2.8998, time since start: 3.1620
id:  1, det_sum: 3.294e-118, inner time: 0.3816, time since start: 3.5436
id:  2, det_sum: 2.729e-114, inner time: 0.0569, time since start: 3.6005
...(snip)...
id: 17, det_sum: 2.372e-104, inner time: 0.0344, time since start: 4.8887
id: 18, det_sum: 3.523e-116, inner time: 0.0509, time since start: 4.9396
id: 19, det_sum: 9.242e-101, inner time: 0.0549, time since start: 4.9945

--- serial time: 4.734628677368164; total elapsed: 4.996868848800659

id:  0, det_sum: 1.427e-133, inner time: 0.0436, time since start: 6.1446
id:  1, det_sum: 3.294e-118, inner time: 0.0465, time since start: 6.1541
id:  2, det_sum: 2.729e-114, inner time: 0.0436, time since start: 6.1517
...(snip)...
id: 17, det_sum: 2.372e-104, inner time: 0.0438, time since start: 6.2027
id: 18, det_sum: 3.523e-116, inner time: 0.0394, time since start: 6.1995
id: 19, det_sum: 9.242e-101, inner time: 0.0413, time since start: 6.2032

--- parallel time: 0.1118767261505127

id:  0, det_sum: 1.427e-133, inner time: 101.0206, time since start: 107.2395
id:  2, det_sum: 2.729e-114, inner time: 102.6551, time since start: 108.8744
id:  5, det_sum: 2.063e-111, inner time: 104.2321, time since start: 110.4516
...(snip)...
id: 18, det_sum: 3.523e-116, inner time: 102.0273, time since start: 223.5556
id: 16, det_sum: 5.887e-99, inner time: 102.9106, time since start: 223.5907
id: 19, det_sum: 9.242e-101, inner time: 101.1289, time since start: 223.6742

--- parallel time: 217.47953820228577

我相信您的 numpy 很可能 已经在单进程模型中利用您的多核架构 。例如,从 here:

But many architectures now have a BLAS that also takes advantage of a multicore machine. If your numpy/scipy is compiled using one of these, then dot() will be computed in parallel (if this is faster) without you doing anything. Similarly for other matrix operations, like inversion, singular value decomposition, determinant, and so on.

您可以检查一下:

>>> import numpy as np
>>> np.show_config()

而且,作为一个简单的测试,如果你增加矩阵的大小并且只是 运行 它直接你看到你的多个核心被 numpy 使用了吗?例如,在 运行ning:

时观看 top
>>> n_m = 20000
>>> M = np.random.rand(n_m, n_m)
>>> M = (M @ M.T) / 1000 + np.eye(M.shape[0])

这可能足够慢,您可以在一个进程中查看它是否已经在使用多个内核。

正如您想象的那样,如果它已经在这样做,那么将它分成不同的进程只会增加开销,因此速度会变慢。