多处理,pool.ready 做什么?

Multiprocessing, what does pool.ready do?

假设我有一个池,在 class 中有几个进程,我用它来做一些处理,像这样:

class MyClass:

    def __init_(self):
        self.pool = Pool(processes = NUM_PROCESSES)
        self.pop = []
        self.finished = []

    def gen_pop(self):
        self.pop = [ self.pool.apply_async(Item.test, (Item(),)) for _ in range(NUM_PROCESSES) ]
        while (not self.check()):
            continue
        # Do some other stuff

    def check(self):
        self.finished = filter(lambda t: self.pop[t].ready(), range(NUM_PROCESSES))
        new_pop = []
        for f in self.finished:
            new_pop.append(self.pop[f].get(timeout = 1))
            self.pop[f] = None
            # Do some other stuff

当我 运行 这段代码时,我得到一个 cPickle.PicklingError,它指出 <type 'function'> 不能被 pickle。这告诉我,apply_async 函数之一尚未返回,因此我试图将 运行ning 函数附加到另一个列表。但这不应该发生,因为所有 运行ning 调用都应该使用 ready() 函数过滤掉。

在相关说明中,Item class 的实际性质并不重要,但重要的是在我的 Item.test 函数顶部有一个打印语句,它应该为了调试目的而触发。但是,这不会发生。这告诉我该功能已启动但尚未真正开始执行。

那么,看来 ready() 实际上并没有告诉我调用是否已完成执行。 ready() 究竟做了什么?我应该如何编辑我的代码,以便我可以过滤掉仍在 运行ning 的进程?

Multiprocessing 在内部使用 pickle 模块在进程之间传递数据, 所以你的数据必须是 picklable。请参阅 the list of what is considered picklable,对象方法不在该列表中。
要快速解决这个问题,只需在方法周围使用包装函数:

def wrap_item_test(item):
    item.test()

class MyClass:
    def gen_pop(self):
        self.pop = [ self.pool.apply_async(wrap_item_test, (Item(),)) for _ in range(NUM_PROCESSES) ]
        while (not self.check()):
            continue

回答你问的问题,.ready()其实是在告诉你.get() 是否可以 block: if .ready() returns True, .get() 阻止, 但如果 .ready() returns False, .get() 可能 阻塞(也可能不会:异步调用很可能会在您开始调用 .get() 之前完成)。

所以,例如,.get() 中的 timeout = 1 没有任何用处:因为你只调用 .get() if .ready() returned True,你已经知道 .get() 不会阻塞。

但是.get()没有阻塞暗示异步调用成功,或者甚至工作进程甚至开始处理异步调用:正如文档所说,

If the remote call raised an exception then that exception will be reraised by get().

也就是说,如果 根本无法执行异步调用.ready() 将 return True.get() 将(重新)引发阻止尝试工作的异常。

这似乎是你的情况,尽管我们不得不猜测,因为你没有 post 可运行的代码,也没有包括回溯。

请注意,如果您真正想知道异步调用是否正常完成,在已经从 .ready() 返回 True 之后,那么 .successful() 就是要调用的方法。

很明显,无论 Item.test 是什么,由于 pickle 限制,将其作为可调用对象传递给 .apply_async() 是完全不可能的。这解释了为什么 Item.test 从不打印任何东西(实际上从未调用过!),为什么 .ready() returns True.apply_async() 调用失败),以及为什么 .get() 引发异常(因为 .apply_async() 在尝试 pickle 其参数之一时遇到异常 - 可能是 Item.test)。