如何在单独的进程中启动具有多个队列的多处理网络管理器

How to start multiprocessing network manager with multiple queues in separate process

我是 Python 新手。是否可以在单独的进程中运行一个可通过网络访问、带有多个队列的管理器?

我正在尝试编写,以下是目前的进展:

from multiprocessing import Process, Manager
from multiprocessing.managers import BaseManager
import time

# Host and shared secret used for the network-facing manager below.
address = "127.0.0.1"
password = "secret"

if __name__ == '__main__':
    # A local manager is used here only to create the two queues.
    with Manager() as manager:
        work_tasks_queue = manager.Queue()
        done_task_queue = manager.Queue()

        # Manager subclass on which the two queue type ids are registered.
        class QueueManager(BaseManager):
            pass


        # NOTE: these lambdas are what start() fails on. The traceback that
        # accompanies this snippet shows start() pickling the process object
        # for the child and raising PicklingError on "<lambda>".
        QueueManager.register('work_tasks_queue', callable=lambda: work_tasks_queue)
        QueueManager.register('done_task_queue', callable=lambda: done_task_queue)

        net_manager = QueueManager(address=(address, 50000), authkey=password.encode('utf-8'))

        # Launches the manager's server process; this is the failing call.
        net_manager.start()

        print("------------------------------------------------------")

此刻,我设法启动了带有多个队列的网络管理器。

我想在单独的进程中启动服务器。我希望能够在一个脚本中处理两个队列(添加新作业以处理或标记已完成的任务)。

看起来这是可行的。下面是一个示例,我也是照着它做的。

https://github.com/shariq/burgundy/blob/master/remotequeue.py

但在我的例子中,我得到了错误:

Traceback (most recent call last):
  File "D:\WORKSPACE\check_manager_in_event\main.py", line 47, in <module>
    net_manager.start()
  File "C:\Users\TestUser\AppData\Local\Programs\Python\Python39\lib\multiprocessing\managers.py", line 553, in start
    self._process.start()
  File "C:\Users\TestUser\AppData\Local\Programs\Python\Python39\lib\multiprocessing\process.py", line 121, in start
    self._popen = self._Popen(self)
  File "C:\Users\TestUser\AppData\Local\Programs\Python\Python39\lib\multiprocessing\context.py", line 327, in _Popen
    return Popen(process_obj)
  File "C:\Users\TestUser\AppData\Local\Programs\Python\Python39\lib\multiprocessing\popen_spawn_win32.py", line 93, in __init__
    reduction.dump(process_obj, to_child)
  File "C:\Users\TestUser\AppData\Local\Programs\Python\Python39\lib\multiprocessing\reduction.py", line 60, in dump
    ForkingPickler(file, protocol).dump(obj)
_pickle.PicklingError: Can't pickle <function <lambda> at 0x0000020B330551F0>: attribute lookup <lambda> on __main__ failed

Process finished with exit code 1

我在 Windows 10 专业版上使用 Python 3.9,IDE 是 PyCharm 社区版。

有人可以帮忙解决这个问题吗?

这里有几个微妙之处。首先,当您向管理器注册一个返回对象的函数时,管理器默认会尝试为该对象构建一个代理。但您返回的对象是一个托管队列,它本身已经是一个代理对象。因此,您应该直接返回一个普通的队列实例(queue.Queue),就像 Python 官方文档“使用远程管理器”(Using a remote manager)一节中的第二个示例那样。

下面的代码分为几个脚本:运行 server.py 启动远程管理器;运行 workers.py 启动一组工作进程(示例中进程数为 cpu_count()),每个进程从 work_tasks_queue 队列中读取一个整数,并把由该整数及其平方组成的元组作为结果写入 done_task_queue 队列;运行 client.py 启动客户端,它向 work_tasks_queue 队列写入 9 个整数(1 到 9),然后从 done_task_queue 队列中读取 9 个结果,结果的顺序可能是任意的。

