Sharing asyncio objects between processes

I am using the asyncio and multiprocessing libraries to run two processes, each with a server instance listening on a different port for incoming messages.

To identify each client, I want to share a dict between the two processes in order to keep the list of known clients up to date. For this, I decided to use a Tuple[StreamReader, StreamWriter] as the lookup key, which maps to a Client object assigned to that connection.

However, as soon as I insert into or access the shared dictionary, the program crashes with the following error message:

Task exception was never retrieved
future: <Task finished name='Task-5' coro=<GossipServer.handle_client() done, defined at /home/croemheld/Documents/network/server.py:119> exception=AttributeError("Can't pickle local object 'WeakSet.__init__.<locals>._remove'")>
Traceback (most recent call last):
  File "/home/croemheld/Documents/network/server.py", line 128, in handle_client
    if not await self.handle_message(reader, writer, buffer):
  File "/home/croemheld/Documents/network/server.py", line 160, in handle_message
    client = self.syncmanager.get_api_client((reader, writer))
  File "<string>", line 2, in get_api_client
  File "/usr/lib/python3.9/multiprocessing/managers.py", line 808, in _callmethod
    conn.send((self._id, methodname, args, kwds))
  File "/usr/lib/python3.9/multiprocessing/connection.py", line 211, in send
    self._send_bytes(_ForkingPickler.dumps(obj))
  File "/usr/lib/python3.9/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
AttributeError: Can't pickle local object 'WeakSet.__init__.<locals>._remove'

Naturally, I looked up the error message and found this question, but I don't quite understand what the cause is here. As far as I understand it, the crash happens because StreamReader and StreamWriter cannot be pickled/serialized in order to be shared between processes. If that is indeed the cause, is there a way to pickle them, perhaps by patching the reducer function to use a different pickler?
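For reference, the pickling failure can be reproduced without any shared dict at all; this minimal, self-contained sketch (a throwaway local server, all names illustrative) just tries to pickle the (reader, writer) pair that would be used as the key:

```python
import asyncio
import pickle

def try_pickle_streams():
    async def main():
        async def handler(reader, writer):
            writer.close()

        # Throwaway local server so we can obtain a real StreamReader/StreamWriter.
        server = await asyncio.start_server(handler, "127.0.0.1", 0)
        port = server.sockets[0].getsockname()[1]
        reader, writer = await asyncio.open_connection("127.0.0.1", port)
        try:
            # This is what the manager proxy does implicitly with the dict key.
            pickle.dumps((reader, writer))
            err = None
        except Exception as exc:
            err = type(exc).__name__
        writer.close()
        server.close()
        await server.wait_closed()
        return err

    return asyncio.run(main())

error_name = try_pickle_streams()
print("pickling failed with:", error_name)
```

The streams hold references to the event loop, transport, and underlying socket, none of which can be pickled, so the dumps call always raises.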

You might be interested in using a SyncManager instead. Just be sure to call shutdown on the manager at the end, so that you don't leave any zombie processes behind.

from multiprocessing.managers import SyncManager
from multiprocessing import Process
import signal

my_manager = SyncManager()

# Ignore SIGINT in the manager process so Ctrl+C does not kill it; be sure to
# catch KeyboardInterrupt yourself and shut the manager down accordingly.
def manager_init():
    signal.signal(signal.SIGINT, signal.SIG_IGN)

my_manager.start(manager_init)

my_dict = my_manager.dict()
my_dict["clients"] = my_manager.list()

def my_process(my_id, the_dict):
    for i in range(3):
        the_dict["clients"].append(f"{my_id}_{i}")

processes = []
for j in range(4):
    processes.append(Process(target=my_process, args=(j, my_dict)))

for p in processes:
    p.start()

for p in processes:
    p.join()

print(my_dict["clients"])
# ['0_0', '2_0', '0_1', '3_0', '1_0', '0_2', '1_1', '2_1', '3_1', '1_2', '2_2', '3_2']

my_manager.shutdown()
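Following up on the comment about Ctrl+C in the snippet above, here is a minimal sketch of how the KeyboardInterrupt handling could look (names are illustrative; the worker processes are elided):

```python
from multiprocessing.managers import SyncManager
import signal

def manager_init():
    # The manager process ignores SIGINT; the parent handles Ctrl+C instead.
    signal.signal(signal.SIGINT, signal.SIG_IGN)

manager = SyncManager()
manager.start(manager_init)

try:
    shared = manager.dict()
    shared["clients"] = manager.list()
    # ... start and join worker processes here ...
    shared["clients"].append("demo")
    snapshot = list(shared["clients"])
except KeyboardInterrupt:
    # Ctrl+C reaches only the parent; fall through to the cleanup below.
    snapshot = []
finally:
    manager.shutdown()  # no zombie manager process left behind

print(snapshot)
```

Because the manager process ignores SIGINT, the proxies stay usable while the parent unwinds, and shutdown in the finally block runs in every case.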



I managed to find a workaround while keeping the asyncio and multiprocessing libraries, without any additional libraries.

First, since StreamReader and StreamWriter objects are not picklable, I had to fall back to using a socket. This is easily achieved with a simple function:

import socket
from asyncio import StreamWriter
from socket import AddressFamily

def get_socket(writer: StreamWriter):
    fileno = writer.get_extra_info('socket').fileno()
    return socket.fromfd(fileno, AddressFamily.AF_INET, socket.SOCK_STREAM)

The socket is then inserted into the shared object (e.g. a Manager().dict(), or even a custom class, which you have to register via a custom BaseManager instance). Now, since the application is built on asyncio and uses the streams provided by the library, we can easily convert the socket back into a pair of StreamReader and StreamWriter via:

node_reader, node_writer = await asyncio.open_connection(sock=self.node_sock)
node_writer.write(mesg_text)
await node_writer.drain()

where self.node_sock is the socket instance that was passed through the shared object.
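To see the open_connection(sock=...) step in isolation, here is a self-contained sketch where a connected socketpair stands in for the socket retrieved from the shared object (all names are illustrative):

```python
import asyncio
import socket

async def main():
    # node_sock plays the role of self.node_sock from the workaround above;
    # peer_sock stands in for the remote end of the connection.
    node_sock, peer_sock = socket.socketpair()
    peer_sock.setblocking(False)

    # Rebuild a StreamReader/StreamWriter pair from the bare socket.
    node_reader, node_writer = await asyncio.open_connection(sock=node_sock)
    node_writer.write(b"hello")
    await node_writer.drain()

    # Read the bytes back on the peer end to show the streams really work.
    loop = asyncio.get_running_loop()
    data = await loop.sock_recv(peer_sock, 1024)

    node_writer.close()
    await node_writer.wait_closed()
    peer_sock.close()
    return data

received = asyncio.run(main())
print(received)
```

Since open_connection accepts an already-connected socket via its sock parameter, no host/port arguments are needed, and the writer behaves exactly like one obtained from a regular connection.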