我可以在 ListProxy 中附加 EventProxy 吗?
Can I append EventProxy in ListProxy?
我需要在多台分布式机器、大量进程之间共享远程事件(Event)。所以我尝试把 EventProxy 追加(append)到 ListProxy 中,以实现分布式多进程处理。
代码和错误如下所示:
服务器的作用是对外提供一个顶层的共享列表(ListProxy)。
Client1 的目标是向 ListProxy 中追加(append)Event,并设置(set)第一个 Event。
Client2 的目标是从 ListProxy 中取出 Event,并判断它是否已被设置。
服务器:
from multiprocessing.managers import ListProxy,EventProxy
import torch.multiprocessing as mp
def server(host, port, key, manager):
    """Publish a manager-owned list as ``get_list`` and serve it forever.

    Args:
        host: Address the manager server binds to.
        port: TCP port the manager server binds to.
        key: Auth key (bytes) that connecting clients must present.
        manager: Manager object used to create the shared list; it is
            re-initialised here with the requested address and auth key.

    This call blocks in ``serve_forever()`` and does not return normally.
    """
    shared_list = manager.list()

    def get_shared_list():
        # Always hand out the same list so every client sees the same data.
        return shared_list

    manager.register('get_list', callable=get_shared_list, proxytype=ListProxy)
    # Re-run __init__ to rebind the existing manager to (host, port) / key.
    manager.__init__(address=(host, port), authkey=key)
    print('connect to server %s' % host)
    manager.get_server().serve_forever()
if __name__ == '__main__':
    # Script entry point: build a manager and serve on localhost:5000.
    manager = mp.Manager()
    server(host='127.0.0.1', port=5000, key=b'abc', manager=manager)
客户端 1:
import torch.multiprocessing as mp
def client1(host, port, key):
    """Create a manager, point it at the remote server and connect.

    Args:
        host: Address of the manager server.
        port: TCP port of the manager server.
        key: Auth key (bytes) expected by the server.

    Returns:
        The connected manager, with ``get_list`` registered on it.
    """
    remote = mp.Manager()
    remote.register('get_list')
    server_address = (host, port)
    # Re-run __init__ to retarget the manager at the remote server.
    remote.__init__(address=server_address, authkey=key)
    remote.connect()
    return remote
def set_list(manager):
    """Fetch the shared list, print it, and append three new events to it.

    Args:
        manager: Connected manager exposing ``get_list()`` and ``Event()``.
    """
    events = manager.get_list()
    print(events)
    for _ in range(3):
        # Each Event proxy is created through the manager, then sent to the list.
        events.append(manager.Event())
if __name__ == '__main__':
    # Script entry point: connect to the local server, then fill the list.
    manager = client1(host='127.0.0.1', port=5000, key=b'abc')
    set_list(manager=manager)
客户端 2:
import torch.multiprocessing as mp
def client1(manager, host, port, key):
    """Register ``get_list`` on *manager* and connect it to the remote server.

    Args:
        manager: Manager object to retarget and connect (modified in place).
        host: Address of the manager server.
        port: TCP port of the manager server.
        key: Auth key (bytes) expected by the server.
    """
    manager.register('get_list')
    server_address = (host, port)
    # Re-run __init__ to retarget the manager at the remote server.
    manager.__init__(address=server_address, authkey=key)
    manager.connect()
def set_list():
    """Print 'set' if the first shared event is set; otherwise set it.

    Reads the module-level ``manager``, which must already be connected.
    """
    events = manager.get_list()
    if not events[0].is_set():
        events[0].set()
        return
    print('set')
if __name__ == '__main__':
    # ``manager`` must stay a module-level name: set_list() reads it as a global.
    manager = mp.Manager()
    client1(manager=manager, host='127.0.0.1', port=5000, key=b'abc')
    set_list()
