python 中带池的多处理:关于同时具有相同名称的多个实例

Multiprocessing with pool in python: About several instances with same name at the same time

我对多处理有点陌生。但是,假设我们有如下程序。该程序似乎运行良好。现在的问题。在我看来,我们将同时拥有 4 个同名 (a) 的 SomeKindOfClass 实例。这怎么可能?此外,这种编程是否存在潜在风险?

from multiprocessing.dummy import Pool
import numpy
from theFile import someKindOfClass

n = 8 
allOutputs = numpy.zeros(n)

def work(index):   
    a = SomeKindOfClass()
    a.theSlowFunction()
    allOutputs[index] = a.output

pool = Pool(processes=4) 
pool.map(work,range(0,n))

名称 a 仅在您的 work 函数的局部范围内,因此此处不存在名称冲突。在内部 python 将使用唯一标识符跟踪每个 class 实例。如果你想检查这个,你可以使用 id 函数检查对象 ID:

print(id(a))

我没有发现您的代码有任何问题。

您可以在池工作器中声明 class 个实例,因为每个实例在内存中都有一个单独的位置,因此它们不会发生冲突。问题是,如果您先声明一个 class 实例,然后尝试将该实例传递给多个池工作者。然后每个worker都有指向内存中同一个地方的指针,就会失败(这个可以处理,只是不能这样)

基本上池工作者不能在任何地方有重叠的内存。只要工作人员不尝试在某处共享内存,或执行可能导致冲突的操作(如打印到同一文件),就不会有任何问题。

确保它们应该做的任何事情(比如你想打印到文件中的东西,或者添加到某个更广泛的命名空间中的东西)在最后作为结果返回,然后你迭代。

如果您正在使用多处理,您不必担心 - 进程不共享内存(默认情况下)。因此,拥有 class SomeKindOfClass 的多个 independent 对象没有任何风险 - 每个对象都将存在于自己的进程中。怎么运行的? Python 运行您的程序,然后运行 ​​4 个子进程。这就是为什么在 pool.map(work,range(0,n)) 之前进行 if __init__ == '__main__' 构建非常重要的原因。否则,您将收到进程创建的无限循环。

问题可能是 SomeKindOfClass 将状态保存在磁盘上 - 例如,将某些内容写入文件或读取它。

实际上,您将有 8SomeKindOfClass 实例(每个工人一个),但只有 4 个会同时处于活动状态。

multiprocessing 对比 multiprocessing.dummy

只有继续使用 multiprocessing.dummy 模块,您的程序才能运行,它只是 threading 模块的包装器。您仍在使用 "python threads"(不是单独的进程)。 "Python threads" 共享相同的全局状态; "Processes" 不要。 Python 线程也共享相同的 GIL,因此它们仍然限于一次 运行ning 一个 python 字节码语句,这与进程不同,进程可以全部 运行 python同时编码。

如果您将导入更改为 from multiprocessing import Pool,您会注意到 allOutputs 数组在所有工作程序完成执行后保持不变(另外,您可能会收到错误,因为您在全局范围内创建池,您应该将其放在 main() 函数中)。这是因为 multiprocessing 在创建新进程时会创建整个全局状态的新副本。当 worker 修改全局 allOutputs 时,它将修改初始全局状态的 copy。当进程结束时,不会向主进程返回任何内容,主进程的全局状态将保持不变。

在进程之间共享状态

与线程不同,进程不共享相同的内存

如果你想在进程之间共享状态,你必须显式声明共享变量并将它们传递给每个进程,或者使用管道或其他一些方法允许工作进程相互通信或与主进程通信.

有几种方法可以做到这一点,但也许最简单的方法是使用 Manager class

import multiprocessing

def worker(args):
    index, array = args
    a = SomeKindOfClass()
    a.some_expensive_function()
    array[index] = a.output


def main():
    n = 8
    manager = multiprocessing.Manager()
    array = manager.list([0] * n)
    pool = multiprocessing.Pool(4)
    pool.map(worker, [(i, array) for i in range(n)])
    print array