Python: Very strange behavior with multiprocessing; later code causes "retroactive" slowdown of earlier code
I am trying to learn how to implement multiprocessing for computing Monte Carlo simulations. I reproduced the code from this simple tutorial, where the aim is to compute an integral. I also compare it to the answer from WolframAlpha and compute the error. The first part of my code has no issues; it just defines the integration function and declares some constants:
import numpy as np
import multiprocessing as mp
import time

def integrate(iterations):
    np.random.seed()
    mc_sum = 0
    chunks = 10000
    chunk_size = int(iterations/chunks)
    for i in range(chunks):
        u = np.random.uniform(size=chunk_size)
        mc_sum += np.sum(np.exp(-u * u))
    normed = mc_sum / iterations
    return normed

wolfram_answer = 0.746824132812427
mc_iterations = 1000000000
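As a quick aside, the quantity being estimated is the integral of exp(-x*x) from 0 to 1, i.e. the mean of exp(-u*u) for u uniform on [0, 1]. A minimal sanity check of the reference value, assuming SciPy is available (it is not used in the original code):

# Hypothetical sanity check (not part of the original script): confirm the
# WolframAlpha value with deterministic quadrature. Assumes SciPy is installed.
import numpy as np
from scipy.integrate import quad

value, abserr = quad(lambda x: np.exp(-x * x), 0.0, 1.0)
print(value)  # ~0.746824132812427, matching wolfram_answer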
But something very strange happens in the next two parts (I have labeled them, because it matters). First (labeled "BLOCK 1"), I run the simulation without any multiprocessing, just to get a baseline. After that (labeled "BLOCK 2"), I do the same thing, but with a multiprocessing step. If you are reproducing this, you may need to adjust the num_procs variable depending on how many cores your machine has:
#### BLOCK 1
single_before = time.time()
single = integrate(mc_iterations)
single_after = time.time()
single_duration = np.round(single_after - single_before, 3)
error_single = (wolfram_answer - single)/wolfram_answer

print(mc_iterations, "iterations on single-thread:",
      single_duration, "seconds.")
print("Estimation error:", error_single)
print("")

#### BLOCK 2
if __name__ == "__main__":
    num_procs = 8
    multi_iterations = int(mc_iterations / num_procs)

    multi_before = time.time()
    pool = mp.Pool(processes = num_procs)
    multi_result = pool.map(integrate, [multi_iterations]*num_procs)
    multi_result = np.array(multi_result).mean()
    multi_after = time.time()
    multi_duration = np.round(multi_after - multi_before, 3)

    error_multi = (wolfram_answer - multi_result)/wolfram_answer
    print(num_procs, "threads with", multi_iterations, "iterations each:",
          multi_duration, "seconds.")
    print("Estimation error:", error_multi)
The output is:
1000000000 iterations on single-thread: 37.448 seconds.
Estimation error: 1.17978774235e-05
8 threads with 125000000 iterations each: 54.697 seconds.
Estimation error: -5.88380936901e-06
So, the multiprocessing is slower. That's not unheard of; maybe the overhead from multiprocessing just outweighs the gains from parallelization?
But that is not what is happening. Watch what happens when I merely comment out the first block:
#### BLOCK 1
##single_before = time.time()
##single = integrate(mc_iterations)
##single_after = time.time()
##single_duration = np.round(single_after - single_before, 3)
##error_single = (wolfram_answer - single)/wolfram_answer
##
##print(mc_iterations, "iterations on single-thread:",
##      single_duration, "seconds.")
##print("Estimation error:", error_single)
##print("")

#### BLOCK 2
if __name__ == "__main__":
    num_procs = 8
    multi_iterations = int(mc_iterations / num_procs)

    multi_before = time.time()
    pool = mp.Pool(processes = num_procs)
    multi_result = pool.map(integrate, [multi_iterations]*num_procs)
    multi_result = np.array(multi_result).mean()
    multi_after = time.time()
    multi_duration = np.round(multi_after - multi_before, 3)

    error_multi = (wolfram_answer - multi_result)/wolfram_answer
    print(num_procs, "threads with", multi_iterations, "iterations each:",
          multi_duration, "seconds.")
    print("Estimation error:", error_multi)
The output is:
8 threads with 125000000 iterations each: 6.662 seconds.
Estimation error: 3.86063069069e-06
That's right: the time to complete the multiprocessing dropped from 55 seconds to under 7 seconds! And that's not even the strangest part. Watch what happens when I move Block 1 to after Block 2:
#### BLOCK 2
if __name__ == "__main__":
    num_procs = 8
    multi_iterations = int(mc_iterations / num_procs)

    multi_before = time.time()
    pool = mp.Pool(processes = num_procs)
    multi_result = pool.map(integrate, [multi_iterations]*num_procs)
    multi_result = np.array(multi_result).mean()
    multi_after = time.time()
    multi_duration = np.round(multi_after - multi_before, 3)

    error_multi = (wolfram_answer - multi_result)/wolfram_answer
    print(num_procs, "threads with", multi_iterations, "iterations each:",
          multi_duration, "seconds.")
    print("Estimation error:", error_multi)

#### BLOCK 1
single_before = time.time()
single = integrate(mc_iterations)
single_after = time.time()
single_duration = np.round(single_after - single_before, 3)
error_single = (wolfram_answer - single)/wolfram_answer

print(mc_iterations, "iterations on single-thread:",
      single_duration, "seconds.")
print("Estimation error:", error_single)
print("")
The output is:
8 threads with 125000000 iterations each: 54.938 seconds.
Estimation error: 7.42415402896e-06
1000000000 iterations on single-thread: 37.396 seconds.
Estimation error: 9.79800494235e-06
We're back to the slow output, which is completely crazy! Isn't Python supposed to be interpreted? I know there are a hundred caveats to that statement, but I took it for granted that the code is executed line by line, so stuff that comes afterwards (outside of functions, classes, etc.) can't affect the stuff that came before, because it hasn't been "looked at" yet.
So, how can something that happens after the multiprocessing step has concluded retroactively slow down the multiprocessing code?
Finally, the fast behavior is restored merely by indenting Block 1 to be inside the if __name__ == "__main__" block, because of course it is:
#### BLOCK 2
if __name__ == "__main__":
    num_procs = 8
    multi_iterations = int(mc_iterations / num_procs)

    multi_before = time.time()
    pool = mp.Pool(processes = num_procs)
    multi_result = pool.map(integrate, [multi_iterations]*num_procs)
    multi_result = np.array(multi_result).mean()
    multi_after = time.time()
    multi_duration = np.round(multi_after - multi_before, 3)

    error_multi = (wolfram_answer - multi_result)/wolfram_answer
    print(num_procs, "threads with", multi_iterations, "iterations each:",
          multi_duration, "seconds.")
    print("Estimation error:", error_multi)

    #### BLOCK 1
    single_before = time.time()
    single = integrate(mc_iterations)
    single_after = time.time()
    single_duration = np.round(single_after - single_before, 3)
    error_single = (wolfram_answer - single)/wolfram_answer

    print(mc_iterations, "iterations on single-thread:",
          single_duration, "seconds.")
    print("Estimation error:", error_single)
    print("")
The output is:
8 threads with 125000000 iterations each: 7.293 seconds.
Estimation error: 1.10350027622e-05
1000000000 iterations on single-thread: 31.035 seconds.
Estimation error: 2.53582945763e-05
Keeping Block 1 inside the if block, but moving it above the definition of num_procs, also restores the fast behavior (not shown here, since this question is already long).
So, what on earth is causing this behavior? I'm guessing it's some kind of race condition to do with threading and process forking, but at my level of expertise it might as well be that my Python interpreter is haunted.
This is because you are using Windows. On Windows, each subprocess is generated using the 'spawn' method, which essentially starts a new Python interpreter and imports your module, instead of forking the process.
This is a problem, because all of the code outside of if __name__ == '__main__' is executed again. This can lead to a multiprocessing bomb if you put the multiprocessing code at the top level, because it will start spawning processes until you run out of memory.
The multiprocessing docs warn about exactly this:
Safe importing of main module

Make sure that the main module can be safely imported by a new Python interpreter without causing unintended side effects (such as starting a new process).

...

Instead one should protect the “entry point” of the program by using if __name__ == '__main__' ...

This allows the newly spawned Python interpreter to safely import the module...
That section used to be called "Windows" in the older Python 2 docs.
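To see the re-import behavior in isolation, here is a minimal standalone sketch (my own illustration, not from the answer above; the filename demo_spawn.py is hypothetical). Under the 'spawn' start method, the module-level print runs once in the main process and once more in every worker:

# demo_spawn.py -- minimal sketch of the 'spawn' re-import behavior.
import multiprocessing as mp

# Module-level code: under 'spawn', every worker re-imports this module
# from scratch, so this line runs again in each worker process.
print("module-level code running in", mp.current_process().name)

def square(x):
    return x * x

if __name__ == "__main__":
    # 'spawn' is the default on Windows; forcing it here makes the demo
    # behave the same way on Linux and macOS.
    mp.set_start_method("spawn", force=True)
    with mp.Pool(processes=2) as pool:
        print(pool.map(square, [1, 2, 3]))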
Adding some detail: on Windows, the module is imported "from scratch" in each worker process. That means everything in the module is executed by each worker. So, in your first example, each worker process first executes "BLOCK 1".
But your output doesn't reflect that. You should have gotten a line of output like
1000000000 iterations on single-thread: 37.448 seconds.
from each of your 8 worker processes. But your output doesn't show that. Perhaps you're using an IDE that suppresses output from spawned processes? If you run it in a "DOS box" (a cmd.exe window) instead, that won't suppress the output, and it can make what's happening clearer.
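If an IDE really is swallowing the workers' output, one more way to confirm the re-import (my own suggestion, not from the answer above) is to tag a module-level print with the process id, placed at the very top of the script, outside the __main__ guard:

# Hypothetical diagnostic for the OP's script: under 'spawn', this line
# should fire once in the main process and once in every worker.
import os
import multiprocessing as mp

print("importing module in pid", os.getpid(),
      "as", mp.current_process().name)

Seeing it eight extra times in a cmd.exe window confirms that each worker ran the module's top level, BLOCK 1 included.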