Python 多处理为每个进程创建相同的对象实例

Python multiprocessing making same object instance for every process

我已经写了一个简单的例子来说明我到底在做什么。可能有一些非常简单的解释我只是想念。

import time
import multiprocessing as mp
import os


class SomeOtherClass:
    def __init__(self):
        self.a = 'b'


class SomeProcessor(mp.Process):
    def __init__(self, queue):
        super().__init__()
        self.queue = queue

    def run(self):
        soc = SomeOtherClass()
        print("PID: ", os.getpid())
        print(soc)

if __name__ == "__main__":
    queue = mp.Queue()

    for n in range(10):
        queue.put(n)

    processes = []

    for proc in range(mp.cpu_count()):
        p = SomeProcessor(queue)
        p.start()
        processes.append(p)

    for p in processes:
        p.join()

结果是:

PID: 11853
<__main__.SomeOtherClass object at 0x7fa637d3f588>
PID: 11854
<__main__.SomeOtherClass object at 0x7fa637d3f588>
PID: 11855
<__main__.SomeOtherClass object at 0x7fa637d3f588>
PID: 11856
<__main__.SomeOtherClass object at 0x7fa637d3f588>

无论每次初始化都发生在新进程中,所有对象地址都是相同的。 谁能指出问题所在。谢谢。

我也想知道这种行为,当我首先在主进程中初始化同一个对象,然后在其上缓存一些值,然后在每个进程中初始化同一个对象。然后进程继承主进程对象。

import time
import multiprocessing as mp
import os
import random

class SomeOtherClass:

    c = {}

    def get(self, a):
        if a in self.c:
            print('Retrieved cached value ...')
            return self.c[a]

        b = random.randint(1,999)

        self.c[a] = b

        return b


class SomeProcessor(mp.Process):
    def __init__(self, queue):
        super().__init__()
        self.queue = queue

    def run(self):
        pid = os.getpid()
        soc = SomeOtherClass()
        val = soc.get('new')
        print("Value from process {0} is {1}".format(pid, val))

if __name__ == "__main__":
    queue = mp.Queue()

    for n in range(10):
        queue.put(n)

    pid = os.getpid()
    soc = SomeOtherClass()
    val = soc.get('new')
    print("Value from main process {0} is {1}".format(pid, val))

    processes = []

    for proc in range(mp.cpu_count()):
        p = SomeProcessor(queue)
        p.start()
        processes.append(p)

    for p in processes:
        p.join()

这里的输出是:

Value from main process 13052 is 676
Retrieved cached value ...
Value from process 13054 is 676
Retrieved cached value ...
Value from process 13056 is 676
Retrieved cached value ...
Value from process 13057 is 676
Retrieved cached value ...
Value from process 13055 is 676

查看显示每个 SomeOtherClass 不同的修改代码。

import time
import multiprocessing as mp
import os


class SomeOtherClass:

  def __new__(cls, *args, **kwargs):
        print('-- inside __new__ --')
        return super(SomeOtherClass, cls).__new__(cls, *args, **kwargs)


    def __init__(self):
        self.a = os.getpid()
    def __str__(self):
        return f'{self.a}'


class SomeProcessor(mp.Process):
    def __init__(self, queue):
        super().__init__()
        self.queue = queue

    def run(self):
        soc = SomeOtherClass()
        print("PID: ", os.getpid())
        print(soc)

if __name__ == "__main__":
    queue = mp.Queue()

    for n in range(10):
        queue.put(n)

    processes = []

    for proc in range(mp.cpu_count()):
        p = SomeProcessor(queue)
        p.start()
        processes.append(p)

    for p in processes:
        p.join()

输出

 -- inside __new__ --
PID:  25054
25054
-- inside __new__ --
PID:  25055
25055
-- inside __new__ --
PID:  25056
25056
-- inside __new__ --
PID:  25057
25057
-- inside __new__ --
PID:  25058
25058
-- inside __new__ --
PID:  25059
25059
-- inside __new__ --
PID:  25060
25060
-- inside __new__ --
PID:  25061
25061

