如何在并行进程池中合并对象?

How do I pool objects in a parallel process pool?

我有一个函数可以在可变对象的帮助下进行计算,如下例所示:

def fun(obj: MutableObject, input_a, input_b):
    obj.a = input_a
    return obj.do_stuff(input_b)

我需要多次执行此操作,目前正在使用 for 循环,如下所示:

obj = MutableObject()
output = []

for input_a, input_b in inputs:
    output.append(fun(obj, input_a, input_b))

为了加快处理速度,我想使用 python 多处理并并行执行 fun 的多个调用。我见过的一种常见方法是使用 multiproccesing.Pool 映射列表。这种实现对我来说的问题是我有需要在进程之间共享的可变对象。我希望每个进程都可以访问对象的克隆,而不会创建不必要的许多克隆。

天真的尝试是为每个输入复制对象:

import multiprocessing
import copy

obj = MutableObject()
def map_fun(arg):
    input_a, input_b = arg
    temp_obj = copy.deepcopy(obj)
    return fun(temp_obj, input_a, input_b)

pool = multiprocessing.Pool()
outputs = pool.map(map_fun, inputs)

但这似乎很浪费 CPU 和内存。

有什么方法可以创建一个临时的对象副本池,每个并行进程一个,而不是为每个输入对创建一个?

编辑:

有人在评论中指出,内存可能不会成为问题,因为垃圾回收会清理未使用的副本。我仍然担心复制需要很多资源,因为我的 MutableObject 实际上是一个 Keras 模型(神经网络),它可能非常大。

这是一个解决方案,它删除池并管理线程本身,确保每个进程只有一个对象:

from multiprocessing import Process, cpu_count, JoinableQueue


class MuteableObj:
    def method(self, data):
        data["processed"] = True
        return data


class Worker(Process):
    def __init__(self, task_queue, result_queue):
        super().__init__()
        print("Started", self._name)
        self.task_queue = task_queue
        self.result_queue = result_queue
        self._obj = MuteableObj()
        self._open = True

    def run(self):
        while self._open:
            task = self.task_queue.get()
            print(f"Processing {task['id']}")
            result = self._obj.method(task)
            self.task_queue.task_done()
            self.result_queue.put(result)
        print("over")

    def terminate(self):
        print("Stopped", self._name)
        super().terminate()


task_queue = JoinableQueue()
result_queue = JoinableQueue()

NTHREADS = cpu_count()

for i in range(200):
    task_queue.put(dict(id=i))


threads = [Worker(task_queue, result_queue) for i in range(NTHREADS)]
for t in threads:
    t.start()

task_queue.join()

for t in threads:
    t.terminate()

results = []
while not result_queue.empty():
    results.append(result_queue.get())

print(results)

首先我们有一个可变对象的模拟,这里只是一个 class 我们关心的一个方法。

我们自己子class Process,并在初始化时给每个进程一个对象。然后我们用需要的任务填充一个JoinableQueue,等到它们全部完成,当我们从另一个队列中取出所有结果时(虽然我们实际上可以使用一个列表和Lock,但我认为这样更容易阅读)。

请注意,不能保证结果按发送顺序排列。如果这很重要,你应该像我在这里给他们一个 ID,然后按 ID 排序。

如果您需要无限期地 运行 池并对每个结果做特定的事情,您可能想要编写一个回调,将 join() 移到代码的末尾(因为它阻塞直到所有任务都被处理)然后有一个循环等待结果并调用你的回调:

from time import sleep

while running:
    while not results_queue.empty():
        callback(results_queue.get())

    while results_queue.empty():
        sleep(0.1)

在这种情况下,我会将所有这些都包装在 另一个 class 中,称为 TaskRunner,以保持状态(如 running) 隔离。

顺便说一下,我几年前第一次在 SO 上看到这个食谱,从那以后我就一直在使用它。