如何在并行进程池中合并对象?
How do I pool objects in a parallel process pool?
我有一个函数可以在可变对象的帮助下进行计算,如下例所示:
def fun(obj: MutableObject, input_a, input_b):
obj.a = input_a
return obj.do_stuff(input_b)
我需要多次执行此操作,目前正在使用 for 循环,如下所示:
obj = MutableObject()
output = []
for input_a, input_b in inputs:
output.append(fun(obj, input_a, input_b))
为了加快处理速度,我想使用 python 多处理并并行执行 fun
的多个调用。我见过的一种常见方法是使用 multiproccesing.Pool
映射列表。这种实现对我来说的问题是我有需要在进程之间共享的可变对象。我希望每个进程都可以访问对象的克隆,而不会创建不必要的许多克隆。
天真的尝试是为每个输入复制对象:
import multiprocessing
import copy
obj = MutableObject()
def map_fun(arg):
input_a, input_b = arg
temp_obj = copy.deepcopy(obj)
return fun(temp_obj, input_a, input_b)
pool = multiprocessing.Pool()
outputs = pool.map(map_fun, inputs)
但这似乎很浪费 CPU 和内存。
有什么方法可以创建一个临时的对象副本池,每个并行进程一个,而不是为每个输入对创建一个?
编辑:
有人在评论中指出,内存可能不会成为问题,因为垃圾回收会清理未使用的副本。我仍然担心复制需要很多资源,因为我的 MutableObject 实际上是一个 Keras 模型(神经网络),它可能非常大。
这是一个解决方案,它删除池并管理线程本身,确保每个进程只有一个对象:
from multiprocessing import Process, cpu_count, JoinableQueue
class MuteableObj:
def method(self, data):
data["processed"] = True
return data
class Worker(Process):
def __init__(self, task_queue, result_queue):
super().__init__()
print("Started", self._name)
self.task_queue = task_queue
self.result_queue = result_queue
self._obj = MuteableObj()
self._open = True
def run(self):
while self._open:
task = self.task_queue.get()
print(f"Processing {task['id']}")
result = self._obj.method(task)
self.task_queue.task_done()
self.result_queue.put(result)
print("over")
def terminate(self):
print("Stopped", self._name)
super().terminate()
task_queue = JoinableQueue()
result_queue = JoinableQueue()
NTHREADS = cpu_count()
for i in range(200):
task_queue.put(dict(id=i))
threads = [Worker(task_queue, result_queue) for i in range(NTHREADS)]
for t in threads:
t.start()
task_queue.join()
for t in threads:
t.terminate()
results = []
while not result_queue.empty():
results.append(result_queue.get())
print(results)
首先我们有一个可变对象的模拟,这里只是一个 class 我们关心的一个方法。
我们自己子class Process
,并在初始化时给每个进程一个对象。然后我们用需要的任务填充一个JoinableQueue
,等到它们全部完成,当我们从另一个队列中取出所有结果时(虽然我们实际上可以使用一个列表和Lock
,但我认为这样更容易阅读)。
请注意,不能保证结果按发送顺序排列。如果这很重要,你应该像我在这里给他们一个 ID,然后按 ID 排序。
如果您需要无限期地 运行 池并对每个结果做特定的事情,您可能想要编写一个回调,将 join()
移到代码的末尾(因为它阻塞直到所有任务都被处理)然后有一个循环等待结果并调用你的回调:
from time import sleep
while running:
while not results_queue.empty():
callback(results_queue.get())
while results_queue.empty():
sleep(0.1)
在这种情况下,我会将所有这些都包装在 另一个 class 中,称为 TaskRunner
,以保持状态(如 running
) 隔离。
顺便说一下,我几年前第一次在 SO 上看到这个食谱,从那以后我就一直在使用它。
我有一个函数可以在可变对象的帮助下进行计算,如下例所示:
def fun(obj: MutableObject, input_a, input_b):
obj.a = input_a
return obj.do_stuff(input_b)
我需要多次执行此操作,目前正在使用 for 循环,如下所示:
obj = MutableObject()
output = []
for input_a, input_b in inputs:
output.append(fun(obj, input_a, input_b))
为了加快处理速度,我想使用 python 多处理并并行执行 fun
的多个调用。我见过的一种常见方法是使用 multiproccesing.Pool
映射列表。这种实现对我来说的问题是我有需要在进程之间共享的可变对象。我希望每个进程都可以访问对象的克隆,而不会创建不必要的许多克隆。
天真的尝试是为每个输入复制对象:
import multiprocessing
import copy
obj = MutableObject()
def map_fun(arg):
input_a, input_b = arg
temp_obj = copy.deepcopy(obj)
return fun(temp_obj, input_a, input_b)
pool = multiprocessing.Pool()
outputs = pool.map(map_fun, inputs)
但这似乎很浪费 CPU 和内存。
有什么方法可以创建一个临时的对象副本池,每个并行进程一个,而不是为每个输入对创建一个?
编辑:
有人在评论中指出,内存可能不会成为问题,因为垃圾回收会清理未使用的副本。我仍然担心复制需要很多资源,因为我的 MutableObject 实际上是一个 Keras 模型(神经网络),它可能非常大。
这是一个解决方案,它删除池并管理线程本身,确保每个进程只有一个对象:
from multiprocessing import Process, cpu_count, JoinableQueue
class MuteableObj:
def method(self, data):
data["processed"] = True
return data
class Worker(Process):
def __init__(self, task_queue, result_queue):
super().__init__()
print("Started", self._name)
self.task_queue = task_queue
self.result_queue = result_queue
self._obj = MuteableObj()
self._open = True
def run(self):
while self._open:
task = self.task_queue.get()
print(f"Processing {task['id']}")
result = self._obj.method(task)
self.task_queue.task_done()
self.result_queue.put(result)
print("over")
def terminate(self):
print("Stopped", self._name)
super().terminate()
task_queue = JoinableQueue()
result_queue = JoinableQueue()
NTHREADS = cpu_count()
for i in range(200):
task_queue.put(dict(id=i))
threads = [Worker(task_queue, result_queue) for i in range(NTHREADS)]
for t in threads:
t.start()
task_queue.join()
for t in threads:
t.terminate()
results = []
while not result_queue.empty():
results.append(result_queue.get())
print(results)
首先我们有一个可变对象的模拟,这里只是一个 class 我们关心的一个方法。
我们自己子class Process
,并在初始化时给每个进程一个对象。然后我们用需要的任务填充一个JoinableQueue
,等到它们全部完成,当我们从另一个队列中取出所有结果时(虽然我们实际上可以使用一个列表和Lock
,但我认为这样更容易阅读)。
请注意,不能保证结果按发送顺序排列。如果这很重要,你应该像我在这里给他们一个 ID,然后按 ID 排序。
如果您需要无限期地 运行 池并对每个结果做特定的事情,您可能想要编写一个回调,将 join()
移到代码的末尾(因为它阻塞直到所有任务都被处理)然后有一个循环等待结果并调用你的回调:
from time import sleep
while running:
while not results_queue.empty():
callback(results_queue.get())
while results_queue.empty():
sleep(0.1)
在这种情况下,我会将所有这些都包装在 另一个 class 中,称为 TaskRunner
,以保持状态(如 running
) 隔离。
顺便说一下,我几年前第一次在 SO 上看到这个食谱,从那以后我就一直在使用它。