Multiprocessing code running much slower than single-threaded code

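I'm trying to learn how to use the multiprocessing package in Python, and I've written the following code, which randomly generates a large 2D array and then works out how many numbers in each row fall within a specified range (in this case 4 to 8):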

import time
import multiprocessing as mp
import numpy as np

def how_many_within_range(row, minimum, maximum):
    count = 0
    for n in row:
        if minimum <= n <= maximum:
            count += 1
    return count

if __name__ == '__main__':
    data = np.random.randint(0, 10, size=[10000000, 5])
    print(data[:5])

    start_time = time.perf_counter()

    # With parallelisation
    with mp.Pool(mp.cpu_count()) as pool:
        results = [ pool.apply(how_many_within_range, args=(row, 4, 8)) \
                       for row in data ]

    # Without parallelisation
    # results = [ how_many_within_range(row, 4, 8) for row in data ]

    print(f'Time elapsed: {time.perf_counter() - start_time}')
    print(results[:5])

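Without multiprocessing, the code takes about 40 seconds to run, but with it the program is much slower and doesn't finish in a realistic time. I'm fairly sure I've followed the tutorial I was using correctly, so what am I doing wrong?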

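According to the documentation, Pool.apply() blocks until the result is ready, so each call waits for the previous one to finish. You pay the overhead of starting the processes and sending them work, but you get no parallelism.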
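To see the blocking behaviour in isolation, here is a minimal sketch. `slow_double` is a hypothetical stand-in for real work (it only sleeps), and the pool size of 4 and the 0.2 s delay are arbitrary choices for illustration, not taken from the question.

```python
import time
import multiprocessing as mp

def slow_double(x):
    """Hypothetical stand-in for real work: sleep briefly, then double x."""
    time.sleep(0.2)
    return 2 * x

def run_with_apply(pool, items):
    # Each apply() call blocks until its result is back,
    # so the tasks run one after another.
    return [pool.apply(slow_double, args=(x,)) for x in items]

def run_with_map(pool, items):
    # map() hands the whole iterable to the pool,
    # so several workers can be busy at the same time.
    return pool.map(slow_double, items)

if __name__ == '__main__':
    items = list(range(8))
    with mp.Pool(4) as pool:
        for runner in (run_with_apply, run_with_map):
            start = time.perf_counter()
            results = runner(pool, items)
            elapsed = time.perf_counter() - start
            print(f'{runner.__name__}: {elapsed:.2f} s, {results}')
```

Both runs should return the same list. The apply() loop should take about as long as running the eight calls back to back, while map() should finish several times faster because the workers sleep concurrently.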

Why do you need multiprocessing for such a simple function, especially when you are already working with numpy arrays? Try this code instead:

%%timeit
np.sum((data>=4)&(data<=8), axis=1)
198 ms ± 3.44 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

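There is no need to iterate over the array elements in a for loop; the vectorised expression processes the whole array at once.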
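As a quick check that the vectorised expression gives the same counts as the original loop, here is a small sketch on a hand-written three-row array. The helper name `count_within_range_vectorised` is my own, made up for illustration.

```python
import numpy as np

def how_many_within_range(row, minimum, maximum):
    # Original loop-based version from the question.
    count = 0
    for n in row:
        if minimum <= n <= maximum:
            count += 1
    return count

def count_within_range_vectorised(data, minimum, maximum):
    # Boolean mask of in-range elements, then count the True values per row.
    mask = (data >= minimum) & (data <= maximum)
    return mask.sum(axis=1)

data = np.array([[1, 4, 8, 9, 2],
                 [9, 1, 6, 7, 0],
                 [0, 7, 6, 8, 4]])

loop_counts = [how_many_within_range(row, 4, 8) for row in data]
vector_counts = count_within_range_vectorised(data, 4, 8)

print(loop_counts)             # [2, 2, 4]
print(vector_counts.tolist())  # [2, 2, 4]
```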

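.apply() is the wrong function for this case. .starmap() is more suitable, but for a task this simple, the overhead of starting the processes and transferring a large amount of data between them makes the parallel version slower overall.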

import time
import multiprocessing as mp
import numpy as np

def how_many_within_range(row, minimum, maximum):
    count = 0
    for n in row:
        if minimum <= n <= maximum:
            count += 1
    return count

if __name__ == '__main__':
    data = np.random.randint(0, 10, size=[1000000, 5])
    print(data[:5])

    # With parallelisation
    start_time = time.perf_counter()
    with mp.Pool() as pool:
        results = pool.starmap(how_many_within_range, ((row,4,8) for row in data), chunksize=1000)
    print(f'Time elapsed: {time.perf_counter() - start_time}')
    print(results[:5])

    # Without parallelisation
    start_time = time.perf_counter()
    results = [ how_many_within_range(row, 4, 8) for row in data ]
    print(f'Time elapsed: {time.perf_counter() - start_time}')
    print(results[:5])

Output:

[[1 4 8 9 2]
 [9 1 6 7 0]
 [0 7 6 8 4]
 [4 5 6 9 9]
 [6 6 9 9 1]]
Time elapsed: 3.3232607
[2, 2, 4, 3, 2]
Time elapsed: 2.4664016999999996
[2, 2, 4, 3, 2]
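If you still want to experiment with a pool, one way to cut the per-task overhead is to send each worker a block of rows and count it with numpy, instead of sending one row per task. This is only a sketch: `count_chunk` and `parallel_count` are names I made up, and using one chunk per CPU is an assumption, not a tuned value. The array still has to be pickled and sent to the workers, so it may well not beat the single-process vectorised version on your machine.

```python
import multiprocessing as mp
import numpy as np

def count_chunk(chunk, minimum, maximum):
    """Vectorised per-row count for one block of rows."""
    return ((chunk >= minimum) & (chunk <= maximum)).sum(axis=1)

def parallel_count(pool, data, minimum, maximum, n_chunks):
    # One task per block of rows rather than one task per row.
    chunks = np.array_split(data, n_chunks)
    parts = pool.starmap(count_chunk, [(c, minimum, maximum) for c in chunks])
    # starmap() returns results in input order, so the rows stay aligned.
    return np.concatenate(parts)

if __name__ == '__main__':
    data = np.random.randint(0, 10, size=[1000000, 5])
    with mp.Pool() as pool:
        results = parallel_count(pool, data, 4, 8, n_chunks=mp.cpu_count())
    # Sanity check against the single-process vectorised answer.
    assert np.array_equal(results, count_chunk(data, 4, 8))
    print(results[:5])
```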