身份验证方面似乎存在一个缺陷:每个工作进程都必须按如下方式设置自身的 authkey,否则管理器会拒绝它的请求:

current_process().authkey = password.encode('utf-8')

不用说,服务器、工作进程和客户端通常(或至少可以)运行在 3 台不同的机器上(此时需要相应地调整 address 的设置)。

通用QueueManager.py模块

from multiprocessing.managers import BaseManager

# Connection settings shared by every script that imports this module.
address, port = "127.0.0.1", 50000
password = "secret"

class QueueManager(BaseManager):
    """Manager subclass on which the shared queue type ids are registered."""

def connect_to_manager():
    """Connect to the manager at ``(address, port)`` and fetch both queues.

    Returns:
        A ``(work_tasks_queue, done_task_queue)`` tuple holding the objects
        returned by the manager for the two registered type ids.
    """
    # Only the type ids are registered here, without callables.
    for typeid in ('work_tasks_queue', 'done_task_queue'):
        QueueManager.register(typeid)

    authkey = password.encode('utf-8')
    manager = QueueManager(address=(address, port), authkey=authkey)
    manager.connect()

    work_q = manager.work_tasks_queue()
    done_q = manager.done_task_queue()
    return work_q, done_q

server.py

from QueueManager import *
from queue import Queue

# Plain queue.Queue instances; the manager is registered with functions that
# return these objects.
work_tasks_queue, done_task_queue = Queue(), Queue()

def get_work_tasks_queue():
    """Return the module-level ``work_tasks_queue`` object."""
    return work_tasks_queue

def get_done_task_queue():
    """Return the module-level ``done_task_queue`` object."""
    return done_task_queue

def server():
    """Serve both queues over the network until the operator presses Enter.

    Registers the two queue accessors on ``QueueManager``, starts the manager
    at ``(address, port)`` and blocks on ``input()``. The manager is shut down
    when the prompt returns, and also if the prompt is interrupted by an
    exception such as ``EOFError`` or ``KeyboardInterrupt``.
    """
    # Module-level functions are registered on purpose: a lambda or nested
    # function does not seem to work when the manager is started with start().
    QueueManager.register('work_tasks_queue', callable=get_work_tasks_queue)
    QueueManager.register('done_task_queue', callable=get_done_task_queue)

    net_manager = QueueManager(address=(address, port), authkey=password.encode('utf-8'))

    # Entering the context starts the manager's server process; leaving it
    # calls shutdown(), even when input() raises.
    with net_manager:
        input('Server starting. Hit Enter to terminate....')

# Start the server only when this file is executed as a script.
if __name__ == "__main__":
    server()

workers.py

from QueueManager import *
from multiprocessing import Process, current_process, cpu_count
from threading import Thread

def worker(in_q, out_q):
    """Consume items from ``in_q`` and publish ``(item, item ** 2)`` to ``out_q``.

    Runs until a ``None`` item is received. Every item taken from ``in_q``,
    including the ``None`` sentinel, is acknowledged with ``task_done()``.
    """
    # Each worker process sets its own authkey before using the queues.
    current_process().authkey = password.encode('utf-8')
    while True:
        item = in_q.get()
        stop_requested = item is None
        if not stop_requested:
            out_q.put((item, item ** 2))
        in_q.task_done()
        if stop_requested:
            break

def create_workers(in_q, out_q, n_workers):
    """Run ``n_workers`` worker processes and block until all have exited.

    All processes are created first, then all are started, then all are
    joined.
    """
    pool = []
    for _ in range(n_workers):
        pool.append(Process(target=worker, args=(in_q, out_q)))

    for proc in pool:
        proc.start()

    for proc in pool:
        proc.join()

