Multiprocessing code running much slower than single-threaded code
I'm trying to learn how to use the multiprocessing package in Python, and I wrote the following code, which randomly generates a large 2D array and then counts how many numbers in each row fall within a specified interval (in this case, 4 to 8):
import time
import multiprocessing as mp
import numpy as np

def how_many_within_range(row, minimum, maximum):
    count = 0
    for n in row:
        if minimum <= n <= maximum:
            count += 1
    return count

if __name__ == '__main__':
    data = np.random.randint(0, 10, size=[10000000, 5])
    print(data[:5])
    start_time = time.perf_counter()

    # With parallelisation
    with mp.Pool(mp.cpu_count()) as pool:
        results = [ pool.apply(how_many_within_range, args=(row, 4, 8))
                    for row in data ]

    # Without parallelisation
    # results = [ how_many_within_range(row, 4, 8) for row in data ]

    print(f'Time elapsed: {time.perf_counter() - start_time}')
    print(results[:5])
Without multiprocessing the code takes about 40 seconds to run, but with it the program is far slower and doesn't finish in any realistic amount of time. I'm fairly sure I followed the tutorial I was using correctly, so what am I doing wrong?
From the documentation it appears that Pool.apply() is blocking, so you pay the overhead of starting processes without getting any parallelism.
Why do you need multiprocessing for such a simple function at all, especially with a numpy array? Try this code instead:

    %%timeit
    np.sum((data >= 4) & (data <= 8), axis=1)

    198 ms ± 3.44 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

There is no need to iterate over the array elements in a Python for loop; the vectorised expression runs essentially immediately.
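A small self-contained check (using a reduced array size for speed) that the vectorised expression produces the same per-row counts as the question's loop-based function:

```python
import numpy as np

def how_many_within_range(row, minimum, maximum):
    # Original pure-Python version: count values inside [minimum, maximum].
    count = 0
    for n in row:
        if minimum <= n <= maximum:
            count += 1
    return count

data = np.random.randint(0, 10, size=[1000, 5])

# Vectorised: one boolean mask over the whole array, then sum each row.
vectorised = np.sum((data >= 4) & (data <= 8), axis=1)
looped = [how_many_within_range(row, 4, 8) for row in data]

print(np.array_equal(vectorised, looped))  # True
```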
.apply() is the wrong function for this case. .starmap() is more appropriate, but for a case this simple, the overhead of starting processes and shipping large amounts of data between them still makes it slower overall:
import time
import multiprocessing as mp
import numpy as np

def how_many_within_range(row, minimum, maximum):
    count = 0
    for n in row:
        if minimum <= n <= maximum:
            count += 1
    return count

if __name__ == '__main__':
    data = np.random.randint(0, 10, size=[1000000, 5])
    print(data[:5])

    # With parallelisation
    start_time = time.perf_counter()
    with mp.Pool() as pool:
        results = pool.starmap(how_many_within_range,
                               ((row, 4, 8) for row in data), chunksize=1000)
    print(f'Time elapsed: {time.perf_counter() - start_time}')
    print(results[:5])

    # Without parallelisation
    start_time = time.perf_counter()
    results = [ how_many_within_range(row, 4, 8) for row in data ]
    print(f'Time elapsed: {time.perf_counter() - start_time}')
    print(results[:5])
Output:
[[1 4 8 9 2]
[9 1 6 7 0]
[0 7 6 8 4]
[4 5 6 9 9]
[6 6 9 9 1]]
Time elapsed: 3.3232607
[2, 2, 4, 3, 2]
Time elapsed: 2.4664016999999996
[2, 2, 4, 3, 2]