多处理,pool.ready 做什么?
Multiprocessing, what does pool.ready do?
假设我有一个池,在 class 中有几个进程,我用它来做一些处理,像这样:
class MyClass:
def __init_(self):
self.pool = Pool(processes = NUM_PROCESSES)
self.pop = []
self.finished = []
def gen_pop(self):
self.pop = [ self.pool.apply_async(Item.test, (Item(),)) for _ in range(NUM_PROCESSES) ]
while (not self.check()):
continue
# Do some other stuff
def check(self):
self.finished = filter(lambda t: self.pop[t].ready(), range(NUM_PROCESSES))
new_pop = []
for f in self.finished:
new_pop.append(self.pop[f].get(timeout = 1))
self.pop[f] = None
# Do some other stuff
当我 运行 这段代码时,我得到一个 cPickle.PicklingError
,它指出 <type 'function'>
不能被 pickle。这告诉我,apply_async
函数之一尚未返回,因此我试图将 运行ning 函数附加到另一个列表。但这不应该发生,因为所有 运行ning 调用都应该使用 ready()
函数过滤掉。
在相关说明中,Item
class 的实际性质并不重要,但重要的是在我的 Item.test
函数顶部有一个打印语句,它应该为了调试目的而触发。但是,这不会发生。这告诉我该功能已启动但尚未真正开始执行。
那么,看来 ready()
实际上并没有告诉我调用是否已完成执行。 ready()
究竟做了什么?我应该如何编辑我的代码,以便我可以过滤掉仍在 运行ning 的进程?
Multiprocessing 在内部使用 pickle
模块在进程之间传递数据,
所以你的数据必须是 picklable。请参阅 the list of what is considered picklable,对象方法不在该列表中。
要快速解决这个问题,只需在方法周围使用包装函数:
def wrap_item_test(item):
item.test()
class MyClass:
def gen_pop(self):
self.pop = [ self.pool.apply_async(wrap_item_test, (Item(),)) for _ in range(NUM_PROCESSES) ]
while (not self.check()):
continue
回答你问的问题,.ready()
其实是在告诉你.get()
是否可以 block: if .ready()
returns True
, .get()
将 不 阻止, 但如果 .ready()
returns False
, .get()
可能 阻塞(也可能不会:异步调用很可能会在您开始调用 .get()
之前完成)。
所以,例如,.get()
中的 timeout = 1
没有任何用处:因为你只调用 .get()
if .ready()
returned True
,你已经知道 .get()
不会阻塞。
但是.get()
没有阻塞不暗示异步调用成功,或者甚至工作进程甚至开始处理异步调用:正如文档所说,
If the remote call raised an exception then that exception will be reraised by get()
.
也就是说,如果 根本无法执行异步调用,.ready()
将 return True
和 .get()
将(重新)引发阻止尝试工作的异常。
这似乎是你的情况,尽管我们不得不猜测,因为你没有 post 可运行的代码,也没有包括回溯。
请注意,如果您真正想知道异步调用是否正常完成,在已经从 .ready()
返回 True
之后,那么 .successful()
就是要调用的方法。
很明显,无论 Item.test
是什么,由于 pickle 限制,将其作为可调用对象传递给 .apply_async()
是完全不可能的。这解释了为什么 Item.test
从不打印任何东西(实际上从未调用过!),为什么 .ready()
returns True
(.apply_async()
调用失败),以及为什么 .get()
引发异常(因为 .apply_async()
在尝试 pickle 其参数之一时遇到异常 - 可能是 Item.test
)。
假设我有一个池,在 class 中有几个进程,我用它来做一些处理,像这样:
class MyClass:
def __init_(self):
self.pool = Pool(processes = NUM_PROCESSES)
self.pop = []
self.finished = []
def gen_pop(self):
self.pop = [ self.pool.apply_async(Item.test, (Item(),)) for _ in range(NUM_PROCESSES) ]
while (not self.check()):
continue
# Do some other stuff
def check(self):
self.finished = filter(lambda t: self.pop[t].ready(), range(NUM_PROCESSES))
new_pop = []
for f in self.finished:
new_pop.append(self.pop[f].get(timeout = 1))
self.pop[f] = None
# Do some other stuff
当我 运行 这段代码时,我得到一个 cPickle.PicklingError
,它指出 <type 'function'>
不能被 pickle。这告诉我,apply_async
函数之一尚未返回,因此我试图将 运行ning 函数附加到另一个列表。但这不应该发生,因为所有 运行ning 调用都应该使用 ready()
函数过滤掉。
在相关说明中,Item
class 的实际性质并不重要,但重要的是在我的 Item.test
函数顶部有一个打印语句,它应该为了调试目的而触发。但是,这不会发生。这告诉我该功能已启动但尚未真正开始执行。
那么,看来 ready()
实际上并没有告诉我调用是否已完成执行。 ready()
究竟做了什么?我应该如何编辑我的代码,以便我可以过滤掉仍在 运行ning 的进程?
Multiprocessing 在内部使用 pickle
模块在进程之间传递数据,
所以你的数据必须是 picklable。请参阅 the list of what is considered picklable,对象方法不在该列表中。
要快速解决这个问题,只需在方法周围使用包装函数:
def wrap_item_test(item):
item.test()
class MyClass:
def gen_pop(self):
self.pop = [ self.pool.apply_async(wrap_item_test, (Item(),)) for _ in range(NUM_PROCESSES) ]
while (not self.check()):
continue
回答你问的问题,.ready()
其实是在告诉你.get()
是否可以 block: if .ready()
returns True
, .get()
将 不 阻止, 但如果 .ready()
returns False
, .get()
可能 阻塞(也可能不会:异步调用很可能会在您开始调用 .get()
之前完成)。
所以,例如,.get()
中的 timeout = 1
没有任何用处:因为你只调用 .get()
if .ready()
returned True
,你已经知道 .get()
不会阻塞。
但是.get()
没有阻塞不暗示异步调用成功,或者甚至工作进程甚至开始处理异步调用:正如文档所说,
If the remote call raised an exception then that exception will be reraised by
get()
.
也就是说,如果 根本无法执行异步调用,.ready()
将 return True
和 .get()
将(重新)引发阻止尝试工作的异常。
这似乎是你的情况,尽管我们不得不猜测,因为你没有 post 可运行的代码,也没有包括回溯。
请注意,如果您真正想知道异步调用是否正常完成,在已经从 .ready()
返回 True
之后,那么 .successful()
就是要调用的方法。
很明显,无论 Item.test
是什么,由于 pickle 限制,将其作为可调用对象传递给 .apply_async()
是完全不可能的。这解释了为什么 Item.test
从不打印任何东西(实际上从未调用过!),为什么 .ready()
returns True
(.apply_async()
调用失败),以及为什么 .get()
引发异常(因为 .apply_async()
在尝试 pickle 其参数之一时遇到异常 - 可能是 Item.test
)。