def start_workers():
    """Run one worker per CPU until the operator presses Enter.

    The worker processes are launched from a helper thread so that this
    function can wait on the prompt. After Enter is pressed, one ``None``
    sentinel per worker is queued and the helper thread is joined.
    """
    worker_count = cpu_count()
    in_q, out_q = connect_to_manager()

    launcher = Thread(target=create_workers, args=(in_q, out_q, worker_count))
    launcher.start()

    input('Starting workers. Hit enter to terminate...')

    # One None per worker tells each of them to quit.
    sentinels_sent = 0
    while sentinels_sent < worker_count:
        in_q.put(None)
        sentinels_sent += 1

    # in_q.join() is not strictly necessary here; it would also assume that
    # the client's work has been completed.
    launcher.join()


# Start the workers only when this file is executed as a script.
if __name__ == "__main__":
    start_workers()

client.py

from QueueManager import *

def client():
    """Submit the integers 1 through 9 and print each result received.

    One ``(x, result)`` tuple is read from the results queue per submitted
    integer and printed in the order it is received.
    """
    in_q, out_q = connect_to_manager()

    numbers = range(1, 10)
    for number in numbers:
        in_q.put(number)

    # Read exactly as many results as were submitted.
    for _ in numbers:
        value, result = out_q.get()
        print(value, result)


# Run the client only when this file is executed as a script.
if __name__ == "__main__":
    client()

打印:

1 1
4 16
3 9
2 4
5 25
6 36
8 64
7 49
9 81

更新

下面是在同一个脚本中运行全部内容(服务器、工作进程和客户端)的代码。

from QueueManager import *
from workers import create_workers
from client import client
from queue import Queue
from threading import Thread, Event

# Both queues start out as None and are created on first use by the accessor
# functions, so that worker processes under Windows do not create queues
# unnecessarily.
work_tasks_queue = done_task_queue = None

def get_work_tasks_queue():
    """Return the shared ``work_tasks_queue``, creating it on first call."""
    global work_tasks_queue
    if work_tasks_queue is not None:
        return work_tasks_queue
    # First call: create the single instance and remember it.
    work_tasks_queue = Queue()
    return work_tasks_queue

def get_done_task_queue():
    """Return the shared ``done_task_queue``, creating it on first call."""
    global done_task_queue
    if done_task_queue is not None:
        return done_task_queue
    # First call: create the single instance and remember it.
    done_task_queue = Queue()
    return done_task_queue

def server(started_event, shutdown_event):
    """Run the queue manager between two events.

    Args:
        started_event: Set once the manager has been started.
        shutdown_event: Waited on; when it is set the manager is shut down.
    """
    # Module-level functions are registered on purpose: a lambda or nested
    # function does not seem to work when the manager is started with start().
    accessors = (
        ('work_tasks_queue', get_work_tasks_queue),
        ('done_task_queue', get_done_task_queue),
    )
    for typeid, accessor in accessors:
        QueueManager.register(typeid, callable=accessor)

    authkey = password.encode('utf-8')
    net_manager = QueueManager(address=(address, port), authkey=authkey)
    net_manager.start()

    # Report that the manager is up, then wait until shutdown is requested.
    started_event.set()
    shutdown_event.wait()
    net_manager.shutdown()


if __name__ == "__main__":
    started_event, shutdown_event = Event(), Event()
    server_thread = Thread(target=server, args=(started_event, shutdown_event))
    server_thread.start()
    # Block until the server thread reports that the manager has started.
    started_event.wait()

    in_q, out_q = connect_to_manager()
    N_WORKERS = 3
    workers_thread = Thread(target=create_workers, args=(in_q, out_q, N_WORKERS))
    workers_thread.start()

    client()

    # Queue one None per worker to tell the workers we are through.
    for _ in range(N_WORKERS):
        in_q.put(None)
    # in_q.join() is not strictly necessary here; it would also assume that
    # the client's work has been completed.
    workers_thread.join()

    # Tell the server thread we are through so it shuts the manager down.
    shutdown_event.set()
    server_thread.join()