Singleton has the same address in multiple processes, but behaves like separate objects
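I assumed that instantiating a singleton in different processes would produce different objects, so that the singleton is only local to its own process. To verify this, I wrote a test: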
import time
from multiprocessing import Process


def v1():
    def keep_receiving():
        while True:
            print("from rec_proc: ", MessagePipe())
            print(MessagePipe().recv())

    def keep_sending():
        for i in range(3):
            print("from send_proc:", MessagePipe())
            MessagePipe().send(text=f"test {i + 1}")
            time.sleep(1)

    send_proc = Process(target=keep_sending)
    send_proc.start()
    rec_proc = Process(target=keep_receiving)
    rec_proc.start()
    send_proc.join()
I expected the two processes to print objects at different memory addresses. That is not what happened:
Output:
from send_proc: <image_service.service.utils.message_pipe.MessagePipe object at 0x7f805e769610>
from rec_proc: <image_service.service.utils.message_pipe.MessagePipe object at 0x7f805e769610>
from send_proc: <image_service.service.utils.message_pipe.MessagePipe object at 0x7f805e769610>
from send_proc: <image_service.service.utils.message_pipe.MessagePipe object at 0x7f805e769610>
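As you can see, the addresses are identical.
Question: if these are two different instances, each living in its own process, why are the addresses the same?
They must be two different instances, because otherwise the line print(MessagePipe().recv()) would have printed the three received messages.
When I modify the test like this, I get the behavior I expected: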
def v2():
    def keep_receiving(pipe):
        while True:
            print("from rec_proc: ", pipe)
            print(pipe.recv())

    def keep_sending(pipe):
        for i in range(3):
            print("from send_proc:", pipe)
            pipe.send(text=f"test {i + 1}")
            time.sleep(1)

    send_proc = Process(target=keep_sending, args=(MessagePipe(), ))
    send_proc.start()
    rec_proc = Process(target=keep_receiving, args=(MessagePipe(), ))
    rec_proc.start()
    send_proc.join()
from send_proc: <image_service.service.utils.message_pipe.MessagePipe object at 0x7f51c2dff670>
from rec_proc: <image_service.service.utils.message_pipe.MessagePipe object at 0x7f51c2dff670>
{'text': 'test 1', 'exception': None, 'level': 20}
from rec_proc: <image_service.service.utils.message_pipe.MessagePipe object at 0x7f51c2dff670>
from send_proc: <image_service.service.utils.message_pipe.MessagePipe object at 0x7f51c2dff670>
{'text': 'test 2', 'exception': None, 'level': 20}
from rec_proc: <image_service.service.utils.message_pipe.MessagePipe object at 0x7f51c2dff670>
from send_proc: <image_service.service.utils.message_pipe.MessagePipe object at 0x7f51c2dff670>
{'text': 'test 3', 'exception': None, 'level': 20}
from rec_proc: <image_service.service.utils.message_pipe.MessagePipe object at 0x7f51c2dff670>
The MessagePipe class is implemented as follows, but I do not think the implementation is the core of the question.
import logging
from multiprocessing import Pipe


class MessagePipe:
    """This implements a multiprocessing message pipe as a singleton and is intended for passing event information
    such as log messages and exceptions between processes.
    """
    _instance = None
    _parent_connection = None
    _child_connection = None

    def __new__(cls, *args, **kwargs):
        if not cls._instance:
            cls._instance = super(MessagePipe, cls).__new__(cls, *args, **kwargs)
            cls._parent_connection, cls._child_connection = Pipe()
        return cls._instance

    @property
    def child_connection(self):
        return self._child_connection

    @property
    def parent_connection(self):
        return self._parent_connection

    def send(self, text="", exception=None, level=logging.INFO):
        message = {
            "text": text,
            "exception": exception,
            "level": level
        }
        self.child_connection.send(message)

    def recv(self):
        return self.parent_connection.recv()
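On any modern OS, those are virtual memory addresses. They are local to each process's own address space, and they almost never correspond to the same physical memory cell 0x7f805e769610. You need to google "virtual memory".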
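Modern operating systems use virtual memory. This means that physical memory pages are mapped into each process's address space, so the loader can hand out the same virtual addresses to multiple instances of the same program. In each process the singleton will therefore have the same virtual address, but because those virtual addresses map to different physical pages, they refer to different objects.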
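The effect described in the two answers above can be reproduced with a small sketch. It assumes a POSIX platform where os.fork() is available and CPython, where id() reports the object's address; the function name same_address_after_fork is made up for this illustration and is not part of the original question.

```python
import os


def same_address_after_fork():
    """Fork a child, mutate an object there, and compare ids and contents.

    Returns (parent_id, child_id, parent_items, child_items).
    POSIX only: os.fork() does not exist on Windows.
    """
    shared = ["created before fork"]      # one object at one virtual address
    read_fd, write_fd = os.pipe()         # plain OS pipe for the child's report

    pid = os.fork()
    if pid == 0:                          # child process
        try:
            os.close(read_fd)
            shared.append("added in child")   # only the child's copy changes
            report = f"{id(shared)}|{','.join(shared)}"
            os.write(write_fd, report.encode())
            os.close(write_fd)
        finally:
            os._exit(0)                   # never fall back into the parent's code

    os.close(write_fd)                    # parent process
    with os.fdopen(read_fd) as reader:
        child_id, child_items = reader.read().split("|")
    os.waitpid(pid, 0)
    return id(shared), int(child_id), list(shared), child_items.split(",")


if __name__ == "__main__" and hasattr(os, "fork"):
    parent_id, child_id, parent_items, child_items = same_address_after_fork()
    print("same address:", parent_id == child_id)
    print("parent sees: ", parent_items)
    print("child saw:   ", child_items)
```

Both processes report the same address, yet the child's append is invisible to the parent, which is the same pattern as the identical MessagePipe addresses in the question.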
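Someone may be able to explain this in more detail, but I think you must first ask how the arguments get from your main process to your new child process (there is a problem here, which I explain below). That appears to depend on the platform you are running on, which you neglected to specify but which I can infer is one that uses fork to create new processes. I believe that because your platform supports forking, the child inherits the address space of the main process (read-only, copy-on-write), so no special mechanism such as pickle is needed to serialize and deserialize the arguments, and that is why the addresses are the same.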
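Since the question did not state the platform, a quick way to check which mechanism is in play is to ask multiprocessing for its start method. This is only a sketch; the helper name describe_start_method is hypothetical.

```python
import multiprocessing as mp


def describe_start_method():
    """Return the current start method and whether children begin as a
    copy of the parent's memory (true only for "fork")."""
    method = mp.get_start_method()        # "fork", "spawn" or "forkserver"
    inherits_parent_memory = method == "fork"
    return method, inherits_parent_memory


if __name__ == "__main__":
    method, inherits = describe_start_method()
    print("available start methods:", mp.get_all_start_methods())
    print("current start method:   ", method)
    print("child starts as a copy of the parent:", inherits)
```

With "spawn" (the only option on Windows) or "forkserver", the Process arguments are pickled and sent to the child rather than inherited.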
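But here is the problem: under Windows, which does not support fork and relies on pickle to propagate the Process arguments, the MessagePipe.__new__ method will be re-executed in each newly created process. Not only will the singletons have their own unique addresses (barring a miracle), they will also allocate their own multiprocessing.Pipe instances, and then, of course, the application will not work at all, as shown below. You would be better off just passing multiprocessing.connection.Connection objects to your child processes.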
from multiprocessing import Process, Pipe
import logging
import time


class MessagePipe:
    """This implements a multiprocessing message pipe as a singleton and is intended for passing event information
    such as log messages and exceptions between processes.
    """
    _instance = None
    _parent_connection = None
    _child_connection = None

    def __new__(cls, *args, **kwargs):
        if not cls._instance:
            print('__new__ is executing')
            cls._instance = super(MessagePipe, cls).__new__(cls, *args, **kwargs)
            cls._parent_connection, cls._child_connection = Pipe()
        return cls._instance

    @property
    def child_connection(self):
        return self._child_connection

    @property
    def parent_connection(self):
        return self._parent_connection

    def send(self, text="", exception=None, level=logging.INFO):
        message = {
            "text": text,
            "exception": exception,
            "level": level
        }
        self.child_connection.send(message)

    def recv(self):
        return self.parent_connection.recv()


def keep_receiving(pipe):
    for i in range(3):
        print("from rec_proc: ", pipe)
        print(pipe.recv())


def keep_sending(pipe):
    for i in range(3):
        print("from send_proc:", pipe)
        pipe.send(text=f"test {i + 1}")
        time.sleep(1)


def v2():
    send_proc = Process(target=keep_sending, args=(MessagePipe(), ))
    send_proc.start()
    rec_proc = Process(target=keep_receiving, args=(MessagePipe(), ))
    rec_proc.start()
    send_proc.join()
    rec_proc.join()


if __name__ == '__main__':
    v2()
Prints:
__new__ is executing
__new__ is executing
from send_proc: <__mp_main__.MessagePipe object at 0x00000283480705E0>
from rec_proc: <__mp_main__.MessagePipe object at 0x00000249497A05E0>
from send_proc: <__mp_main__.MessagePipe object at 0x00000283480705E0>
from send_proc: <__mp_main__.MessagePipe object at 0x00000283480705E0>
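rec_proc hangs because it is reading from a connection that nothing ever writes to.
The problem can be mitigated somewhat by giving your MessagePipe class the pickle __setstate__ and __getstate__ methods. This will not prevent your __new__ method from being called and creating new Pipe instances in the child processes, but it will replace those connections with the serialized ones that were passed as arguments.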
from multiprocessing import Process, Pipe
import logging
import time


class MessagePipe:
    """This implements a multiprocessing message pipe as a singleton and is intended for passing event information
    such as log messages and exceptions between processes.
    """
    _instance = None
    _parent_connection = None
    _child_connection = None

    def __new__(cls, *args, **kwargs):
        if not cls._instance:
            print('__new__ is executing')
            cls._instance = super(MessagePipe, cls).__new__(cls, *args, **kwargs)
            cls._parent_connection, cls._child_connection = Pipe()
        return cls._instance

    @property
    def child_connection(self):
        return self._child_connection

    @property
    def parent_connection(self):
        return self._parent_connection

    def send(self, text="", exception=None, level=logging.INFO):
        message = {
            "text": text,
            "exception": exception,
            "level": level
        }
        self.child_connection.send(message)

    def recv(self):
        return self.parent_connection.recv()

    def __getstate__(self):
        return (self._parent_connection, self._child_connection)

    def __setstate__(self, state):
        self._parent_connection, self._child_connection = state


def keep_receiving(pipe):
    for i in range(3):
        print("from rec_proc: ", pipe)
        print(pipe.recv())


def keep_sending(pipe):
    for i in range(3):
        print("from send_proc:", pipe)
        pipe.send(text=f"test {i + 1}")
        time.sleep(1)


def v2():
    send_proc = Process(target=keep_sending, args=(MessagePipe(), ))
    send_proc.start()
    rec_proc = Process(target=keep_receiving, args=(MessagePipe(), ))
    rec_proc.start()
    send_proc.join()
    rec_proc.join()


if __name__ == '__main__':
    v2()
Prints:
__new__ is executing
__new__ is executing
__new__ is executing
from send_proc: <__mp_main__.MessagePipe object at 0x00000220220F15E0>
from rec_proc: <__mp_main__.MessagePipe object at 0x000001BB2F5C15E0>
{'text': 'test 1', 'exception': None, 'level': 20}
from rec_proc: <__mp_main__.MessagePipe object at 0x000001BB2F5C15E0>
from send_proc: <__mp_main__.MessagePipe object at 0x00000220220F15E0>
{'text': 'test 2', 'exception': None, 'level': 20}
from rec_proc: <__mp_main__.MessagePipe object at 0x000001BB2F5C15E0>
from send_proc: <__mp_main__.MessagePipe object at 0x00000220220F15E0>
{'text': 'test 3', 'exception': None, 'level': 20}
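The claim that unpickling still goes through __new__ before __setstate__ restores the state can be checked in a single process. The sketch below uses a hypothetical Tracked class rather than MessagePipe, and the plain pickle module rather than the pickler multiprocessing uses internally, on the assumption that both follow the same object protocol.

```python
import pickle


class Tracked:
    """Counts how often __new__ and __init__ run (illustrative class only)."""
    new_calls = 0
    init_calls = 0

    def __new__(cls):
        cls.new_calls += 1
        return super().__new__(cls)

    def __init__(self):
        type(self).init_calls += 1
        self.payload = "original"

    def __getstate__(self):
        # Called when pickling: choose what gets serialized.
        return (self.payload,)

    def __setstate__(self, state):
        # Called when unpickling, after __new__ has built an empty object.
        (self.payload,) = state


def round_trip():
    """Pickle and unpickle one Tracked instance; return both objects."""
    original = Tracked()
    restored = pickle.loads(pickle.dumps(original))
    return original, restored


if __name__ == "__main__":
    original, restored = round_trip()
    print("distinct objects:", restored is not original)
    print("payload restored:", restored.payload)
    print("__new__ calls:", Tracked.new_calls, "__init__ calls:", Tracked.init_calls)
```

Unpickling calls __new__ a second time but never __init__, which is why a singleton whose __new__ creates a Pipe will create a fresh one in every child that receives the instance as a pickled argument.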
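Another, more classic implementation of the singleton pattern, one that does not use the __new__ method, avoids creating new Pipe instances and their corresponding connections only to have them discarded when the pickle __setstate__ method restores the original serialized connections: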
from multiprocessing import Process, Pipe
import logging
import time


class MessagePipe:
    """This implements a multiprocessing message pipe as a singleton and is intended for passing event information
    such as log messages and exceptions between processes.
    """
    _instance = None
    _parent_connection = None
    _child_connection = None

    def __init__(self):
        raise RuntimeError('Call getInstance() instead')

    @classmethod
    def getInstance(cls):
        if cls._instance is None:
            print('Creating new instance.')
            cls._instance = cls.__new__(cls)
            cls._parent_connection, cls._child_connection = Pipe()
        return cls._instance

    @property
    def child_connection(self):
        return self._child_connection

    @property
    def parent_connection(self):
        return self._parent_connection

    def send(self, text="", exception=None, level=logging.INFO):
        message = {
            "text": text,
            "exception": exception,
            "level": level
        }
        self.child_connection.send(message)

    def recv(self):
        return self.parent_connection.recv()

    def __getstate__(self):
        return (self._parent_connection, self._child_connection)

    def __setstate__(self, state):
        self._parent_connection, self._child_connection = state


def keep_receiving(pipe):
    for i in range(3):
        print("from rec_proc: ", pipe)
        print(pipe.recv())


def keep_sending(pipe):
    for i in range(3):
        print("from send_proc:", pipe)
        pipe.send(text=f"test {i + 1}")
        time.sleep(1)


def v2():
    send_proc = Process(target=keep_sending, args=(MessagePipe.getInstance(), ))
    send_proc.start()
    rec_proc = Process(target=keep_receiving, args=(MessagePipe.getInstance(), ))
    rec_proc.start()
    send_proc.join()
    rec_proc.join()


if __name__ == '__main__':
    v2()
Prints:
Creating new instance.
from send_proc: <__mp_main__.MessagePipe object at 0x0000023459610520>
from rec_proc: <__mp_main__.MessagePipe object at 0x0000022F2D8A1520>
{'text': 'test 1', 'exception': None, 'level': 20}
from rec_proc: <__mp_main__.MessagePipe object at 0x0000022F2D8A1520>
from send_proc: <__mp_main__.MessagePipe object at 0x0000023459610520>
{'text': 'test 2', 'exception': None, 'level': 20}
from rec_proc: <__mp_main__.MessagePipe object at 0x0000022F2D8A1520>
from send_proc: <__mp_main__.MessagePipe object at 0x0000023459610520>
{'text': 'test 3', 'exception': None, 'level': 20}
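For comparison, the simpler route suggested earlier, passing the connection objects themselves to the child processes, might look like the sketch below. The helper make_message and the None end-of-stream sentinel are assumptions added for this illustration; they are not part of the original MessagePipe class.

```python
import logging
from multiprocessing import Pipe, Process


def make_message(text="", exception=None, level=logging.INFO):
    """Build the same dictionary shape that MessagePipe.send() produces."""
    return {"text": text, "exception": exception, "level": level}


def keep_sending(conn, count=3):
    for i in range(count):
        conn.send(make_message(text=f"test {i + 1}"))
    conn.send(None)                 # sentinel: tells the receiver to stop
    conn.close()


def keep_receiving(conn):
    while True:
        message = conn.recv()
        if message is None:         # sender is done
            break
        print("received:", message)


def main():
    # Create the pipe once in the parent and hand one end to each child.
    parent_conn, child_conn = Pipe()
    send_proc = Process(target=keep_sending, args=(child_conn,))
    rec_proc = Process(target=keep_receiving, args=(parent_conn,))
    send_proc.start()
    rec_proc.start()
    send_proc.join()
    rec_proc.join()


if __name__ == "__main__":
    main()
```

Because the two ends of a single Pipe are created in the parent and passed explicitly, this does not depend on whether the platform forks or spawns, and no singleton state has to survive the trip into the child.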