multiprocessing.pool 不终止 (python 2.7)
multiprocessing.pool does not terminate (python 2.7)
我对 multiprocessing.pool 有疑问。即使满足下面 while 循环中定义的退出条件,我的工作人员也永远不会退出。在最后一个工人完成其工作后,不再进入 while 循环。但是,子进程并没有像我预期的那样终止,而只是处于空闲状态,主进程不会继续。这是在 Ubuntu 上。
最后的输出是"Done",之后什么也没有发生。如果我添加我在下面注释掉的行,即 handler.get()
程序运行并正确终止而没有错误(仅使用一个进程)。也许这里有一个明显的错误,但我没有想法,任何帮助表示赞赏!
manager = multiprocessing.Manager()
pool = multiprocessing.Pool()
queue = manager.Queue()
lock = manager.Lock()
finished = manager.list()
active = manager.list()
pending = manager.list()
for core in core_list:
queue.put(core)
pending.put(core.id)
while len(pending) > 0:
print "Submit jobs"
core = queue.get(block=True)
handler = pool.apply_async(solve_core, (core, core_list, params))
#handler.get()
pool.close()
pool.join()
def solve_core(core, core_list, params):
lock.acquire()
pending.remove(core.id)
active.append(core.id)
lock.release()
# Process some data...
lock.acquire()
active.remove(core.id)
finished.append(core.id)
for new_core in core_list:
if some_condition:
queue.put(new_core)
pending.append(new_core.id)
lock.release()
print "Done"
存在明显的竞争条件错误,尽管还有更多错误。
您的程序依赖子进程来清空 pending
列表,但是当您使用 apply_async
时,子进程可能无法像主进程的 while len(pending) > 0
循环一样快速地更改 pending
列表,然后主进程将调用 queue.get(block=True)
的次数多于队列的大小,因此主进程在 queue.get
.
上阻塞
我对 multiprocessing.pool 有疑问。即使满足下面 while 循环中定义的退出条件,我的工作人员也永远不会退出。在最后一个工人完成其工作后,不再进入 while 循环。但是,子进程并没有像我预期的那样终止,而只是处于空闲状态,主进程不会继续。这是在 Ubuntu 上。
最后的输出是"Done",之后什么也没有发生。如果我添加我在下面注释掉的行,即 handler.get()
程序运行并正确终止而没有错误(仅使用一个进程)。也许这里有一个明显的错误,但我没有想法,任何帮助表示赞赏!
manager = multiprocessing.Manager()
pool = multiprocessing.Pool()
queue = manager.Queue()
lock = manager.Lock()
finished = manager.list()
active = manager.list()
pending = manager.list()
for core in core_list:
queue.put(core)
pending.put(core.id)
while len(pending) > 0:
print "Submit jobs"
core = queue.get(block=True)
handler = pool.apply_async(solve_core, (core, core_list, params))
#handler.get()
pool.close()
pool.join()
def solve_core(core, core_list, params):
lock.acquire()
pending.remove(core.id)
active.append(core.id)
lock.release()
# Process some data...
lock.acquire()
active.remove(core.id)
finished.append(core.id)
for new_core in core_list:
if some_condition:
queue.put(new_core)
pending.append(new_core.id)
lock.release()
print "Done"
存在明显的竞争条件错误,尽管还有更多错误。
您的程序依赖子进程来清空 pending
列表,但是当您使用 apply_async
时,子进程可能无法像主进程的 while len(pending) > 0
循环一样快速地更改 pending
列表,然后主进程将调用 queue.get(block=True)
的次数多于队列的大小,因此主进程在 queue.get
.