如何通过 SyncManager 远程访问 multiprocessing.Manager().Namespace 元素?

How to access multiprocessing.Manager().Namespace element remotely via SyncManager?

无论我多么努力地阅读文档,我都无法理解如何使用 SyncManager 远程访问多处理命名空间的项目。

请注意,下面的代码是最短的代码,用于演示哪些有效,哪些无效。我的问题的症结在于对 register 的调用,我未能正确理解如何使用它。

下面的示例由两部分组成(都在同一个文件 mp-example.py 中):服务器和客户端。为了让客户端完成它的工作,服务器部分必须处于运行状态(在另一个终端/screen 会话中)。

首先,有效的代码:

#!/usr/bin/env python3

import os
import sys
import multiprocessing as mp
import multiprocessing.managers as mpm
import queue
import socket
import contextlib
from contextlib import contextmanager

def get_ip(probe_address=('192.168.17.1', 80)):
    """Return the local IP address used to reach ``probe_address``.

    A UDP socket is connected to ``probe_address`` and the local address the
    socket ends up with is read back via ``getsockname()``.

    Args:
        probe_address: ``(host, port)`` pair to connect to. The default is
            the address that used to be hard-coded, so existing calls behave
            as before; pass e.g. ``('127.0.0.1', 80)`` to test on loopback.

    Returns:
        The local IP address as a string.

    Raises:
        OSError: If the socket cannot be connected to ``probe_address``.
    """
    # socket objects are context managers themselves, so no closing() wrapper
    # is needed to get the socket closed on exit.
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as probe:
        probe.connect(probe_address)
        return probe.getsockname()[0]

