How to fork and join multiple subprocesses with a global timeout in Python?

I want to run some tasks in parallel in multiple subprocesses, with a timeout if the tasks are not completed within a given delay.

The first approach consists of forking and joining the subprocesses *individually*, computing the remaining timeout from the global timeout, as suggested in . It works well for me.

The second approach, which I would like to use here, consists of creating a pool of subprocesses and waiting for them with a global timeout, as suggested in this answer.

However I have a problem with the second approach: after feeding the pool of subprocesses with tasks that carry a multiprocessing.Event() object, waiting for their completion raises this exception:

RuntimeError: Condition objects should only be shared between processes through inheritance

Here is the Python code snippet:

import multiprocessing.pool
import time


class Worker:

    def __init__(self):
        self.event = multiprocessing.Event()  # commenting this removes the RuntimeError

    def work(self, x):
        time.sleep(1)
        return x * 10


if __name__ == "__main__":
    pool_size = 2
    timeout = 5

    with multiprocessing.pool.Pool(pool_size) as pool:
        result = pool.map_async(Worker().work, [4, 5, 2, 7])
        print(result.get(timeout))  # raises the RuntimeError

In the "Programming guidelines" section of the multiprocessing — Process-based parallelism documentation, there is this paragraph:

Better to inherit than pickle/unpickle

When using the spawn or forkserver start methods many types from multiprocessing need to be picklable so that child processes can use them. However, one should generally avoid sending shared objects to other processes using pipes or queues. Instead you should arrange the program so that a process which needs access to a shared resource created elsewhere can inherit it from an ancestor process.

So multiprocessing.Event() caused the RuntimeError because it is not picklable, as demonstrated by the following Python code snippet:

import multiprocessing
import pickle

pickle.dumps(multiprocessing.Event())

which raises the same exception:

RuntimeError: Condition objects should only be shared between processes through inheritance

Another solution is to use a proxy object:

A proxy is an object which refers to a shared object which lives (presumably) in a different process.

because:

An important feature of proxy objects is that they are picklable so they can be passed between processes.

multiprocessing.Manager().Event() creates a shared threading.Event() object and returns a proxy for it, so replacing this line:

self.event = multiprocessing.Event()

with the following line in the question's Python code snippet solves the problem:

self.event = multiprocessing.Manager().Event()