但是出现如下问题:
Traceback (most recent call last):
File "/home/drsun/文档/RL/distprocess_prototype/server.py", line 38, in <module>
server('127.0.0.1', 5000, b'abc', manager)
File "/home/drsun/文档/RL/distprocess_prototype/server.py", line 25, in server
event()
File "/home/drsun/文档/RL/distprocess_prototype/server.py", line 32, in event
l.append(manager.Event())
File "<string>", line 2, in append
File "/home/drsun/anaconda3/envs/AC/lib/python3.6/multiprocessing/managers.py", line 772, in _callmethod
raise convert_to_error(kind, result)
multiprocessing.managers.RemoteError:
---------------------------------------------------------------------------
Traceback (most recent call last):
File "/home/drsun/anaconda3/envs/AC/lib/python3.6/multiprocessing/managers.py", line 228, in serve_client
request = recv()
File "/home/drsun/anaconda3/envs/AC/lib/python3.6/multiprocessing/connection.py", line 251, in recv
return _ForkingPickler.loads(buf.getbuffer())
File "/home/drsun/anaconda3/envs/AC/lib/python3.6/multiprocessing/managers.py", line 881, in RebuildProxy
return func(token, serializer, incref=incref, **kwds)
File "/home/drsun/anaconda3/envs/AC/lib/python3.6/multiprocessing/managers.py", line 731, in __init__
self._incref()
File "/home/drsun/anaconda3/envs/AC/lib/python3.6/multiprocessing/managers.py", line 785, in _incref
conn = self._Client(self._token.address, authkey=self._authkey)
File "/home/drsun/anaconda3/envs/AC/lib/python3.6/multiprocessing/connection.py", line 493, in Client
answer_challenge(c, authkey)
File "/home/drsun/anaconda3/envs/AC/lib/python3.6/multiprocessing/connection.py", line 739, in answer_challenge
raise AuthenticationError('digest sent was rejected')
multiprocessing.context.AuthenticationError: digest sent was rejected
---------------------------------------------------------------------------
有什么建议可以在 ListProxy 中附加 EventProxy 对象吗?
问题解决了!
参考 Python 2.7 中的类似问题(similar problem in Python 2.7)可知:multiprocessing 的代理对象在经过 pickle 传输、再被 unpickle 重建时并不携带 authkey,重建时只能使用当前进程的 authkey;如果它与管理器的 authkey 不一致,认证就会失败,所以我们遇到了 AuthenticationError。
一个简单的修复方法:
import torch.multiprocessing as mp
# Make this process's authkey match the manager's key (b'abc' in the scripts
# above) so that proxies rebuilt after unpickling can authenticate.
mp.current_process().authkey = b'abc'
只需在导入语句之后添加 mp.current_process().authkey = b'abc' 即可。
我需要在多台分布式机器、大量进程之间共享远程事件(Event)。所以我尝试把 EventProxy 追加(append)到 ListProxy 中,以实现分布式多进程处理。 代码和错误如下所示: 服务器的作用是对外提供一个顶层的共享列表(ListProxy)。
Client1 的目标是向 ListProxy 中追加(append)Event,并设置(set)第一个 Event。
Client2 的目标是从 ListProxy 中取出 Event,并判断它是否已被设置。
服务器:
from multiprocessing.managers import ListProxy,EventProxy
import torch.multiprocessing as mp
def server(host, port, key, manager):
    """Publish a manager-owned list as ``get_list`` and serve it forever.

    Args:
        host: Address the manager server binds to.
        port: TCP port the manager server binds to.
        key: Auth key (bytes) that connecting clients must present.
        manager: Manager object used to create the shared list; it is
            re-initialised here with the requested address and auth key.

    This call blocks in ``serve_forever()`` and does not return normally.
    """
    shared_list = manager.list()

    def get_shared_list():
        # Always hand out the same list so every client sees the same data.
        return shared_list

    manager.register('get_list', callable=get_shared_list, proxytype=ListProxy)
    # Re-run __init__ to rebind the existing manager to (host, port) / key.
    manager.__init__(address=(host, port), authkey=key)
    print('connect to server %s' % host)
    manager.get_server().serve_forever()
if __name__ == '__main__':
    # Script entry point: build a manager and serve on localhost:5000.
    manager = mp.Manager()
    server(host='127.0.0.1', port=5000, key=b'abc', manager=manager)
客户端 1:
import torch.multiprocessing as mp
def client1(host, port, key):
    """Create a manager, point it at the remote server and connect.

    Args:
        host: Address of the manager server.
        port: TCP port of the manager server.
        key: Auth key (bytes) expected by the server.

    Returns:
        The connected manager, with ``get_list`` registered on it.
    """
    remote = mp.Manager()
    remote.register('get_list')
    server_address = (host, port)
    # Re-run __init__ to retarget the manager at the remote server.
    remote.__init__(address=server_address, authkey=key)
    remote.connect()
    return remote
