Can I append an EventProxy to a ListProxy?

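I need to share remote events across distributed machines running many processes, so I tried appending EventProxy objects to a ListProxy for distributed multiprocessing. The code and the error are shown below.

The server is meant to serve the shared list at the top level.

Client 1 is meant to append Events to the ListProxy and set the first Event.

Client 2 is meant to fetch an Event from the ListProxy and check whether it is set.

Server: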

from multiprocessing.managers import ListProxy,EventProxy
import torch.multiprocessing as mp

def server(host, port, key, manager):
    msg_buffer = manager.list()
    manager.register('get_list', callable=lambda: msg_buffer, proxytype=ListProxy)
    manager.__init__(address=(host, port), authkey=key)
    print('connect to server %s' % host)
    s = manager.get_server()
    s.serve_forever()

if __name__ == '__main__':
    manager = mp.Manager()
    server('127.0.0.1', 5000, b'abc', manager)

Client 1:

import torch.multiprocessing as mp

def client1(host, port, key):
    manager = mp.Manager()
    manager.register('get_list')
    manager.__init__(address=(host, port), authkey=key)
    manager.connect()
    return manager

def set_list(manager):
    l = manager.get_list()
    print(l)
    for i in range(3):
        l.append(manager.Event())
    
if __name__ == '__main__':
    manager = client1('127.0.0.1', 5000, b'abc')
    set_list(manager)

Client 2:

import torch.multiprocessing as mp


def client1(manager, host, port, key):
    manager.register('get_list')
    manager.__init__(address=(host, port), authkey=key)
    manager.connect()

def set_list():
    l = manager.get_list()
    if l[0].is_set():
        print('set')
    else:
        l[0].set()

if __name__ == '__main__':
    manager = mp.Manager()
    client1(manager, '127.0.0.1', 5000, b'abc')
    set_list()

But the following error occurs:

Traceback (most recent call last):
  File "/home/drsun/文档/RL/distprocess_prototype/server.py", line 38, in <module>
    server('127.0.0.1', 5000, b'abc', manager)
  File "/home/drsun/文档/RL/distprocess_prototype/server.py", line 25, in server
    event()
  File "/home/drsun/文档/RL/distprocess_prototype/server.py", line 32, in event
    l.append(manager.Event())
  File "<string>", line 2, in append
  File "/home/drsun/anaconda3/envs/AC/lib/python3.6/multiprocessing/managers.py", line 772, in _callmethod
    raise convert_to_error(kind, result)
multiprocessing.managers.RemoteError: 
---------------------------------------------------------------------------
Traceback (most recent call last):
  File "/home/drsun/anaconda3/envs/AC/lib/python3.6/multiprocessing/managers.py", line 228, in serve_client
    request = recv()
  File "/home/drsun/anaconda3/envs/AC/lib/python3.6/multiprocessing/connection.py", line 251, in recv
    return _ForkingPickler.loads(buf.getbuffer())
  File "/home/drsun/anaconda3/envs/AC/lib/python3.6/multiprocessing/managers.py", line 881, in RebuildProxy
    return func(token, serializer, incref=incref, **kwds)
  File "/home/drsun/anaconda3/envs/AC/lib/python3.6/multiprocessing/managers.py", line 731, in __init__
    self._incref()
  File "/home/drsun/anaconda3/envs/AC/lib/python3.6/multiprocessing/managers.py", line 785, in _incref
    conn = self._Client(self._token.address, authkey=self._authkey)
  File "/home/drsun/anaconda3/envs/AC/lib/python3.6/multiprocessing/connection.py", line 493, in Client
    answer_challenge(c, authkey)
  File "/home/drsun/anaconda3/envs/AC/lib/python3.6/multiprocessing/connection.py", line 739, in answer_challenge
    raise AuthenticationError('digest sent was rejected')
multiprocessing.context.AuthenticationError: digest sent was rejected
---------------------------------------------------------------------------

Any suggestions on how to append EventProxy objects to a ListProxy?

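Problem solved! Following a similar problem in Python 2.7, we learned that when a multiprocessing proxy is pickled and then unpickled in a process that is already running, the authkey does not travel with it. The rebuilt proxy falls back to the authkey of the process that unpickles it, and if that key differs from the manager's, the connection is rejected with AuthenticationError.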
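The missing authkey can be seen without any server. The sketch below uses the standard-library multiprocessing rather than torch.multiprocessing, and it relies on CPython implementation details (the private `_authkey` attribute and the shape of the `__reduce__` tuple), so treat it as an illustration under those assumptions. The helper name, the token id, and the address are hypothetical.

```python
import multiprocessing as mp
from multiprocessing.managers import EventProxy, Token


def inspect_proxy_authkey(process_authkey):
    """Return (rebuild kwargs, authkey) for a proxy built with no explicit key."""
    saved = mp.current_process().authkey
    mp.current_process().authkey = process_authkey
    try:
        # Hypothetical token: nothing listens at this address, and
        # incref=False means the proxy never opens a connection.
        token = Token('Event', ('127.0.0.1', 5000), 'hypothetical-id')
        proxy = EventProxy(token, 'pickle', incref=False)
        # __reduce__ returns (RebuildProxy, (proxy type, token, serializer, kwds)).
        # Outside of process spawning, kwds carries no 'authkey' entry.
        rebuild_kwds = proxy.__reduce__()[1][3]
        # With no manager and no explicit key, the proxy took the key
        # of the current process.
        return rebuild_kwds, bytes(proxy._authkey)
    finally:
        mp.current_process().authkey = saved


if __name__ == '__main__':
    print(inspect_proxy_authkey(b'abc'))
```

The empty kwargs dictionary is why the receiving side has nothing to authenticate with except its own process-wide key.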

A simple fix:

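import torch.multiprocessing as mp
# Use the same key the manager was created with, and set it before
# starting managers or worker processes so that they inherit it.
mp.current_process().authkey = b'abc'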

Just add mp.current_process().authkey = b'abc' right below your imports.
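To try the fix on a single machine, here is a minimal sketch under a few assumptions: it uses the standard-library multiprocessing instead of torch.multiprocessing, it starts a SyncManager on a loopback port chosen by the OS, and it requests the 'fork' start method, so it is Unix only. A pickle round trip stands in for sending the proxy to another process. The helper name `rebuild_event_proxy` is hypothetical.

```python
import multiprocessing as mp
import pickle
from multiprocessing.managers import SyncManager

AUTHKEY = b'abc'  # same key as in the question


def rebuild_event_proxy(process_authkey):
    """Return True if an EventProxy survives a pickle round trip
    while the process-wide authkey is `process_authkey`."""
    ctx = mp.get_context('fork')  # assumption: Unix-like platform
    manager = SyncManager(address=('127.0.0.1', 0), authkey=AUTHKEY, ctx=ctx)
    manager.start()
    saved = mp.current_process().authkey
    try:
        event = manager.Event()
        payload = pickle.dumps(event)  # the authkey is not part of the payload
        mp.current_process().authkey = process_authkey
        try:
            # Rebuilding the proxy reconnects to the manager using the
            # process-wide authkey.
            clone = pickle.loads(payload)
        except mp.AuthenticationError:
            return False
        clone.set()
        # The original proxy sees the change made through the clone.
        return event.is_set()
    finally:
        mp.current_process().authkey = saved
        manager.shutdown()


if __name__ == '__main__':
    print(rebuild_event_proxy(b'abc'))
    print(rebuild_event_proxy(b'xyz'))
```

With the matching key the round trip should succeed. With a mismatched key such as b'xyz' the rebuild should fail with the same AuthenticationError as in the traceback above, which the helper reports as False.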