如何在单独的进程中启动具有多个队列的多处理网络管理器
How to start multiprocessing network manager with multiple queues in separate process
我是 Python 的新手。是否可以在单独的进程中通过网络 运行 多队列管理器?
我正在努力写,这是目前发生的事情:
from multiprocessing import Process, Manager
from multiprocessing.managers import BaseManager
import time

address = "127.0.0.1"
password = "secret"

if __name__ == '__main__':
    with Manager() as manager:
        work_tasks_queue = manager.Queue()
        done_task_queue = manager.Queue()

        class QueueManager(BaseManager):
            pass

        # NOTE: registering lambdas is what triggers the PicklingError shown
        # below — on Windows the manager process is started via "spawn" and
        # the registered callables must be picklable.
        QueueManager.register('work_tasks_queue', callable=lambda: work_tasks_queue)
        QueueManager.register('done_task_queue', callable=lambda: done_task_queue)

        net_manager = QueueManager(address=(address, 50000), authkey=password.encode('utf-8'))
        net_manager.start()
        print("------------------------------------------------------")
此刻,我设法启动了带有多个队列的网络管理器。
我想在单独的进程中启动服务器。我希望能够在一个脚本中处理两个队列(添加新作业以处理或标记已完成的任务)。
看来我可以做到。这是一个例子。我也是这样做的。
https://github.com/shariq/burgundy/blob/master/remotequeue.py
但在我的例子中,我得到了错误:
Traceback (most recent call last):
File "D:\WORKSPACE\check_manager_in_event\main.py", line 47, in <module>
net_manager.start()
File "C:\Users\TestUser\AppData\Local\Programs\Python\Python39\lib\multiprocessing\managers.py", line 553, in start
self._process.start()
File "C:\Users\TestUser\AppData\Local\Programs\Python\Python39\lib\multiprocessing\process.py", line 121, in start
self._popen = self._Popen(self)
File "C:\Users\TestUser\AppData\Local\Programs\Python\Python39\lib\multiprocessing\context.py", line 327, in _Popen
return Popen(process_obj)
File "C:\Users\TestUser\AppData\Local\Programs\Python\Python39\lib\multiprocessing\popen_spawn_win32.py", line 93, in __init__
reduction.dump(process_obj, to_child)
File "C:\Users\TestUser\AppData\Local\Programs\Python\Python39\lib\multiprocessing\reduction.py", line 60, in dump
ForkingPickler(file, protocol).dump(obj)
_pickle.PicklingError: Can't pickle <function <lambda> at 0x0000020B330551F0>: attribute lookup <lambda> on __main__ failed
Process finished with exit code 1
我在 Windows 10 Professional 上使用 Python 3.9 和 PyCharm 社区版 IDE。
有人可以帮忙解决这个问题吗?
这里有一些微妙之处。首先,当您向管理器注册一个返回某个对象的函数时,默认情况下管理器会尝试为该对象构建一个代理。但您返回的对象是一个托管队列,它本身已经是代理对象。因此,您应该返回一个普通的队列实例,就像官方文档 "Using a Remote Manager" 一节中的第二个示例那样。
下面的代码分为三个脚本:server.py 用于启动远程管理器;workers.py 用于启动一个工作进程池,每个进程从 work_tasks_queue 队列读取整数,并把由该整数及其平方组成的元组作为结果写入 done_task_queue 队列;client.py 作为客户端向 work_tasks_queue 队列写入 9 个整数,然后从 done_task_queue 队列读取 9 个结果(结果的顺序可能是任意的)。
身份验证似乎存在一个缺陷:进程池中的每个进程都必须按如下方式设置自己的 authkey,否则管理器会拒绝其请求:
current_process().authkey = password.encode('utf-8')
不用说,服务器、工作进程和客户端通常(或至少可以)在 3 台不同的机器上运行(只需相应调整 address 配置即可)。
通用QueueManager.py模块
from multiprocessing.managers import BaseManager

# Connection settings shared by server, workers and client.
address = "127.0.0.1"
port = 50000
password = "secret"


class QueueManager(BaseManager):
    """Manager subclass; queue accessors are registered on it at runtime."""
    pass


def connect_to_manager():
    """Connect to the remote manager and return (work queue, done queue) proxies."""
    QueueManager.register('work_tasks_queue')
    QueueManager.register('done_task_queue')
    manager = QueueManager(address=(address, port), authkey=password.encode('utf-8'))
    manager.connect()
    return manager.work_tasks_queue(), manager.done_task_queue()
server.py
from QueueManager import *
from queue import Queue
# Plain (non-proxy) queues shared through the manager.
work_tasks_queue = Queue()
done_task_queue = Queue()


