Singleton has same address in multiple processes, but behaves like separate objects

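I assumed that instantiating a singleton in different processes would produce different objects, so the singleton would only be local to its own process. To verify this, I wrote a test: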

def v1():
    def keep_receiving():
        while True:
            print("from rec_proc: ", MessagePipe())
            print(MessagePipe().recv())

    def keep_sending():
        for i in range(3):
            print("from send_proc:", MessagePipe())
            MessagePipe().send(text=f"test {i + 1}")
            time.sleep(1)

    send_proc = Process(target=keep_sending)
    send_proc.start()

    rec_proc = Process(target=keep_receiving)
    rec_proc.start()

    send_proc.join()

I expected the two processes to print objects at different memory addresses. But that is not what happens:

Output:

from send_proc: <image_service.service.utils.message_pipe.MessagePipe object at 0x7f805e769610>
from rec_proc:  <image_service.service.utils.message_pipe.MessagePipe object at 0x7f805e769610>
from send_proc: <image_service.service.utils.message_pipe.MessagePipe object at 0x7f805e769610>
from send_proc: <image_service.service.utils.message_pipe.MessagePipe object at 0x7f805e769610>

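As you can see, the addresses are the same.

Question: if these are two different instances, each living in its own process, why are the addresses the same?

They must be two different instances, otherwise the line print(MessagePipe().recv()) would have printed the three received messages.

When I modify the test like this, I get the behavior I expected: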

def v2():
    def keep_receiving(pipe):
        while True:
            print("from rec_proc: ", pipe)
            print(pipe.recv())

    def keep_sending(pipe):
        for i in range(3):
            print("from send_proc:", pipe)
            pipe.send(text=f"test {i + 1}")
            time.sleep(1)

    send_proc = Process(target=keep_sending, args=(MessagePipe(), ))
    send_proc.start()

    rec_proc = Process(target=keep_receiving, args=(MessagePipe(), ))
    rec_proc.start()

    send_proc.join()

Output:

from send_proc: <image_service.service.utils.message_pipe.MessagePipe object at 0x7f51c2dff670>
from rec_proc:  <image_service.service.utils.message_pipe.MessagePipe object at 0x7f51c2dff670>
{'text': 'test 1', 'exception': None, 'level': 20}
from rec_proc:  <image_service.service.utils.message_pipe.MessagePipe object at 0x7f51c2dff670>
from send_proc: <image_service.service.utils.message_pipe.MessagePipe object at 0x7f51c2dff670>
{'text': 'test 2', 'exception': None, 'level': 20}
from rec_proc:  <image_service.service.utils.message_pipe.MessagePipe object at 0x7f51c2dff670>
from send_proc: <image_service.service.utils.message_pipe.MessagePipe object at 0x7f51c2dff670>
{'text': 'test 3', 'exception': None, 'level': 20}
from rec_proc:  <image_service.service.utils.message_pipe.MessagePipe object at 0x7f51c2dff670>

The MessagePipe class is implemented as follows, though I don't think the implementation is the core of the problem.

import logging
from multiprocessing import Pipe


class MessagePipe:
    """This implements a multiprocessing message pipe as a singleton and is intended for passing event information
    such as log messages and exceptions between processes.
    """

    _instance = None
    _parent_connection = None
    _child_connection = None

    def __new__(cls, *args, **kwargs):
        if not cls._instance:
            cls._instance = super(MessagePipe, cls).__new__(cls, *args, **kwargs)
            cls._parent_connection, cls._child_connection = Pipe()
        return cls._instance

    @property
    def child_connection(self):
        return self._child_connection

    @property
    def parent_connection(self):
        return self._parent_connection

    def send(self, text="", exception=None, level=logging.INFO):
        message = {
            "text": text,
            "exception": exception,
            "level": level
        }
        self.child_connection.send(message)

    def recv(self):
        return self.parent_connection.recv()

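On any modern OS, these are virtual memory addresses. They are local to each process's address space, so the value 0x7f805e769610 almost never corresponds to the same physical memory cell in two processes. It is worth reading up on "virtual memory".

Modern OSes use virtual memory. This means that physical memory pages are mapped into each process's address space. The loader can therefore give multiple instances of the same program the same virtual addresses. So in each process the singleton has the same virtual address, but because those virtual addresses are mapped to different physical pages, they refer to different objects.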
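
To make the point about virtual addresses concrete, here is a minimal sketch, assuming a POSIX platform where os.fork is available (it does not exist on Windows). The Box class and the compare_across_fork function are hypothetical names introduced only for this illustration. The child changes its copy of the object and reports its id() back through an OS pipe.

```python
import os


class Box:
    """Hypothetical stand-in for the singleton; it only holds one value."""

    def __init__(self):
        self.value = "parent"


def compare_across_fork():
    """Fork, mutate the object in the child, and compare ids and values."""
    box = Box()
    read_fd, write_fd = os.pipe()
    pid = os.fork()
    if pid == 0:
        # Child process: it has its own copy of the parent's address space.
        os.close(read_fd)
        box.value = "child"
        os.write(write_fd, f"{id(box)} {box.value}".encode())
        os.close(write_fd)
        os._exit(0)  # leave immediately without running parent cleanup code
    # Parent process: read what the child reported, then reap the child.
    os.close(write_fd)
    with os.fdopen(read_fd) as reader:
        child_id, child_value = reader.read().split()
    os.waitpid(pid, 0)
    return id(box) == int(child_id), child_value, box.value
```

On a fork-based platform this should return (True, "child", "parent"): the two processes report the same address, yet the change made in the child never shows up in the parent.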

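Someone may be able to explain this in more detail, but I think there is a problem here (I will get to what it is shortly), and to see it you first have to ask how arguments get from your main process to the new child process. That depends on which platform you are running on, which you neglected to specify, but I can infer it is one that uses fork to create new processes. I believe that because your platform supports forking, the child inherits the main process's address space (read-only, copy-on-write), so no special mechanism such as pickle is needed to serialize and deserialize the arguments, and that is why the addresses are the same.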
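
If you are unsure which mechanism your platform uses, the multiprocessing module can tell you. This is a small sketch; describe_start_methods is a hypothetical helper name, while get_start_method and get_all_start_methods are the standard library calls.

```python
import multiprocessing as mp


def describe_start_methods():
    """Return the default start method and every method this platform supports."""
    default = mp.get_start_method()
    available = mp.get_all_start_methods()
    return default, available


if __name__ == "__main__":
    default, available = describe_start_methods()
    print("default start method:", default)
    print("available start methods:", available)
```

With "fork" the child starts as a copy of the parent's memory, whereas with "spawn" (the only option on Windows) the Process arguments are pickled and rebuilt in a fresh interpreter.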

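But here is the problem: under Windows, which does not support fork and relies on pickle to propagate the Process arguments, the MessagePipe.__new__ method is re-executed in each newly created process. Not only will the singletons have their own unique addresses (barring a miracle), they will also allocate their own multiprocessing.Pipe instances, and then, of course, the application will not work at all, as shown below. You would be better off just passing the multiprocessing.connection.Connection objects to your child processes.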

from multiprocessing import Process, Pipe
import logging
import time

class MessagePipe:
    """This implements a multiprocessing message pipe as a singleton and is intended for passing event information
    such as log messages and exceptions between processes.
    """

    _instance = None
    _parent_connection = None
    _child_connection = None

    def __new__(cls, *args, **kwargs):
        if not cls._instance:
            print('__new__ is executing')
            cls._instance = super(MessagePipe, cls).__new__(cls, *args, **kwargs)
            cls._parent_connection, cls._child_connection = Pipe()
        return cls._instance

    @property
    def child_connection(self):
        return self._child_connection

    @property
    def parent_connection(self):
        return self._parent_connection

    def send(self, text="", exception=None, level=logging.INFO):
        message = {
            "text": text,
            "exception": exception,
            "level": level
        }
        self.child_connection.send(message)

    def recv(self):
        return self.parent_connection.recv()

def keep_receiving(pipe):
    for i in range(3):
        print("from rec_proc: ", pipe)
        print(pipe.recv())

def keep_sending(pipe):
    for i in range(3):
        print("from send_proc:", pipe)
        pipe.send(text=f"test {i + 1}")
        time.sleep(1)

def v2():
    send_proc = Process(target=keep_sending, args=(MessagePipe(), ))
    send_proc.start()

    rec_proc = Process(target=keep_receiving, args=(MessagePipe(), ))
    rec_proc.start()

    send_proc.join()
    rec_proc.join()


if __name__ == '__main__':
    v2()

Prints:

__new__ is executing
__new__ is executing
from send_proc: <__mp_main__.MessagePipe object at 0x00000283480705E0>
from rec_proc:  <__mp_main__.MessagePipe object at 0x00000249497A05E0>
from send_proc: <__mp_main__.MessagePipe object at 0x00000283480705E0>
from send_proc: <__mp_main__.MessagePipe object at 0x00000283480705E0>

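rec_proc hangs because it is reading from a connection that nothing ever writes to.

This problem can be mitigated somewhat by giving your MessagePipe class the pickle __getstate__ and __setstate__ methods. This does not prevent your __new__ method from being called and creating new Pipe instances in the child processes, but it replaces them with the serialized connections that were passed as arguments.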
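
The suggestion to pass the connection objects themselves, rather than a singleton wrapper, could look roughly like this. It is a sketch, not a drop-in replacement for MessagePipe: the function names send_messages, receive_messages and print_messages are made up for the example, and the message dict is reduced to a single key.

```python
from multiprocessing import Pipe, Process


def send_messages(conn, count):
    """Write `count` small dict messages to one end of the pipe."""
    for i in range(count):
        conn.send({"text": f"test {i + 1}"})


def receive_messages(conn, count):
    """Read `count` messages from the other end and return them as a list."""
    return [conn.recv() for _ in range(count)]


def print_messages(conn, count):
    """Process target: print every message as it arrives."""
    for message in receive_messages(conn, count):
        print(message)


def main():
    # Create the pipe once, in the parent, and hand each end to a child.
    parent_conn, child_conn = Pipe()
    sender = Process(target=send_messages, args=(child_conn, 3))
    receiver = Process(target=print_messages, args=(parent_conn, 3))
    sender.start()
    receiver.start()
    sender.join()
    receiver.join()


if __name__ == "__main__":
    main()
```

Because the pipe is created exactly once and both ends are passed explicitly, both children talk over the same pipe whether the platform forks or pickles the arguments.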

from multiprocessing import Process, Pipe
import logging
import time

class MessagePipe:
    """This implements a multiprocessing message pipe as a singleton and is intended for passing event information
    such as log messages and exceptions between processes.
    """

    _instance = None
    _parent_connection = None
    _child_connection = None

    def __new__(cls, *args, **kwargs):
        if not cls._instance:
            print('__new__ is executing')
            cls._instance = super(MessagePipe, cls).__new__(cls, *args, **kwargs)
            cls._parent_connection, cls._child_connection = Pipe()
        return cls._instance

    @property
    def child_connection(self):
        return self._child_connection

    @property
    def parent_connection(self):
        return self._parent_connection

    def send(self, text="", exception=None, level=logging.INFO):
        message = {
            "text": text,
            "exception": exception,
            "level": level
        }
        self.child_connection.send(message)

    def recv(self):
        return self.parent_connection.recv()

    def __getstate__(self):
        return (self._parent_connection, self._child_connection)

    def __setstate__(self, state):
        self._parent_connection, self._child_connection = state

def keep_receiving(pipe):
    for i in range(3):
        print("from rec_proc: ", pipe)
        print(pipe.recv())

def keep_sending(pipe):
    for i in range(3):
        print("from send_proc:", pipe)
        pipe.send(text=f"test {i + 1}")
        time.sleep(1)

def v2():
    send_proc = Process(target=keep_sending, args=(MessagePipe(), ))
    send_proc.start()

    rec_proc = Process(target=keep_receiving, args=(MessagePipe(), ))
    rec_proc.start()

    send_proc.join()
    rec_proc.join()


if __name__ == '__main__':
    v2()

Prints:

__new__ is executing
__new__ is executing
__new__ is executing
from send_proc: <__mp_main__.MessagePipe object at 0x00000220220F15E0>
from rec_proc:  <__mp_main__.MessagePipe object at 0x000001BB2F5C15E0>
{'text': 'test 1', 'exception': None, 'level': 20}
from rec_proc:  <__mp_main__.MessagePipe object at 0x000001BB2F5C15E0>
from send_proc: <__mp_main__.MessagePipe object at 0x00000220220F15E0>
{'text': 'test 2', 'exception': None, 'level': 20}
from rec_proc:  <__mp_main__.MessagePipe object at 0x000001BB2F5C15E0>
from send_proc: <__mp_main__.MessagePipe object at 0x00000220220F15E0>
{'text': 'test 3', 'exception': None, 'level': 20}
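
The fact that __new__ still runs when the argument is unpickled can be reproduced without any processes. The sketch below uses a hypothetical Settings singleton with a plain string in place of the connections, and simulates a fresh child interpreter by resetting the class-level _instance before unpickling; both choices are assumptions made purely for illustration.

```python
import pickle


class Settings:
    """Hypothetical singleton whose state is a single string."""

    _instance = None
    new_calls = 0  # counts how often __new__ runs

    def __new__(cls):
        cls.new_calls += 1
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance.endpoint = "default"
        return cls._instance

    def __getstate__(self):
        return (self.endpoint,)

    def __setstate__(self, state):
        (self.endpoint,) = state


def round_trip():
    """Pickle the singleton, then unpickle it as a fresh process would."""
    original = Settings()
    original.endpoint = "from-parent"
    payload = pickle.dumps(original)

    Settings._instance = None  # pretend we are in a newly spawned child
    calls_before = Settings.new_calls
    restored = pickle.loads(payload)
    calls_during_load = Settings.new_calls - calls_before
    return restored.endpoint, calls_during_load, restored is original
```

Calling round_trip() should give ("from-parent", 1, False): unpickling invokes __new__ once, which builds a default instance, and __setstate__ then overwrites that default with the state that was pickled in the parent.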

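Another, more classic implementation of the singleton pattern, one that does not override the __new__ method, avoids creating a new Pipe instance and its corresponding connections only to discard them when the pickle __setstate__ method restores the original serialized connections: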

from multiprocessing import Process, Pipe
import logging
import time

class MessagePipe:
    """This implements a multiprocessing message pipe as a singleton and is intended for passing event information
    such as log messages and exceptions between processes.
    """

    _instance = None
    _parent_connection = None
    _child_connection = None

    def __init__(self):
        raise RuntimeError('Call getInstance() instead')

    @classmethod
    def getInstance(cls):
        if cls._instance is None:
            print('Creating new instance.')
            cls._instance = cls.__new__(cls)
            cls._parent_connection, cls._child_connection = Pipe()
        return cls._instance

    @property
    def child_connection(self):
        return self._child_connection

    @property
    def parent_connection(self):
        return self._parent_connection

    def send(self, text="", exception=None, level=logging.INFO):
        message = {
            "text": text,
            "exception": exception,
            "level": level
        }
        self.child_connection.send(message)

    def recv(self):
        return self.parent_connection.recv()

    def __getstate__(self):
        return (self._parent_connection, self._child_connection)

    def __setstate__(self, state):
        self._parent_connection, self._child_connection = state

def keep_receiving(pipe):
    for i in range(3):
        print("from rec_proc: ", pipe)
        print(pipe.recv())

def keep_sending(pipe):
    for i in range(3):
        print("from send_proc:", pipe)
        pipe.send(text=f"test {i + 1}")
        time.sleep(1)

def v2():
    send_proc = Process(target=keep_sending, args=(MessagePipe.getInstance(), ))
    send_proc.start()

    rec_proc = Process(target=keep_receiving, args=(MessagePipe.getInstance(), ))
    rec_proc.start()

    send_proc.join()
    rec_proc.join()


if __name__ == '__main__':
    v2()

Prints:

Creating new instance.
from send_proc: <__mp_main__.MessagePipe object at 0x0000023459610520>
from rec_proc:  <__mp_main__.MessagePipe object at 0x0000022F2D8A1520>
{'text': 'test 1', 'exception': None, 'level': 20}
from rec_proc:  <__mp_main__.MessagePipe object at 0x0000022F2D8A1520>
from send_proc: <__mp_main__.MessagePipe object at 0x0000023459610520>
{'text': 'test 2', 'exception': None, 'level': 20}
from rec_proc:  <__mp_main__.MessagePipe object at 0x0000022F2D8A1520>
from send_proc: <__mp_main__.MessagePipe object at 0x0000023459610520>
{'text': 'test 3', 'exception': None, 'level': 20}