Python 1 个核心上的 3 个多处理提供了随工作负载增加的开销
Python 3 multiprocessing on 1 core gives overhead that grows with workload
我正在测试 Python3 的并行功能,我打算在我的代码中使用它。我观察到出乎意料的缓慢行为,因此我将我的代码归结为以下原理证明。让我们计算一个简单的对数级数。让我们使用 1 个核心串行执行并并行执行。人们可以想象这两个示例的时间是相同的,除了与初始化和关闭 multiprocessing.Pool class 相关的小开销。然而,我观察到的是,开销随着问题的大小线性增长,因此即使对于大输入,1 核上的并行解决方案也比串行解决方案差得多。如果我做错了什么请告诉我
import time
import numpy as np
import multiprocessing
import matplotlib.pyplot as plt
def foo(x):
return sum([np.log(1 + i*x) for i in range(10)])
def serial_series(rangeMax):
return [foo(x) for x in range(rangeMax)]
def parallel_series_1core(rangeMax):
pool = multiprocessing.Pool(processes=1)
rez = pool.map(foo, tuple(range(rangeMax)))
pool.terminate()
pool.join()
return rez
nTask = [1 + i ** 2 * 1000 for i in range(1, 2)]
nTimeSerial = []
nTimeParallel = []
for taskSize in nTask:
print('TaskSize', taskSize)
start = time.time()
rez = serial_series(taskSize)
end = time.time()
nTimeSerial.append(end - start)
start = time.time()
rez = parallel_series_1core(taskSize)
end = time.time()
nTimeParallel.append(end - start)
plt.plot(nTask, nTimeSerial)
plt.plot(nTask, nTimeParallel)
plt.legend(['serial', 'parallel 1 core'])
plt.show()
编辑:
有人评论说,开销可能是由于创建了多个工作。这是对并行函数的修改,它应该明确地只做 1 个工作。我仍然观察到开销的线性增长
def parallel_series_1core(rangeMax):
pool = multiprocessing.Pool(processes=1)
rez = pool.map(serial_series, [rangeMax])
pool.terminate()
pool.join()
return rez
编辑 2: 再一次,产生线性增长的确切代码。可以用serial_series函数里面的print语句测试,每次调用parallel_series_1core.
只调用一次
import time
import numpy as np
import multiprocessing
import matplotlib.pyplot as plt
def foo(x):
return sum([np.log(1 + i*x) for i in range(10)])
def serial_series(rangeMax):
return [foo(i) for i in range(rangeMax)]
def parallel_series_1core(rangeMax):
pool = multiprocessing.Pool(processes=1)
rez = pool.map(serial_series, [rangeMax])
pool.terminate()
pool.join()
return rez
nTask = [1 + i ** 2 * 1000 for i in range(1, 20)]
nTimeSerial = []
nTimeParallel = []
for taskSize in nTask:
print('TaskSize', taskSize)
start = time.time()
rez1 = serial_series(taskSize)
end = time.time()
nTimeSerial.append(end - start)
start = time.time()
rez2 = parallel_series_1core(taskSize)
end = time.time()
nTimeParallel.append(end - start)
plt.plot(nTask, nTimeSerial)
plt.plot(nTask, nTimeParallel)
plt.plot(nTask, [i / j for i,j in zip(nTimeParallel, nTimeSerial)])
plt.legend(['serial', 'parallel 1 core', 'ratio'])
plt.show()
当您使用 Pool.map()
时,您实际上是在告诉它将传递的迭代拆分为 jobs 所有可用的子流程(在您的情况下是一个) - 可迭代对象越大,在第一次调用时创建的 'jobs' 越多。这就是最初增加的巨大开销(仅由进程创建本身胜过),尽管是线性开销。
由于子进程不共享内存,对于 POSIX 系统上的所有更改数据(由于分叉)和 Windows 上的所有数据(甚至是静态数据),它需要在一个系统上进行腌制结束并在另一个上解开它。此外,它需要时间来清除下一个作业的进程堆栈,此外还有系统线程切换的开销(这是你无法控制的,你必须扰乱系统的调度程序来减少开销)。
对于 simple/quick 任务,单进程总是胜过多进程。
UPDATE - 正如我上面所说的,额外的开销来自于进程之间的任何数据交换 Python transparently 执行 pickling/unpickling 例程。由于 serial_series()
函数中的 return 列表的大小会随着时间的推移而增加,因此 pickling/unpickling 的性能损失也会增加。这是基于您的代码的简单演示:
import math
import pickle
import sys
import time
# multi-platform precision timer
get_timer = time.clock if sys.platform == "win32" else time.time
def foo(x): # logic/computation function
return sum([math.log(1 + i*x) for i in range(10)])
def serial_series(max_range): # main sub-process function
return [foo(i) for i in range(max_range)]
def serial_series_slave(max_range): # subprocess interface
return pickle.dumps(serial_series(pickle.loads(max_range)))
def serial_series_master(max_range): # main process interface
return pickle.loads(serial_series_slave(pickle.dumps(max_range)))
tasks = [1 + i ** 2 * 1000 for i in range(1, 20)]
simulated_times = []
for task in tasks:
print("Simulated task size: {}".format(task))
start = get_timer()
res = serial_series_master(task)
simulated_times.append((task, get_timer() - start))
最后,simulated_times
将包含如下内容:
[(1001, 0.010015994115533963), (4001, 0.03402641167313844), (9001, 0.06755546622419131),
(16001, 0.1252664260421834), (25001, 0.18815836740279515), (36001, 0.28339434475444325),
(49001, 0.3757235840503601), (64001, 0.4813749807557435), (81001, 0.6115452710446636),
(100001, 0.7573718332506543), (121001, 0.9228750064147522), (144001, 1.0909038813527427),
(169001, 1.3017281342479343), (196001, 1.4830192955746764), (225001, 1.7117389965616931),
(256001, 1.9392146632682739), (289001, 2.19192682050668), (324001, 2.4497541011649187),
(361001, 2.7481495578097466)]
随着列表变大,处理时间增加明显大于线性。这就是多处理本质上发生的事情——如果你的子进程函数没有return任何东西,它最终会快得多。
如果您有大量数据需要在进程之间共享,我建议您使用一些内存数据库(如 Redis)并让您的子进程连接到它store/retrieve数据。
我正在测试 Python3 的并行功能,我打算在我的代码中使用它。我观察到出乎意料的缓慢行为,因此我将我的代码归结为以下原理证明。让我们计算一个简单的对数级数。让我们使用 1 个核心串行执行并并行执行。人们可以想象这两个示例的时间是相同的,除了与初始化和关闭 multiprocessing.Pool class 相关的小开销。然而,我观察到的是,开销随着问题的大小线性增长,因此即使对于大输入,1 核上的并行解决方案也比串行解决方案差得多。如果我做错了什么请告诉我
import time
import numpy as np
import multiprocessing
import matplotlib.pyplot as plt
def foo(x):
return sum([np.log(1 + i*x) for i in range(10)])
def serial_series(rangeMax):
return [foo(x) for x in range(rangeMax)]
def parallel_series_1core(rangeMax):
pool = multiprocessing.Pool(processes=1)
rez = pool.map(foo, tuple(range(rangeMax)))
pool.terminate()
pool.join()
return rez
nTask = [1 + i ** 2 * 1000 for i in range(1, 2)]
nTimeSerial = []
nTimeParallel = []
for taskSize in nTask:
print('TaskSize', taskSize)
start = time.time()
rez = serial_series(taskSize)
end = time.time()
nTimeSerial.append(end - start)
start = time.time()
rez = parallel_series_1core(taskSize)
end = time.time()
nTimeParallel.append(end - start)
plt.plot(nTask, nTimeSerial)
plt.plot(nTask, nTimeParallel)
plt.legend(['serial', 'parallel 1 core'])
plt.show()
编辑: 有人评论说,开销可能是由于创建了多个工作。这是对并行函数的修改,它应该明确地只做 1 个工作。我仍然观察到开销的线性增长
def parallel_series_1core(rangeMax):
pool = multiprocessing.Pool(processes=1)
rez = pool.map(serial_series, [rangeMax])
pool.terminate()
pool.join()
return rez
编辑 2: 再一次,产生线性增长的确切代码。可以用serial_series函数里面的print语句测试,每次调用parallel_series_1core.
只调用一次import time
import numpy as np
import multiprocessing
import matplotlib.pyplot as plt
def foo(x):
return sum([np.log(1 + i*x) for i in range(10)])
def serial_series(rangeMax):
return [foo(i) for i in range(rangeMax)]
def parallel_series_1core(rangeMax):
pool = multiprocessing.Pool(processes=1)
rez = pool.map(serial_series, [rangeMax])
pool.terminate()
pool.join()
return rez
nTask = [1 + i ** 2 * 1000 for i in range(1, 20)]
nTimeSerial = []
nTimeParallel = []
for taskSize in nTask:
print('TaskSize', taskSize)
start = time.time()
rez1 = serial_series(taskSize)
end = time.time()
nTimeSerial.append(end - start)
start = time.time()
rez2 = parallel_series_1core(taskSize)
end = time.time()
nTimeParallel.append(end - start)
plt.plot(nTask, nTimeSerial)
plt.plot(nTask, nTimeParallel)
plt.plot(nTask, [i / j for i,j in zip(nTimeParallel, nTimeSerial)])
plt.legend(['serial', 'parallel 1 core', 'ratio'])
plt.show()
当您使用 Pool.map()
时,您实际上是在告诉它将传递的迭代拆分为 jobs 所有可用的子流程(在您的情况下是一个) - 可迭代对象越大,在第一次调用时创建的 'jobs' 越多。这就是最初增加的巨大开销(仅由进程创建本身胜过),尽管是线性开销。
由于子进程不共享内存,对于 POSIX 系统上的所有更改数据(由于分叉)和 Windows 上的所有数据(甚至是静态数据),它需要在一个系统上进行腌制结束并在另一个上解开它。此外,它需要时间来清除下一个作业的进程堆栈,此外还有系统线程切换的开销(这是你无法控制的,你必须扰乱系统的调度程序来减少开销)。
对于 simple/quick 任务,单进程总是胜过多进程。
UPDATE - 正如我上面所说的,额外的开销来自于进程之间的任何数据交换 Python transparently 执行 pickling/unpickling 例程。由于 serial_series()
函数中的 return 列表的大小会随着时间的推移而增加,因此 pickling/unpickling 的性能损失也会增加。这是基于您的代码的简单演示:
import math
import pickle
import sys
import time
# multi-platform precision timer
get_timer = time.clock if sys.platform == "win32" else time.time
def foo(x): # logic/computation function
return sum([math.log(1 + i*x) for i in range(10)])
def serial_series(max_range): # main sub-process function
return [foo(i) for i in range(max_range)]
def serial_series_slave(max_range): # subprocess interface
return pickle.dumps(serial_series(pickle.loads(max_range)))
def serial_series_master(max_range): # main process interface
return pickle.loads(serial_series_slave(pickle.dumps(max_range)))
tasks = [1 + i ** 2 * 1000 for i in range(1, 20)]
simulated_times = []
for task in tasks:
print("Simulated task size: {}".format(task))
start = get_timer()
res = serial_series_master(task)
simulated_times.append((task, get_timer() - start))
最后,simulated_times
将包含如下内容:
[(1001, 0.010015994115533963), (4001, 0.03402641167313844), (9001, 0.06755546622419131), (16001, 0.1252664260421834), (25001, 0.18815836740279515), (36001, 0.28339434475444325), (49001, 0.3757235840503601), (64001, 0.4813749807557435), (81001, 0.6115452710446636), (100001, 0.7573718332506543), (121001, 0.9228750064147522), (144001, 1.0909038813527427), (169001, 1.3017281342479343), (196001, 1.4830192955746764), (225001, 1.7117389965616931), (256001, 1.9392146632682739), (289001, 2.19192682050668), (324001, 2.4497541011649187), (361001, 2.7481495578097466)]
随着列表变大,处理时间增加明显大于线性。这就是多处理本质上发生的事情——如果你的子进程函数没有return任何东西,它最终会快得多。
如果您有大量数据需要在进程之间共享,我建议您使用一些内存数据库(如 Redis)并让您的子进程连接到它store/retrieve数据。