def get_work_tasks_queue():
    """Return the shared work queue (an ordinary queue.Queue, not a proxy)."""
    return work_tasks_queue


def get_done_task_queue():
    """Return the shared results queue."""
    return done_task_queue


def server():
    """Start the remote manager and serve until the user hits Enter."""
    # Don't seem to be able to use a lambda or nested function when using
    # net_manager.start(): the spawned manager process must pickle the callables.
    QueueManager.register('work_tasks_queue', callable=get_work_tasks_queue)
    QueueManager.register('done_task_queue', callable=get_done_task_queue)
    net_manager = QueueManager(address=(address, port), authkey=password.encode('utf-8'))
    net_manager.start()
    input('Server starting. Hit Enter to terminate....')
    net_manager.shutdown()


if __name__ == '__main__':
    server()
workers.py
from QueueManager import *
from multiprocessing import Process, current_process, cpu_count
from threading import Thread
def worker(in_q, out_q):
    """Consume integers from in_q and push (n, n ** 2) tuples to out_q.

    A None item is the shutdown signal.
    """
    # Workaround for an apparent authentication bug: each pool process must
    # set its own authkey or the manager rejects its requests.
    current_process().authkey = password.encode('utf-8')
    while True:
        item = in_q.get()
        if item is None:  # signal to terminate
            in_q.task_done()
            break
        out_q.put((item, item ** 2))
        in_q.task_done()


def create_workers(in_q, out_q, n_workers):
    """Start n_workers worker processes and block until they all exit."""
    pool = [Process(target=worker, args=(in_q, out_q)) for _ in range(n_workers)]
    for proc in pool:
        proc.start()
    for proc in pool:
        proc.join()


def start_workers():
    """Connect to the manager and run a pool of cpu_count() workers."""
    N_WORKERS = cpu_count()
    in_q, out_q = connect_to_manager()
    runner = Thread(target=create_workers, args=(in_q, out_q, N_WORKERS))
    runner.start()
    input('Starting workers. Hit enter to terminate...')
    for _ in range(N_WORKERS):
        in_q.put(None)  # tell worker to quit
    # in_q.join()  # not strictly necessary; assumes client's work has been completed too
    runner.join()


if __name__ == '__main__':
    start_workers()
client.py
from QueueManager import *
def client():
    """Submit integers 1..9 to the work queue and print the squared results."""
    in_q, out_q = connect_to_manager()
    for n in range(1, 10):
        in_q.put(n)
    # get results as they become available (completion order is arbitrary):
    for _ in range(1, 10):
        value, result = out_q.get()
        print(value, result)


if __name__ == '__main__':
    client()
打印:
1 1
4 16
3 9
2 4
5 25
6 36
8 64
7 49
9 81
更新
这是 运行 所有内容的代码。
from QueueManager import *
from workers import create_workers
from client import client
from queue import Queue
from threading import Thread, Event
# So that queues are not unnecessarily created by worker processes under Windows:
work_tasks_queue = None
done_task_queue = None


def get_work_tasks_queue():
    """Lazily create and return the singleton work queue."""
    global work_tasks_queue
    if work_tasks_queue is None:
        work_tasks_queue = Queue()
    return work_tasks_queue


def get_done_task_queue():
    """Lazily create and return the singleton results queue."""
    global done_task_queue
    if done_task_queue is None:
        done_task_queue = Queue()
    return done_task_queue


def server(started_event, shutdown_event):
    """Run the manager; set started_event once up, stop when shutdown_event is set."""
    # Don't seem to be able to use a lambda or nested function when using
    # net_manager.start():
    QueueManager.register('work_tasks_queue', callable=get_work_tasks_queue)
    QueueManager.register('done_task_queue', callable=get_done_task_queue)
    net_manager = QueueManager(address=(address, port), authkey=password.encode('utf-8'))
    net_manager.start()
    started_event.set()  # tell main thread that we have started
    shutdown_event.wait()  # wait to be told to shutdown
    net_manager.shutdown()


if __name__ == '__main__':
    started_event = Event()
    shutdown_event = Event()
    server_thread = Thread(target=server, args=(started_event, shutdown_event))
    server_thread.start()
    started_event.wait()  # wait for manager to start
    in_q, out_q = connect_to_manager()
    N_WORKERS = 3
    workers_thread = Thread(target=create_workers, args=(in_q, out_q, N_WORKERS))
    workers_thread.start()
    client()
    # tell workers we are through:
    for _ in range(N_WORKERS):
        in_q.put(None)
    # in_q.join()  # not strictly necessary; assumes client's work has been completed too
    workers_thread.join()
    # tell manager we are through:
    shutdown_event.set()
    server_thread.join()