def set_list(manager):
    """Fetch the shared list, print it, and append three new events to it.

    Args:
        manager: Connected manager exposing ``get_list()`` and ``Event()``.
    """
    events = manager.get_list()
    print(events)
    for _ in range(3):
        # Each Event proxy is created through the manager, then sent to the list.
        events.append(manager.Event())
if __name__ == '__main__':
    # Script entry point: connect to the local server, then fill the list.
    manager = client1(host='127.0.0.1', port=5000, key=b'abc')
    set_list(manager=manager)
客户端 2:
import torch.multiprocessing as mp
def client1(manager, host, port, key):
    """Register ``get_list`` on *manager* and connect it to the remote server.

    Args:
        manager: Manager object to retarget and connect (modified in place).
        host: Address of the manager server.
        port: TCP port of the manager server.
        key: Auth key (bytes) expected by the server.
    """
    manager.register('get_list')
    server_address = (host, port)
    # Re-run __init__ to retarget the manager at the remote server.
    manager.__init__(address=server_address, authkey=key)
    manager.connect()
def set_list():
    """Print 'set' if the first shared event is set; otherwise set it.

    Reads the module-level ``manager``, which must already be connected.
    """
    events = manager.get_list()
    if not events[0].is_set():
        events[0].set()
        return
    print('set')
if __name__ == '__main__':
    # ``manager`` must stay a module-level name: set_list() reads it as a global.
    manager = mp.Manager()
    client1(manager=manager, host='127.0.0.1', port=5000, key=b'abc')
    set_list()
但是出现如下问题:
Traceback (most recent call last):
File "/home/drsun/文档/RL/distprocess_prototype/server.py", line 38, in <module>
server('127.0.0.1', 5000, b'abc', manager)
File "/home/drsun/文档/RL/distprocess_prototype/server.py", line 25, in server
event()
File "/home/drsun/文档/RL/distprocess_prototype/server.py", line 32, in event
l.append(manager.Event())
File "<string>", line 2, in append
File "/home/drsun/anaconda3/envs/AC/lib/python3.6/multiprocessing/managers.py", line 772, in _callmethod
raise convert_to_error(kind, result)
multiprocessing.managers.RemoteError:
---------------------------------------------------------------------------
Traceback (most recent call last):
File "/home/drsun/anaconda3/envs/AC/lib/python3.6/multiprocessing/managers.py", line 228, in serve_client
request = recv()
File "/home/drsun/anaconda3/envs/AC/lib/python3.6/multiprocessing/connection.py", line 251, in recv
return _ForkingPickler.loads(buf.getbuffer())
File "/home/drsun/anaconda3/envs/AC/lib/python3.6/multiprocessing/managers.py", line 881, in RebuildProxy
return func(token, serializer, incref=incref, **kwds)
File "/home/drsun/anaconda3/envs/AC/lib/python3.6/multiprocessing/managers.py", line 731, in __init__
self._incref()
File "/home/drsun/anaconda3/envs/AC/lib/python3.6/multiprocessing/managers.py", line 785, in _incref
conn = self._Client(self._token.address, authkey=self._authkey)
File "/home/drsun/anaconda3/envs/AC/lib/python3.6/multiprocessing/connection.py", line 493, in Client
answer_challenge(c, authkey)
File "/home/drsun/anaconda3/envs/AC/lib/python3.6/multiprocessing/connection.py", line 739, in answer_challenge
raise AuthenticationError('digest sent was rejected')
multiprocessing.context.AuthenticationError: digest sent was rejected
---------------------------------------------------------------------------
有什么建议可以在 ListProxy 中附加 EventProxy 对象吗?
问题解决了! 参考 Python 2.7 中的类似问题(similar problem in Python 2.7)可知:multiprocessing 的代理对象在经过 pickle 传输、再被 unpickle 重建时并不携带 authkey,重建时只能使用当前进程的 authkey;如果它与管理器的 authkey 不一致,认证就会失败,所以我们遇到了 AuthenticationError。
一个简单的修复方法:
import torch.multiprocessing as mp
# Make this process's authkey match the manager's key (b'abc' in the scripts
# above) so that proxies rebuilt after unpickling can authenticate.
mp.current_process().authkey = b'abc'
只需在导入语句之后添加 mp.current_process().authkey = b'abc' 即可。