Python 1 个核心上的 3 个多处理提供了随工作负载增加的开销

Python 3 multiprocessing on 1 core gives overhead that grows with workload

我正在测试 Python3 的并行功能,我打算在我的代码中使用它。我观察到出乎意料的缓慢行为,因此我将我的代码归结为以下原理证明。让我们计算一个简单的对数级数。让我们使用 1 个核心串行执行并并行执行。人们可以想象这两个示例的时间是相同的,除了与初始化和关闭 multiprocessing.Pool class 相关的小开销。然而,我观察到的是,开销随着问题的大小线性增长,因此即使对于大输入,1 核上的并行解决方案也比串行解决方案差得多。如果我做错了什么请告诉我

import time
import numpy as np
import multiprocessing
import matplotlib.pyplot as plt


def foo(x):
    return sum([np.log(1 + i*x) for i in range(10)])


def serial_series(rangeMax):
    return [foo(x) for x in range(rangeMax)]

def parallel_series_1core(rangeMax):
    pool = multiprocessing.Pool(processes=1)
    rez = pool.map(foo, tuple(range(rangeMax)))

    pool.terminate()
    pool.join()

    return rez


nTask = [1 + i ** 2 * 1000 for i in range(1, 2)]
nTimeSerial = []
nTimeParallel = []

for taskSize in nTask:
    print('TaskSize', taskSize)
    start = time.time()
    rez = serial_series(taskSize)
    end = time.time()
    nTimeSerial.append(end - start)

    start = time.time()
    rez = parallel_series_1core(taskSize)
    end = time.time()
    nTimeParallel.append(end - start)


plt.plot(nTask, nTimeSerial)
plt.plot(nTask, nTimeParallel)

plt.legend(['serial', 'parallel 1 core'])
plt.show()

编辑: 有人评论说,开销可能是由于创建了多个工作。这是对并行函数的修改,它应该明确地只做 1 个工作。我仍然观察到开销的线性增长

def parallel_series_1core(rangeMax):
    pool = multiprocessing.Pool(processes=1)
    rez = pool.map(serial_series, [rangeMax])

    pool.terminate()
    pool.join()

    return rez

编辑 2: 再一次,产生线性增长的确切代码。可以用serial_series函数里面的print语句测试,每次调用parallel_series_1core.

只调用一次
import time
import numpy as np
import multiprocessing
import matplotlib.pyplot as plt

def foo(x):
    return sum([np.log(1 + i*x) for i in range(10)])

def serial_series(rangeMax):
    return [foo(i) for i in range(rangeMax)]

def parallel_series_1core(rangeMax):
    pool = multiprocessing.Pool(processes=1)
    rez = pool.map(serial_series, [rangeMax])

    pool.terminate()
    pool.join()

    return rez


nTask = [1 + i ** 2 * 1000 for i in range(1, 20)]
nTimeSerial = []
nTimeParallel = []

for taskSize in nTask:
    print('TaskSize', taskSize)
    start = time.time()
    rez1 = serial_series(taskSize)
    end = time.time()
    nTimeSerial.append(end - start)

    start = time.time()
    rez2 = parallel_series_1core(taskSize)
    end = time.time()
    nTimeParallel.append(end - start)


plt.plot(nTask, nTimeSerial)
plt.plot(nTask, nTimeParallel)
plt.plot(nTask, [i / j for i,j in zip(nTimeParallel, nTimeSerial)])

plt.legend(['serial', 'parallel 1 core', 'ratio'])
plt.show()

当您使用 Pool.map() 时,您实际上是在告诉它将传递的迭代拆分为 jobs 所有可用的子流程(在您的情况下是一个) - 可迭代对象越大,在第一次调用时创建的 'jobs' 越多。这就是最初增加的巨大开销(仅由进程创建本身胜过),尽管是线性开销。

由于子进程不共享内存,对于 POSIX 系统上的所有更改数据(由于分叉)和 Windows 上的所有数据(甚至是静态数据),它需要在一个系统上进行腌制结束并在另一个上解开它。此外,它需要时间来清除下一个作业的进程堆栈,此外还有系统线程切换的开销(这是你无法控制的,你必须扰乱系统的调度程序来减少开销)。

对于 simple/quick 任务,单进程总是胜过多进程。

UPDATE - 正如我上面所说的,额外的开销来自于进程之间的任何数据交换 Python transparently 执行 pickling/unpickling 例程。由于 serial_series() 函数中的 return 列表的大小会随着时间的推移而增加,因此 pickling/unpickling 的性能损失也会增加。这是基于您的代码的简单演示:

import math
import pickle
import sys
import time

# multi-platform precision timer
get_timer = time.clock if sys.platform == "win32" else time.time

def foo(x):  # logic/computation function
    return sum([math.log(1 + i*x) for i in range(10)])

def serial_series(max_range):  # main sub-process function
    return [foo(i) for i in range(max_range)]

def serial_series_slave(max_range):  # subprocess interface
    return pickle.dumps(serial_series(pickle.loads(max_range)))

def serial_series_master(max_range):  # main process interface
    return pickle.loads(serial_series_slave(pickle.dumps(max_range)))

tasks = [1 + i ** 2 * 1000 for i in range(1, 20)]
simulated_times = []
for task in tasks:
    print("Simulated task size: {}".format(task))
    start = get_timer()
    res = serial_series_master(task)
    simulated_times.append((task, get_timer() - start))

最后,simulated_times 将包含如下内容:

[(1001, 0.010015994115533963), (4001, 0.03402641167313844), (9001, 0.06755546622419131),
 (16001, 0.1252664260421834), (25001, 0.18815836740279515), (36001, 0.28339434475444325),
 (49001, 0.3757235840503601), (64001, 0.4813749807557435), (81001, 0.6115452710446636),
 (100001, 0.7573718332506543), (121001, 0.9228750064147522), (144001, 1.0909038813527427),
 (169001, 1.3017281342479343), (196001, 1.4830192955746764), (225001, 1.7117389965616931),
 (256001, 1.9392146632682739), (289001, 2.19192682050668), (324001, 2.4497541011649187),
 (361001, 2.7481495578097466)]

随着列表变大,处理时间增加明显大于线性。这就是多处理本质上发生的事情——如果你的子进程函数没有return任何东西,它最终会快得多。

如果您有大量数据需要在进程之间共享,我建议您使用一些内存数据库(如 Redis)并让您的子进程连接到它store/retrieve数据。