我是 Python 的新手。是否可以在单独的进程中通过网络 运行 多队列管理器?
我正在努力写,这是目前发生的事情:
from multiprocessing import Process, Manager
from multiprocessing.managers import BaseManager
import time

address = "127.0.0.1"
password = "secret"

if __name__ == '__main__':
    with Manager() as manager:
        work_tasks_queue = manager.Queue()
        done_task_queue = manager.Queue()

        class QueueManager(BaseManager):
            pass

        # NOTE: registering lambdas is what triggers the PicklingError shown
        # below — on Windows the manager process is started via "spawn" and
        # the registered callables must be picklable.
        QueueManager.register('work_tasks_queue', callable=lambda: work_tasks_queue)
        QueueManager.register('done_task_queue', callable=lambda: done_task_queue)

        net_manager = QueueManager(address=(address, 50000), authkey=password.encode('utf-8'))
        net_manager.start()
        print("------------------------------------------------------")
此刻,我设法启动了带有多个队列的网络管理器。
我想在单独的进程中启动服务器。我希望能够在一个脚本中处理两个队列(添加新作业以处理或标记已完成的任务)。
看来我可以做到。这是一个例子。我也是这样做的。
https://github.com/shariq/burgundy/blob/master/remotequeue.py
但在我的例子中,我得到了错误:
Traceback (most recent call last):
File "D:\WORKSPACE\check_manager_in_event\main.py", line 47, in <module>
net_manager.start()
File "C:\Users\TestUser\AppData\Local\Programs\Python\Python39\lib\multiprocessing\managers.py", line 553, in start
self._process.start()
File "C:\Users\TestUser\AppData\Local\Programs\Python\Python39\lib\multiprocessing\process.py", line 121, in start
self._popen = self._Popen(self)
File "C:\Users\TestUser\AppData\Local\Programs\Python\Python39\lib\multiprocessing\context.py", line 327, in _Popen
return Popen(process_obj)
File "C:\Users\TestUser\AppData\Local\Programs\Python\Python39\lib\multiprocessing\popen_spawn_win32.py", line 93, in __init__
reduction.dump(process_obj, to_child)
File "C:\Users\TestUser\AppData\Local\Programs\Python\Python39\lib\multiprocessing\reduction.py", line 60, in dump
ForkingPickler(file, protocol).dump(obj)
_pickle.PicklingError: Can't pickle <function <lambda> at 0x0000020B330551F0>: attribute lookup <lambda> on __main__ failed
Process finished with exit code 1
我在 Windows 10 Professional 上使用 Python 3.9 和 PyCharm 社区版 IDE。
有人可以帮忙解决这个问题吗?
这里有一些微妙之处。首先,当您向管理器注册一个返回某个对象的函数时,默认情况下管理器会尝试为该对象构建一个代理。但您返回的对象是一个托管队列,它本身已经是代理对象。因此,您应该返回一个普通的队列实例,就像官方文档 "Using a Remote Manager" 一节中的第二个示例那样。
下面的代码分为三个脚本:server.py 用于启动远程管理器;workers.py 用于启动一个工作进程池,每个进程从 work_tasks_queue 队列读取整数,并把由该整数及其平方组成的元组作为结果写入 done_task_queue 队列;client.py 作为客户端向 work_tasks_queue 队列写入 9 个整数,然后从 done_task_queue 队列读取 9 个结果(结果的顺序可能是任意的)。
身份验证似乎存在一个缺陷:进程池中的每个进程都必须按如下方式设置自己的 authkey,否则管理器会拒绝其请求:
current_process().authkey = password.encode('utf-8')
不用说,服务器、工作进程和客户端通常(或至少可以)在 3 台不同的机器上运行(只需相应调整 address 配置即可)。
通用QueueManager.py模块
from multiprocessing.managers import BaseManager

# Connection settings shared by server, workers and client.
address = "127.0.0.1"
port = 50000
password = "secret"


class QueueManager(BaseManager):
    """Manager subclass; queue accessors are registered on it at runtime."""
    pass


def connect_to_manager():
    """Connect to the remote manager and return (work queue, done queue) proxies."""
    QueueManager.register('work_tasks_queue')
    QueueManager.register('done_task_queue')
    manager = QueueManager(address=(address, port), authkey=password.encode('utf-8'))
    manager.connect()
    return manager.work_tasks_queue(), manager.done_task_queue()
server.py
from QueueManager import *
from queue import Queue
# Plain (non-proxy) queues shared through the manager.
work_tasks_queue = Queue()
done_task_queue = Queue()