def get_free_port():
    """Return a TCP port number chosen by the operating system.

    A temporary stream socket is bound to port 0 on all interfaces, the port
    it received is read back, and the socket is closed before returning, so
    the port is not held open for the caller.

    Returns:
        The port number as an int.
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        # Port 0 asks the OS to pick the port.
        sock.bind(('', 0))
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        _host, port = sock.getsockname()
        return port
    finally:
        sock.close()

def startserver(name):
    """Run the demo server: publish a namespace and a value, then wait.

    Writes the connection settings (address and authkey) to
    ``mpSERVERDATA.txt``, creates a namespace ``ns`` and a stand-alone value
    ``yv`` through a local manager, registers both on a second
    ``SyncManager`` bound to the chosen port, modifies them from a child
    process, and then re-prints their state for every line read from
    standard input until end of input.

    Args:
        name: Label appended to the ``name`` attributes set on the managers
            and on the namespace.
    """
    # Find a free port for the SyncManager
    newportnumber = get_free_port()
    Server = {'address': (get_ip(), newportnumber), 'authkey': b'mptest'}

    # Save the connection settings to a file (startclient reads the same
    # file name) and echo them on standard output
    serverdata_filename = f'mpSERVERDATA.txt'
    with open(serverdata_filename, 'wt', encoding="utf-8") as serverdata_file:
        print(Server, file=serverdata_file)
        print(Server)

    # On the server, we do not need our own IP address
    Server['address'] = ('', newportnumber)

    # Set up server (in the order the code below does it):
    #   define a multiprocessing manager
    #   define a multiprocessing namespace
    #   define a Value item in the namespace
    #   define a Value item outside the namespace
    #   define a multiprocessing syncmanager
    #   register both the namespace and the value outside the namespace
    #   start the syncmanager
    #   set values
    #   initialise a different process
    #   print the values
    #   run the other process & wait for completion
    #   print the values
    #   read standard input & print the values (waiting for a remote process connection)

    m = mp.Manager()
    m.name = 'mp.Manager: ' + name

    with m:
        ns = m.Namespace()
        ns.name = 'm.Namespace: ' + name
        # ns.yv starts out as a manager Value; it is overwritten with a plain
        # int (999) further down, once bm is running.
        ns.yv = m.Value(int, 0)
        yv = m.Value(int, 0)

        bm = mpm.SyncManager(**Server)
        bm.name = 'mpm.SyncManager: ' + name

        # Earlier attempt with an explicit `exposed` tuple, kept for reference:
        #bm.register('ns', callable=lambda: ns, proxytype=mpm.NamespaceProxy,
        #            exposed=('__getattribute__', '__setattr__', '__delattr__', 'yv')
        #           )
        # The above has not made any difference
        # The callables return the objects obtained from `m` above.
        bm.register('ns', callable=lambda: ns, proxytype=mpm.NamespaceProxy)
        bm.register('yv', callable=lambda: yv, proxytype=mpm.ValueProxy)

        with bm:
            yv.value = 99
            ns.yv = 999

            # Child-process body: bump the namespace attribute and the value.
            def f(n, y):
                n.yv += 222
                y.value += 33

            proc = mp.Process(target=f, args=(ns, yv))
            print(f'yv={yv}, ns.yv={ns.yv}')
            proc.start()
            proc.join()
            # terminate() is called after join() has already returned.
            proc.terminate()
            print(f'yv={yv}, ns.yv={ns.yv}, ns={ns}')
            print('Waiting for input (waiting for client), press CTRL-D to end')
            # Every input line triggers a fresh dump of the shared state; the
            # loop ends when standard input is exhausted.
            for line in sys.stdin:
                print(f'yv={yv}, ns.yv={ns.yv}, ns={ns}')
                print('Waiting for input (waiting for client), press CTRL-D to end')

# Command-line dispatch: `server <name>` runs startserver(name) and
# `client <name>` runs startclient(name).
# NOTE(review): the opening `if` branch of this chain is not part of the
# excerpt shown here.
elif sys.argv[1] == 'server' and len(sys.argv) == 3:
    startserver(sys.argv[2])
elif sys.argv[1] == 'client' and len(sys.argv) == 3:
    startclient(sys.argv[2])

上面的代码单独运行时符合我的预期:

> python3 mp-example.py server test
{'address': ('192.168.17.10', 50793), 'authkey': b'mptest'}
yv=Value(<class 'int'>, 99), ns.yv=999
yv=Value(<class 'int'>, 132), ns.yv=1221, ns=Namespace(name='m.Namespace: test', yv=1221)
Waiting for input (waiting for client), press CTRL-D to end

接下来是不起作用的部分:

下面的代码在一定程度上起作用,但在尝试访问命名空间中的值项 yv 时失败。

代码:

def startclient(name):
    """Connect to the running server and exercise both shared objects.

    Loads the connection settings from ``mpSERVERDATA.txt``, connects a
    ``SyncManager`` to that address, then reads and increments the
    stand-alone value ``yv`` followed by the ``yv`` attribute of the
    namespace ``ns``, printing each step.

    Args:
        name: Not used by this function.
    """
    # NOTE(review): eval() executes whatever the file contains; the file is
    # expected to hold the dict literal written by the server.
    with open('mpSERVERDATA.txt', 'rt', encoding="utf-8") as settings_file:
        settings = eval(settings_file.read())
        print(settings)

    manager = mpm.SyncManager(**settings)
    # Client-side registration: type ids and proxy types only, no callables.
    for typeid, proxy_cls in (('yv', mpm.ValueProxy), ('ns', mpm.NamespaceProxy)):
        manager.register(typeid, proxytype=proxy_cls)
    manager.connect()

    # Stand-alone value: read, add 55, read again.
    shared_value = manager.yv()
    print(f'yv={shared_value.value}')
    shared_value.value += 55
    print(f'yv={shared_value.value}')

    # Namespace: show the proxy, then read and update its `yv` attribute.
    shared_ns = manager.ns()
    print(f'ns={shared_ns}')
    print(f'ns.yv={shared_ns.yv}')
    shared_ns.yv += 666
    print(f'ns.yv={shared_ns.yv}')

输出:

> python3 mp-example.py client test
{'address': ('192.168.17.10', 50793), 'authkey': b'mptest'}
yv=627
yv=682
['_Client', '__class__', '__deepcopy__', '__delattr__', '__dict__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattr__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__module__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '__weakref__', '_address_to_local', '_after_fork', '_authkey', '_callmethod', '_close', '_connect', '_decref', '_exposed_', '_getvalue', '_id', '_idset', '_incref', '_manager', '_mutex', '_owned_by_manager', '_serializer', '_tls', '_token']
Help on method _getvalue in module multiprocessing.managers:

_getvalue() method of multiprocessing.managers.NamespaceProxy instance
    Get a copy of the value of the referent

None
ns=<NamespaceProxy object, typeid 'Namespace' at 0x7f5d05689a20>
Traceback (most recent call last):
  File "mp-example.py", line 333, in <module>
    startclient(sys.argv[2])
  File "mp-example.py", line 282, in startclient
    print(f'ns.yv={ns.yv}')
  File "/usr/lib/python3.6/multiprocessing/managers.py", line 1060, in __getattr__
    return callmethod('__getattribute__', (key,))
  File "/usr/lib/python3.6/multiprocessing/managers.py", line 772, in _callmethod
    raise convert_to_error(kind, result)
AttributeError: 'NamespaceProxy' object has no attribute 'yv'

问题:

我需要做什么才能在客户端访问ns.yv

我也曾为此苦苦挣扎——按照您所遵循的思路,很难想出任何既不复杂又不低效的解决方案。顺便说一句,您正在尝试返回一个共享内存值的代理,而该值本身无需代理就已经可以共享;并且,如果您的目标只是提供一个单例值,那么它也不需要位于共享内存中。

一个更简单的方法是定义您自己的自定义类,这样就不需要仅仅为了创建一个代理对象而创建两个管理器进程,然后在启动另一个管理器之前再把该代理对象注册给它。您通常会创建自己的 multiprocessing.managers.BaseManager 子类,并在其中注册新的托管类(当然,如果您的客户端还需要已经在 SyncManager 中注册的类,例如 dict,那么您可以改为继承 SyncManager)。

以下程序展示了如何创建单例 Namespace 等效类型。我把你的IP地址注释掉了,用127.0.0.1来测试。我还用更安全的替代方法替换了您对 eval 的调用。

#!/usr/bin/env python3

import os
import sys
import multiprocessing as mp
from multiprocessing.managers import BaseManager, NamespaceProxy
import socket
import contextlib
from ast import literal_eval
from threading import Thread


def get_ip():
    """Return the local IP address used to reach 127.0.0.1.

    A UDP socket is connected to ``('127.0.0.1', 80)`` and the host part of
    the socket's own address is returned.

    Returns:
        The local IP address as a string.
    """
    probe = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        # The LAN target ('192.168.17.1', 80) was replaced by loopback here.
        probe.connect(('127.0.0.1', 80))
        local_host, _local_port = probe.getsockname()
        return local_host
    finally:
        probe.close()

def get_free_port():
    """Return a TCP port number picked by the operating system.

    Binds a short-lived stream socket to port 0 on all interfaces, reads
    back the assigned port and closes the socket again before returning.

    Returns:
        The port number as an int.
    """
    listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    with contextlib.closing(listener):
        listener.bind(('', 0))  # port 0: the OS chooses
        listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        bound_address = listener.getsockname()
        return bound_address[1]

class MyNamespaceManager(BaseManager):
    """Manager subclass on which the ``'MyNamespace'`` type id is registered.

    It adds nothing to ``BaseManager``; having a dedicated subclass keeps the
    registration separate from the base class.
    """

class MyNamespace:
    """Empty class whose instances serve as free-form attribute holders."""


# The single shared instance returned by get_singleton().
singleton = MyNamespace()


def get_singleton():
    """Return the module-level ``singleton`` (the same object on every call)."""
    return singleton

def run_server(server):
    """Thread target: run ``server.serve_forever()``.

    Args:
        server: Object providing a ``serve_forever()`` method.
    """
    serve = server.serve_forever
    serve()

def startserver():
    """Serve the singleton namespace until the operator presses Enter.

    Picks a free port, writes the connection settings to
    ``mpSERVERDATA.txt`` (and echoes them), registers ``'MyNamespace'`` with
    ``get_singleton`` as its callable, and runs the manager's server object
    in a thread of this process. After the prompt returns, or is interrupted
    with CTRL-C, the settings file is removed and the server is told to stop.
    """
    port = get_free_port()
    settings = {'address': (get_ip(), port), 'authkey': b'mptest'}

    # Publish the settings where startclient() looks for them.
    data_path = 'mpSERVERDATA.txt'
    with open(data_path, 'wt', encoding="utf-8") as data_file:
        print(settings, file=data_file)
        print(settings)

    # The server itself binds to '' rather than to its own IP address.
    settings['address'] = ('', port)

    MyNamespaceManager.register('MyNamespace', callable=get_singleton, proxytype=NamespaceProxy)
    manager = MyNamespaceManager(**settings)

    # Alternative considered by the author: manager.start(), wait for Enter,
    # then manager.shutdown(). Here the server object is instead obtained
    # with get_server() and run in a thread.
    server = manager.get_server()
    worker = Thread(target=run_server, args=(server,))
    worker.start()

    # CTRL-C at the prompt is treated the same as pressing Enter.
    # NOTE(review): other exceptions from input() are not handled here.
    with contextlib.suppress(KeyboardInterrupt):
        input('Hit enter to end: ')

    os.unlink(data_path)
    # Author's note: the server stops and calls sys.exit().
    server.stop_event.set()

def startclient():
    """Connect to the server and show that the namespace is a singleton.

    Reads the connection settings from ``mpSERVERDATA.txt``, connects, sets
    attributes through one ``MyNamespace`` proxy and prints them back
    through a second proxy requested afterwards.
    """
    with open('mpSERVERDATA.txt', 'rt', encoding="utf-8") as data_file:
        settings = literal_eval(data_file.read())
        print(settings)

    # Client-side registration: type id and proxy type only, no callable.
    MyNamespaceManager.register('MyNamespace', proxytype=NamespaceProxy)
    client = MyNamespaceManager(**settings)
    client.connect()

    # Write through the first proxy ...
    writer = client.MyNamespace()
    writer.value = 99
    writer.value += 55
    writer.x = 9

    # ... and read the same attributes back through a second one.
    reader = client.MyNamespace()
    print(f'yv2.value={reader.value}')
    print(f'yv2.x={reader.x}')


if __name__ == '__main__':
    # Exactly one argument equal to 'server' selects server mode; every other
    # command line (including no arguments at all) runs the client.
    if sys.argv[1:] == ['server']:
        startserver()
    else:
        startclient()

客户端打印:

{'address': ('127.0.0.1', 51211), 'authkey': b'mptest'}
yv2.value=154
yv2.x=9

编辑-综合解决方案:

我通过编写一个新的 RemoteSyncManager class 来封装 multiprocessing.managers.SyncManager() 实例来解决我的大部分挫折。

我的这个库确实简化了 SyncManager 本地和远程进程的使用。

前往我的 GitHub 存储库 PythonLibraries-Multiprocessing-RemoteSyncManager 了解详细信息。

我的第一个解决方案:

我终于设法设置了一个与远程连接同步的托管多处理命名空间。

但是这个设置无法与托管字典(managed dict)一起工作;它的工作方式与上面 Booboo 的建议类似。

有什么方法可以在远程共享命名空间中定义和保持同步整个字典,而无需在每次操作时重新分配字典?

代码:

if __name__ == '__main__':
    # One script, two roles selected by a single command-line argument:
    # 'Server' hosts two queues and a namespace, 'Client' connects to them.
    # len(sys.argv) is tested before sys.argv[1] is read, so running the
    # script without an argument no longer raises IndexError.
    if len(sys.argv) == 2 and sys.argv[1] == 'Server':
        # Alternative that was tried: a separate local manager.
        #localmanager = mp.Manager()
        #with localmanager:

        # Handshake queues: the client puts on q2, the server answers on q1.
        q1 = mp.Queue(1)
        q2 = mp.Queue(1)
        localnamespace = mpm.Namespace()

        class syncmanager(mpm.SyncManager):
            pass

        # The callables hand out the objects created above.
        syncmanager.register('get_q1', callable=lambda: q1)
        syncmanager.register('get_q2', callable=lambda: q2)
        syncmanager.register('get_ns', callable=lambda: localnamespace, proxytype=mpm.NamespaceProxy)

        remotemanager = syncmanager(address=('', 56789), authkey=b'mypassword')
        remotemanager.start()

        with remotemanager:
            ns = remotemanager.get_ns()
            # Alternatives that were tried for the dict stored in the namespace:
            #ns.sampledict = remotemanager.dict()
            #ns.sampledict = localmanager.dict()
            ns.sampledict = {}
            # NOTE(review): these item assignments go through ns.sampledict,
            # which presumably yields a copy of the stored dict, so they may
            # not reach the namespace - confirm against the multiprocessing
            # documentation on proxies and mutable values.
            ns.sampledict['testabc'] = 1234567
            ns.sampledict['TESTABC'] = 9876
            # Second dict: filled locally first, then assigned as a whole.
            d = {}
            d['testabc'] = 1234567
            d['TESTABC'] = 9876
            ns.sampledict2 = d

            print(f'Server ns = {ns}')

            # Round 1: wait for the client's message, reply with our view.
            print(q2.get())
            q1.put(f'Server ns = {ns}')

            ns.sampledict['testabc'] = 1
            ns.sampleserverint = 98789
            print(f'Server ns = {ns}')

            # Round 2: same exchange after the updates above.
            print(q2.get())
            q1.put(f'Server ns = {ns}')

    elif len(sys.argv) == 2 and sys.argv[1] == 'Client':
        class syncmanager(mpm.SyncManager):
            pass

        # Client-side registration: the same type ids, without callables.
        syncmanager.register('get_q1')
        syncmanager.register('get_q2')
        syncmanager.register('get_ns', proxytype=mpm.NamespaceProxy)

        remotemanager = syncmanager(address=('', 56789), authkey=b'mypassword')
        remotemanager.connect()

        q1 = remotemanager.get_q1()
        q2 = remotemanager.get_q2()
        ns = remotemanager.get_ns()

        # Round 1: announce ourselves, print the server's view, then our own.
        q2.put('ready')
        msg = q1.get()
        print(msg)
        print(f'Client ns = {ns}')

        ns.sampledict['TESTABC'] = 9
        ns.sampleclientint = 7887

        # Round 2: same exchange after the client-side updates.
        q2.put('next.1')
        msg = q1.get()
        print(msg)
        print(f'Client ns = {ns}')