多处理代理:让 getters return 代理自己

Multiprocessing proxy: let getters return proxies themselves

我有一个复杂的不可拾取对象,它具有复杂且不可拾取类型的属性(通过 getters 和设置器定义)。我想为对象创建一个多处理代理来并行执行一些任务。

问题:虽然我已经成功地使 getter 方法可用于代理对象,但我未能使 getters return 不可拾取的 return 对象的代理。

我的设置类似于以下内容:

from multiprocessing.managers import BaseManager, NamespaceProxy

class A():
    @property
    def a(self):
        return B()
    @property
    def b(self):
        return 2

# unpickable class
class B():
    def __init__(self, *args):
        self.f = lambda: 1
    

class ProxyBase(NamespaceProxy):
    _exposed_ = ('__getattribute__', '__setattr__', '__delattr__')

class AProxy(ProxyBase): pass
class BProxy(ProxyBase): pass
class MyManager(BaseManager):pass

MyManager.register('A', A, AProxy)

if __name__ == '__main__':
    with MyManager() as manager:
        myA = manager.A()
        print(myA.b) # works great
        print(myA.a) # raises error, because the object B is not pickable

我知道我可以在向管理器注册方法时指定方法的结果类型。也就是说,我可以做到

MyManager.register('A', A, AProxy, method_to_typeid={'__getattribute__':'B'})
MyManager.register('B', B, BProxy)


if __name__ == '__main__':
    with MyManager() as manager:
        myA = manager.A()
        print(myA.a) # works great!
        print(myA.b) # returns the same as myA.a ?!

我很清楚我的解决方案不起作用,因为 __getattr__ 方法适用于所有属性,而我只希望它 return 作为 B 的代理 属性 a 被访问。我怎样才能做到这一点?

作为附带问题:如果我从 B__init__ 方法中删除 *args 参数,我会得到一个错误,它是用错误的参数数量调用的。为什么?我该如何解决这个问题?

我不认为这没有一些技巧是不可能的,因为选择 return 一个值或代理是单独基于方法名称,而不是 return 值的类型(来自 Server.serve_client):

try:
    res = function(*args, **kwds)
except Exception as e:
    msg = ('#ERROR', e)
else:
    typeid = gettypeid and gettypeid.get(methodname, None)
    if typeid:
        rident, rexposed = self.create(conn, typeid, res)
        token = Token(typeid, self.address, rident)
        msg = ('#PROXY', (rexposed, token))
    else:
        msg = ('#RETURN', res)

另外请记住,在不可选取的 class 代理中公开 __getattribute__ 基本上会在调用方法时破坏代理功能。

但是如果你愿意破解它并且只需要属性访问,这是一个可行的解决方案(注意调用 myA.a.f() 仍然不起作用,lambda 是一个属性并且没有被代理,只有方法是,但这是一个不同的问题)。

import os
from multiprocessing.managers import BaseManager, NamespaceProxy, Server

class A():
    @property
    def a(self):
        return B()
    @property
    def b(self):
        return 2

# unpickable class
class B():
    def __init__(self, *args):
        self.f = lambda: 1
        self.pid = os.getpid()

class HackedObj:
    def __init__(self, obj, gettypeid):
        self.obj = obj
        self.gettypeid = gettypeid

    def __getattribute__(self, attr):
        if attr == '__getattribute__':
            return object.__getattribute__(self, attr)
            
        obj = object.__getattribute__(self, 'obj')
        result = object.__getattribute__(obj, attr)
        if isinstance(result, B):
            gettypeid = object.__getattribute__(self, 'gettypeid')
            # This tells the server that the return value of this method is
            # B, for which we've registered a proxy.
            gettypeid['__getattribute__'] = 'B'


        return result

class HackedDict:
    def __init__(self, data):
        self.data = data

    def __setitem__(self, key, value):
        self.data[key] = value

    def __getitem__(self, key):
        obj, exposed, gettypeid = self.data[key]
        if isinstance(obj, A):
            gettypeid = gettypeid.copy() if gettypeid else {}
            # Now we need getattr to update gettypeid based on the result
            # luckily BaseManager queries the typeid info after the function
            # has been invoked
            obj = HackedObj(obj, gettypeid)

        return (obj, exposed, gettypeid)

class HackedServer(Server):
    def __init__(self, registry, address, authkey, serializer):
        super().__init__(registry, address, authkey, serializer)
        self.id_to_obj = HackedDict(self.id_to_obj)

class MyManager(BaseManager):
    _Server = HackedServer

class ProxyBase(NamespaceProxy):
    _exposed_ = ('__getattribute__', '__setattr__', '__delattr__')
class AProxy(ProxyBase): pass
class BProxy(ProxyBase): pass

MyManager.register('A', callable=A, proxytype=AProxy)
MyManager.register('B', callable=B, proxytype=BProxy)

if __name__ == '__main__':
    print("This process: ", os.getpid())
    with MyManager() as manager:
        myB = manager.B()
        print("Proxy process, using B directly: ", myB.pid)
        
        myA = manager.A()
        print('myA.b', myA.b)
        
        print("Proxy process, via A: ", myA.a.pid)

解决方案的关键是替换我们管理器中的 _Server,然后将 id_to_obj dict 包装为执行我们需要的特定方法的 hack 的字典。

hack 包括为方法填充 gettypeid 字典,但只有在对它进行评估并且我们知道 return 类型是我们需要代理的类型之后。我们很幸运,在评估顺序上,gettypeid 在方法被调用后访问。

还幸运的是 gettypeidserve_client 方法中用作本地,所以我们可以 return 它的副本并修改它,我们不会引入任何并发问题.

虽然这是一个有趣的练习,但我不得不说我真的反对这个解决方案,如果你正在处理你无法修改的外部代码,你应该简单地创建你自己的包装器 class显式方法而不是 @property 访问器,代理你自己的 class,并使用 method_to_typeid.