def get_work_tasks_queue():
    """Return the shared work queue (an ordinary queue.Queue, not a proxy)."""
    return work_tasks_queue


def get_done_task_queue():
    """Return the shared results queue."""
    return done_task_queue


def server():
    """Start the remote manager and serve until the user hits Enter."""
    # Don't seem to be able to use a lambda or nested function when using
    # net_manager.start(): the spawned manager process must pickle the callables.
    QueueManager.register('work_tasks_queue', callable=get_work_tasks_queue)
    QueueManager.register('done_task_queue', callable=get_done_task_queue)
    net_manager = QueueManager(address=(address, port), authkey=password.encode('utf-8'))
    net_manager.start()
    input('Server starting. Hit Enter to terminate....')
    net_manager.shutdown()


if __name__ == '__main__':
    server()
workers.py
from QueueManager import *
from multiprocessing import Process, current_process, cpu_count
from threading import Thread
def worker(in_q, out_q):
    """Consume integers from in_q and push (n, n ** 2) tuples to out_q.

    A None item is the shutdown signal.
    """
    # Workaround for an apparent authentication bug: each pool process must
    # set its own authkey or the manager rejects its requests.
    current_process().authkey = password.encode('utf-8')
    while True:
        item = in_q.get()
        if item is None:  # signal to terminate
            in_q.task_done()
            break
        out_q.put((item, item ** 2))
        in_q.task_done()


def create_workers(in_q, out_q, n_workers):
    """Start n_workers worker processes and block until they all exit."""
    pool = [Process(target=worker, args=(in_q, out_q)) for _ in range(n_workers)]
    for proc in pool:
        proc.start()
    for proc in pool:
        proc.join()


def start_workers():
    """Connect to the manager and run a pool of cpu_count() workers."""
    N_WORKERS = cpu_count()
    in_q, out_q = connect_to_manager()
    runner = Thread(target=create_workers, args=(in_q, out_q, N_WORKERS))
    runner.start()
    input('Starting workers. Hit enter to terminate...')
    for _ in range(N_WORKERS):
        in_q.put(None)  # tell worker to quit
    # in_q.join()  # not strictly necessary; assumes client's work has been completed too
    runner.join()


if __name__ == '__main__':
    start_workers()
client.py
from QueueManager import *
def client():
    """Submit integers 1..9 to the work queue and print the squared results."""
    in_q, out_q = connect_to_manager()
    for n in range(1, 10):
        in_q.put(n)
    # get results as they become available (completion order is arbitrary):
    for _ in range(1, 10):
        value, result = out_q.get()
        print(value, result)


if __name__ == '__main__':
    client()
打印:
1 1
4 16
3 9
2 4
5 25
6 36
8 64
7 49
9 81
更新
这是 运行 所有内容的代码。
from QueueManager import *
from workers import create_workers
from client import client
from queue import Queue
from threading import Thread, Event
# So that queues are not unnecessarily created by worker processes under Windows:
work_tasks_queue = None
done_task_queue = None


def get_work_tasks_queue():
    """Lazily create and return the singleton work queue."""
    global work_tasks_queue
    if work_tasks_queue is None:
        work_tasks_queue = Queue()
    return work_tasks_queue


def get_done_task_queue():
    """Lazily create and return the singleton results queue."""
    global done_task_queue
    if done_task_queue is None:
        done_task_queue = Queue()
    return done_task_queue


def server(started_event, shutdown_event):
    """Run the manager; set started_event once up, stop when shutdown_event is set."""
    # Don't seem to be able to use a lambda or nested function when using
    # net_manager.start():
    QueueManager.register('work_tasks_queue', callable=get_work_tasks_queue)
    QueueManager.register('done_task_queue', callable=get_done_task_queue)
    net_manager = QueueManager(address=(address, port), authkey=password.encode('utf-8'))
    net_manager.start()
    started_event.set()  # tell main thread that we have started
    shutdown_event.wait()  # wait to be told to shutdown
    net_manager.shutdown()


if __name__ == '__main__':
    started_event = Event()
    shutdown_event = Event()
    server_thread = Thread(target=server, args=(started_event, shutdown_event))
    server_thread.start()
    started_event.wait()  # wait for manager to start
    in_q, out_q = connect_to_manager()
    N_WORKERS = 3
    workers_thread = Thread(target=create_workers, args=(in_q, out_q, N_WORKERS))
    workers_thread.start()
    client()
    # tell workers we are through:
    for _ in range(N_WORKERS):
        in_q.put(None)
    # in_q.join()  # not strictly necessary; assumes client's work has been completed too
    workers_thread.join()
    # tell manager we are through:
    shutdown_event.set()
    server_thread.join()