在 Python 中使用泳池

Use of pools in Python

我对使用 Python 进行多处理还很陌生,我正在尝试了解如何正确使用 Pool。我有一些代码如下所示:

import numpy as np
from multiprocessing.dummy import Pool as ThreadPool
... 


pool = ThreadPool(15)

arb = np.arange(0,len(np.concatenate((P,V),axis=0)),1)

F = pool.map(tttt,arb)

pool.close()
pool.join()

NT = 1000

for test in range(0,NT):
    (P,V) = Dynamics(P,V,F)

    pool = ThreadPool(15)

    F = pool.map(tttt,arb)

    pool.close()
    pool.join() 

...

tttt和Dynamics是之前定义的两个函数。我想使用 Pool 能够使用 tttt 同时计算很多值,但我也想更新我用于这些计算的值(tttt 取决于 P 和 V,尽管不是明确的)。

我是否必须像现在这样创建和关闭池两次,还是只需要一次?

您不必实例化多个池。在调用 pool.close() 之前,您可以多次调用 pool.map(...)。只有当您没有更多任务要提交到池中时才调用 pool.close()

需要注意的是 pool.map(...) 是一个阻塞调用 - 它不会 return 直到所有提交到池的任务都完成。这对您的目的来说可能效率低下 - 当池正在工作时,您可能想要执行后台工作,例如 collecting/submitting 更多任务。您可以改为使用 pool.map_async(...),但您的代码会变得有点复杂。

旁注:我会小心你的命名。 multiprocessing.Pool 不是线程池。它是一个子进程池。线程池和进程池是不同的野兽,有自己的考虑。

事实上,您可能会发现,如果您的处理大部分都在 numpy 中(C 扩展,GIL 已发布,即使在繁重的情况下也不必担心解释器级别的争用,您实际上需要一个自制的线程池处理、线程 = 更少的 IPC 开销和系统资源需求等)。

简单回答

您似乎想在 for 循环的每次迭代中使用进程池。您已经使事情变得比使用 Pool.map 所需的更复杂,但是您对 .join().close() 的调用表明您宁愿使用 Pool.map_async。这是一个简单的例子:

import numpy as np
from multiprocessing import Pool
from time import sleep

def print_square(x):
    sleep(.01)
    print x**2

if __name__=='__main__':

    for k in range(10):
        pool = Pool(3)
        arb = np.arange(0,10)
        pool.map_async(print_square,arb)
        pool.close()
        pool.join()

一般说明

  1. 您通常应该包含一个 minimal, complete, verifiable example。你的例子不能是 运行。更糟糕的是,它包含许多无关的特定于域的代码(例如 PVDynamics),这会阻止其他人尝试 运行 您的示例。

  2. 说明观察到的代码行为(例如错误输出、运行 时间错误等)和期望的行为。

  3. Pool 导入为 ThreadPool 令人困惑,因为线程和进程是 different,但具有非常相似的 API。