Python multiprocessing.managers.BaseManager 运行 按顺序注册可调用函数

Python multiprocessing.managers.BaseManager running registered callable function sequentially

我正在使用 Python 的多处理库提供的远程管理器。我已经使用 BaseManager 设置了一个远程服务器,多个客户端同时连接到该服务器。不幸的是,我的服务器按顺序为每个客户端提供服务请求。我的服务器应该对 Google 的指示 API 到 return 的距离和时间进行网络调用。

我的理解是每个连接的客户端都会生成一个新线程,所以我不会遇到这个问题。

我以简化的方式提供了我的代码示例。

服务器代码如下:

import time
from multiprocessing.managers import BaseManager
import threading

class DistanceTime:

    def get_distance_time(self):
        print('started by thread %s'%(threading.get_ident()))
        # assume that network request was made here
        time.sleep(2)
        print('ended by thread %s'%(threading.get_ident()))

def server():
    distance_time=DistanceTime()
    BaseManager.register('get_distance_time', callable=distance_time.get_distance_time)
    manager = BaseManager(address=('localhost', 5000), authkey=b'abracadabra')
    server = manager.get_server()
    print('server running')
    server.serve_forever()

server()

客户端代码如下:

from multiprocessing.managers import BaseManager
from concurrent.futures import ThreadPoolExecutor
import time

def client():
    BaseManager.register('get_distance_time')
    manager = BaseManager(address=('localhost', 5000), authkey=b'abracadabra')
    manager.connect()
    executor = ThreadPoolExecutor(max_workers=3)
    # client mades three simultaneous requests to the server
    b=executor.submit(manager.get_distance_time)
    b=executor.submit(manager.get_distance_time)
    c=executor.submit(manager.get_distance_time)
    print('done')
    time.sleep(5)

client()

即使客户端立即发送所有三个请求,服务器也会打印以下内容:

server running
started by thread 16740
ended by thread 16740
started by thread 4712
ended by thread 4712
started by thread 7132
ended by thread 7132

理想情况下,所有开始的印刷品应该放在一起。这是我的应用程序的主要瓶颈。

您正在 register 调用的是一个 'creation' 方法,这些是 always run in a locked context,但是它 returns 的对象是自动代理的,并且任何方法都被调用它不会自动锁定

在您的演示代码中,我会更改:

def server():
    distance_time=DistanceTime()
    BaseManager.register('get_distance_time', callable=distance_time.get_distance_time)

成为:

def server():
    distance_time = DistanceTime()
    BaseManager.register('DistanceTime', lambda: distance_time)

然后将其用作:

distance_time = manager.DistanceTime()
a = executor.submit(distance_time.get_distance_time)
b = executor.submit(distance_time.get_distance_time)
c = executor.submit(distance_time.get_distance_time)

这应该允许所有事情并行进行。我还没有真正测试过这个,但如果你说这不起作用...

这在这里并不重要,但我通常觉得最好在 separate/derived Manager

中注册这些东西