Python: Very strange behavior with multiprocessing; later code causes "retroactive" slowdown of earlier code

I'm trying to learn how to implement multiprocessing for computing Monte Carlo simulations. I copied the code from this simple tutorial, where the aim is to compute an integral. I also compare it to the answer from WolframAlpha and compute the error. The first part of my code has no problems; it just defines the integration function and declares some constants:

import numpy as np
import multiprocessing as mp
import time

def integrate(iterations):
    np.random.seed()
    mc_sum = 0
    chunks = 10000
    chunk_size = int(iterations/chunks)

    for i in range(chunks):
        u = np.random.uniform(size=chunk_size)
        mc_sum += np.sum(np.exp(-u * u))

    normed = mc_sum / iterations
    return normed

wolfram_answer = 0.746824132812427
mc_iterations = 1000000000

But something very bizarre happens in the next two parts (I've labeled them, because it matters). First (labeled "BLOCK 1"), I do the simulation without any multiprocessing, just to get a benchmark. After this (labeled "BLOCK 2"), I do the same thing, but with a multiprocessing step. If you're reproducing this, you may want to adjust the num_procs variable depending on how many cores your machine has:

#### BLOCK 1
single_before = time.time()
single = integrate(mc_iterations)
single_after = time.time()
single_duration = np.round(single_after - single_before, 3)
error_single = (wolfram_answer - single)/wolfram_answer

print(mc_iterations, "iterations on single-thread:",
      single_duration, "seconds.")
print("Estimation error:", error_single)
print("")

#### BLOCK 2
if __name__ == "__main__":
    num_procs = 8
    multi_iterations = int(mc_iterations / num_procs)

    multi_before = time.time()
    pool = mp.Pool(processes = num_procs)

    multi_result = pool.map(integrate, [multi_iterations]*num_procs)
    multi_result = np.array(multi_result).mean()
    multi_after = time.time()

    multi_duration = np.round(multi_after - multi_before, 3)
    error_multi = (wolfram_answer - multi_result)/wolfram_answer

    print(num_procs, "threads with", multi_iterations, "iterations each:",
          multi_duration, "seconds.")
    print("Estimation error:", error_multi)

The output is:

1000000000 iterations on single-thread: 37.448 seconds.
Estimation error: 1.17978774235e-05

8 threads with 125000000 iterations each: 54.697 seconds.
Estimation error: -5.88380936901e-06

So, the multiprocessing version is slower. That's not unheard of; maybe the overhead from multiprocessing simply outweighs the gains from parallelization?

But that isn't what's happening here. Watch what happens when I merely comment out the first block:

#### BLOCK 1
##single_before = time.time()
##single = integrate(mc_iterations)
##single_after = time.time()
##single_duration = np.round(single_after - single_before, 3)
##error_single = (wolfram_answer - single)/wolfram_answer
##
##print(mc_iterations, "iterations on single-thread:",
##      single_duration, "seconds.")
##print("Estimation error:", error_single)
##print("")

#### BLOCK 2
if __name__ == "__main__":
    num_procs = 8
    multi_iterations = int(mc_iterations / num_procs)

    multi_before = time.time()
    pool = mp.Pool(processes = num_procs)

    multi_result = pool.map(integrate, [multi_iterations]*num_procs)
    multi_result = np.array(multi_result).mean()
    multi_after = time.time()

    multi_duration = np.round(multi_after - multi_before, 3)
    error_multi = (wolfram_answer - multi_result)/wolfram_answer

    print(num_procs, "threads with", multi_iterations, "iterations each:",
          multi_duration, "seconds.")
    print("Estimation error:", error_multi)

The output is:

8 threads with 125000000 iterations each: 6.662 seconds.
Estimation error: 3.86063069069e-06

That's right: the time to complete the multiprocessing dropped from 55 seconds to under 7 seconds! And that's not even the weirdest part. Watch what happens when I move Block 1 to after Block 2:

#### BLOCK 2
if __name__ == "__main__":
    num_procs = 8
    multi_iterations = int(mc_iterations / num_procs)

    multi_before = time.time()
    pool = mp.Pool(processes = num_procs)

    multi_result = pool.map(integrate, [multi_iterations]*num_procs)
    multi_result = np.array(multi_result).mean()
    multi_after = time.time()

    multi_duration = np.round(multi_after - multi_before, 3)
    error_multi = (wolfram_answer - multi_result)/wolfram_answer

    print(num_procs, "threads with", multi_iterations, "iterations each:",
          multi_duration, "seconds.")
    print("Estimation error:", error_multi)

#### BLOCK 1
single_before = time.time()
single = integrate(mc_iterations)
single_after = time.time()
single_duration = np.round(single_after - single_before, 3)
error_single = (wolfram_answer - single)/wolfram_answer

print(mc_iterations, "iterations on single-thread:",
      single_duration, "seconds.")
print("Estimation error:", error_single)
print("")

The output is:

8 threads with 125000000 iterations each: 54.938 seconds.
Estimation error: 7.42415402896e-06
1000000000 iterations on single-thread: 37.396 seconds.
Estimation error: 9.79800494235e-06

We're back to the slow output, which is completely crazy! Isn't Python supposed to be interpreted? I know that statement comes with a hundred caveats, but I took it for granted that the code is executed line by line, so stuff that comes afterwards (outside of functions, classes, etc.) can't affect stuff from earlier, because the later stuff hasn't been "looked at" yet.

So, how can stuff that happens after the multiprocessing step has concluded retroactively slow down the multiprocessing code?

Finally, the fast behavior is restored just by indenting Block 1 inside the if __name__ == "__main__" block, because of course it is:

#### BLOCK 2
if __name__ == "__main__":
    num_procs = 8
    multi_iterations = int(mc_iterations / num_procs)

    multi_before = time.time()
    pool = mp.Pool(processes = num_procs)

    multi_result = pool.map(integrate, [multi_iterations]*num_procs)
    multi_result = np.array(multi_result).mean()
    multi_after = time.time()

    multi_duration = np.round(multi_after - multi_before, 3)
    error_multi = (wolfram_answer - multi_result)/wolfram_answer

    print(num_procs, "threads with", multi_iterations, "iterations each:",
          multi_duration, "seconds.")
    print("Estimation error:", error_multi)

    #### BLOCK 1
    single_before = time.time()
    single = integrate(mc_iterations)
    single_after = time.time()
    single_duration = np.round(single_after - single_before, 3)
    error_single = (wolfram_answer - single)/wolfram_answer

    print(mc_iterations, "iterations on single-thread:",
          single_duration, "seconds.")
    print("Estimation error:", error_single)
    print("")

The output is:

8 threads with 125000000 iterations each: 7.293 seconds.
Estimation error: 1.10350027622e-05
1000000000 iterations on single-thread: 31.035 seconds.
Estimation error: 2.53582945763e-05

The fast behavior also returns if Block 1 is kept inside the if block but moved above where num_procs is defined (not shown here, since this question is already getting long).

So, what on earth is causing this behavior? I'm guessing it's some kind of race condition to do with threading and process forking, but for all I know at my level of expertise, my Python interpreter might as well be haunted.

This is because you are using Windows. On Windows, each subprocess is created using the 'spawn' method, which essentially starts a new Python interpreter and imports your module, instead of forking the process.

This is a problem, because all of the code outside of if __name__ == '__main__' gets executed again in each subprocess. This can even lead to a multiprocessing bomb if you put the multiprocessing code at the top level: it will keep spawning processes until you run out of memory.
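
A minimal standalone sketch (not the poster's code) makes the re-execution visible: the unguarded module-level print runs once in the parent and once more in every spawned worker, while everything under the guard runs only in the parent. Forcing the 'spawn' start method reproduces the same behavior on Linux, where 'fork' would otherwise be the default (on Windows, and on macOS since Python 3.8, 'spawn' is already the default):

import multiprocessing as mp

# Unguarded module-level code: under 'spawn', each worker re-imports this
# file, so this line prints once for the parent and once per worker.
print("top-level code executed in:", mp.current_process().name)

def square(x):
    return x * x

if __name__ == "__main__":
    # Redundant on Windows ('spawn' is already the default there); forces
    # the same Windows-style behavior when run on Linux.
    mp.set_start_method("spawn")
    with mp.Pool(processes=2) as pool:
        print(pool.map(square, [1, 2, 3]))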

This is actually warned about in the docs:

Safe importing of main module

Make sure that the main module can be safely imported by a new Python interpreter without causing unintended side effects (such as starting a new process).

...

Instead one should protect the “entry point” of the program by using if __name__ == '__main__'

...

This allows the newly spawned Python interpreter to safely import the module...

That section used to be called "Windows" in the older docs for Python 2.

To add some detail: on Windows, the module is imported "from scratch" in each worker process. That means everything in the module is executed by each worker. So, in your first example, each worker process executes "BLOCK 1" first.

But your output doesn't reflect that. You should have gotten a line of output like

1000000000 iterations on single-thread: 37.448 seconds.

from each of your 8 worker processes. But your output didn't show that. Perhaps you're using an IDE that suppresses output from the spawned processes? If you run it in a "DOS box" (cmd.exe window) instead, that won't suppress the output, and it should make what's going on much clearer.
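
Alternatively, tagging each print with the name of the executing process makes the duplicated work visible no matter where the output ends up. Here is a hedged sketch of BLOCK 1 rewritten that way; it assumes the integrate function and mc_iterations constant from the question's script:

import multiprocessing as mp
import numpy as np
import time

#### BLOCK 1, tagged with the executing process
# "MainProcess" in the parent; names like "SpawnPoolWorker-1" in workers.
who = mp.current_process().name

single_before = time.time()
single = integrate(mc_iterations)   # assumes the question's integrate()
single_after = time.time()
single_duration = np.round(single_after - single_before, 3)

print(who, "finished", mc_iterations, "iterations on single-thread:",
      single_duration, "seconds.")

On Windows, each of the 8 spawned workers would then print its own copy of this line before starting on its share of the map, which is where the extra time in the 55-second runs comes from: every worker first repeats the full single-threaded benchmark before it begins integrating its own chunk.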