在 Python 中使用泳池
Use of pools in Python
我对使用 Python 进行多处理还很陌生,我正在尝试了解如何正确使用 Pool。我有一些代码如下所示:
import numpy as np
from multiprocessing.dummy import Pool as ThreadPool
...
pool = ThreadPool(15)
arb = np.arange(0,len(np.concatenate((P,V),axis=0)),1)
F = pool.map(tttt,arb)
pool.close()
pool.join()
NT = 1000
for test in range(0,NT):
(P,V) = Dynamics(P,V,F)
pool = ThreadPool(15)
F = pool.map(tttt,arb)
pool.close()
pool.join()
...
tttt和Dynamics是之前定义的两个函数。我想使用 Pool 能够使用 tttt 同时计算很多值,但我也想更新我用于这些计算的值(tttt 取决于 P 和 V,尽管不是明确的)。
我是否必须像现在这样创建和关闭池两次,还是只需要一次?
您不必实例化多个池。在调用 pool.close()
之前,您可以多次调用 pool.map(...)
。只有当您没有更多任务要提交到池中时才调用 pool.close()
。
需要注意的是 pool.map(...)
是一个阻塞调用 - 它不会 return 直到所有提交到池的任务都完成。这对您的目的来说可能效率低下 - 当池正在工作时,您可能想要执行后台工作,例如 collecting/submitting 更多任务。您可以改为使用 pool.map_async(...)
,但您的代码会变得有点复杂。
旁注:我会小心你的命名。 multiprocessing.Pool 不是线程池。它是一个子进程池。线程池和进程池是不同的野兽,有自己的考虑。
事实上,您可能会发现,如果您的处理大部分都在 numpy 中(C 扩展,GIL 已发布,即使在繁重的情况下也不必担心解释器级别的争用,您实际上需要一个自制的线程池处理、线程 = 更少的 IPC 开销和系统资源需求等)。
简单回答
您似乎想在 for
循环的每次迭代中使用进程池。您已经使事情变得比使用 Pool.map
所需的更复杂,但是您对 .join()
和 .close()
的调用表明您宁愿使用 Pool.map_async
。这是一个简单的例子:
import numpy as np
from multiprocessing import Pool
from time import sleep
def print_square(x):
sleep(.01)
print x**2
if __name__=='__main__':
for k in range(10):
pool = Pool(3)
arb = np.arange(0,10)
pool.map_async(print_square,arb)
pool.close()
pool.join()
一般说明
您通常应该包含一个 minimal, complete, verifiable example。你的例子不能是 运行。更糟糕的是,它包含许多无关的特定于域的代码(例如 P
、V
、Dynamics
),这会阻止其他人尝试 运行 您的示例。
说明观察到的代码行为(例如错误输出、运行 时间错误等)和期望的行为。
将 Pool
导入为 ThreadPool
令人困惑,因为线程和进程是 different,但具有非常相似的 API。
我对使用 Python 进行多处理还很陌生,我正在尝试了解如何正确使用 Pool。我有一些代码如下所示:
import numpy as np
from multiprocessing.dummy import Pool as ThreadPool
...
pool = ThreadPool(15)
arb = np.arange(0,len(np.concatenate((P,V),axis=0)),1)
F = pool.map(tttt,arb)
pool.close()
pool.join()
NT = 1000
for test in range(0,NT):
(P,V) = Dynamics(P,V,F)
pool = ThreadPool(15)
F = pool.map(tttt,arb)
pool.close()
pool.join()
...
tttt和Dynamics是之前定义的两个函数。我想使用 Pool 能够使用 tttt 同时计算很多值,但我也想更新我用于这些计算的值(tttt 取决于 P 和 V,尽管不是明确的)。
我是否必须像现在这样创建和关闭池两次,还是只需要一次?
您不必实例化多个池。在调用 pool.close()
之前,您可以多次调用 pool.map(...)
。只有当您没有更多任务要提交到池中时才调用 pool.close()
。
需要注意的是 pool.map(...)
是一个阻塞调用 - 它不会 return 直到所有提交到池的任务都完成。这对您的目的来说可能效率低下 - 当池正在工作时,您可能想要执行后台工作,例如 collecting/submitting 更多任务。您可以改为使用 pool.map_async(...)
,但您的代码会变得有点复杂。
旁注:我会小心你的命名。 multiprocessing.Pool 不是线程池。它是一个子进程池。线程池和进程池是不同的野兽,有自己的考虑。
事实上,您可能会发现,如果您的处理大部分都在 numpy 中(C 扩展,GIL 已发布,即使在繁重的情况下也不必担心解释器级别的争用,您实际上需要一个自制的线程池处理、线程 = 更少的 IPC 开销和系统资源需求等)。
简单回答
您似乎想在 for
循环的每次迭代中使用进程池。您已经使事情变得比使用 Pool.map
所需的更复杂,但是您对 .join()
和 .close()
的调用表明您宁愿使用 Pool.map_async
。这是一个简单的例子:
import numpy as np
from multiprocessing import Pool
from time import sleep
def print_square(x):
sleep(.01)
print x**2
if __name__=='__main__':
for k in range(10):
pool = Pool(3)
arb = np.arange(0,10)
pool.map_async(print_square,arb)
pool.close()
pool.join()
一般说明
您通常应该包含一个 minimal, complete, verifiable example。你的例子不能是 运行。更糟糕的是,它包含许多无关的特定于域的代码(例如
P
、V
、Dynamics
),这会阻止其他人尝试 运行 您的示例。说明观察到的代码行为(例如错误输出、运行 时间错误等)和期望的行为。
将
Pool
导入为ThreadPool
令人困惑,因为线程和进程是 different,但具有非常相似的 API。