tldr:它们实际上不是同一个实例,所以不用担心。

这很有趣。它们的内存引用完全一样,但实例肯定不同。如果我们这样修改代码:

import time
import multiprocessing as mp
import os


class SomeOtherClass:
    def __init__(self, num):
        self.a = num  # <-- Let's identify the instance with the pid
    
    def __str__(self):
        return f"I'm number {self.a}"  # <-- Better representation of the instance


class SomeProcessor(mp.Process):
    def __init__(self, queue):
        super().__init__()
        self.queue = queue

    def run(self):
        soc = SomeOtherClass(os.getpid())  <-- Use the PID to instantiate different objects
        print("PID: ", os.getpid())
        print(soc)
        time.sleep(1)
        print(soc)  # <-- Give it a second and print again

if __name__ == "__main__":
    queue = mp.Queue()

    for n in range(10):
        queue.put(n)

    processes = []

    for proc in range(mp.cpu_count()):
        p = SomeProcessor(queue)
        p.start()
        processes.append(p)

    for p in processes:
        p.join()

我们可以看到实例肯定是不一样的,它们并没有被修改,因为在time.sleep()之后它们的属性仍然没有改变:

PID:  668424
I'm number 668424
PID:  668425
I'm number 668425
PID:  668426
I'm number 668426
...
I'm number 668435
I'm number 668424
I'm number 668426
...

然而,如果我们删除 __str__ 函数,我仍然看到相同的内存引用:

<__main__.SomeOtherClass object at 0x7f3e08d83bb0>
PID:  669008
<__main__.SomeOtherClass object at 0x7f3e08d83bb0>
PID:  669009
<__main__.SomeOtherClass object at 0x7f3e08d83bb0>
PID:  669010
...
<__main__.SomeOtherClass object at 0x7f3e08d83bb0>
<__main__.SomeOtherClass object at 0x7f3e08d83bb0>
<__main__.SomeOtherClass object at 0x7f3e08d83bb0>
...

老实说,我真的不知道为什么会这样,所以其他人可以帮助你更多。正如用户 Booboo 所说,您看到这一点是因为 Linux 使用 fork 来启动一个新进程。我也在 Linux 机器上做了 运行 这个。如果使用 Windows,内存引用将不同。

展开评论和讨论:

  • 在 Linux 上,multiprocessing 默认为 fork 启动方法。分叉进程意味着 child 进程将共享 copy-on-write 版本的 parent 进程数据。这就是为什么全局创建的 object 在子进程中具有相同地址的原因。
    • 在 macOS 和 Windows 上,默认启动方法是 spawn – 在这种情况下不共享 object。
  • 子进程将在写入 object 时立即拥有它们的唯一副本(事实上,在 CPython 内部,当它们甚至 读取 它们,由于引用计数器在 object header).
  • 一个变量定义为
    class SomeClass:
        container = {}
    
    是 class-level,而不是 instance-level,并且将在 SomeClass 的所有实例之间共享。那是,
    a = SomeClass()
    b = SomeClass()
    print(a is b)  # False
    print(a.container is b.container is SomeClass.container)  # True
    a.container["x"] = True
    print("x" in b.container)  # True
    print("x" in SomeClass.container)  # True
    
    由于 class 的状态被分叉到子进程中,共享的 container 似乎也是共享的。但是,在子进程中写入容器将 不会 出现在 parent 或兄弟进程中。只有某些特殊的 multiprocessing 类型(和某些 lower-level 原语)可以跨越进程边界。
  • 要正确区分实例和进程之间的 container,需要 instance-level:
    class SomeClass:
        def __init__(self):
            self.container = {}
    
    (但是,当然,如果一个 SomeClass 被全局实例化,并且一个进程被分叉,它在分叉时的状态将在子进程中可用。)