多处理池 - 如果一个 returns 想要的结果,如何取消所有 运行 个进程?
Multiprocessing Pool - how to cancel all running processes if one returns the desired result?
给定以下 Python 代码:
import multiprocessing
def unique(somelist):
return len(set(somelist)) == len(somelist)
if __name__ == '__main__':
somelist = [[1,2,3,4,5,6,7,8,9,10,11,12,13,2], [1,2,3,4,5], [1,2,3,4,5,6,7,8,9,1], [0,1,5,1]]
pool = multiprocessing.Pool()
reslist = pool.map(unique, somelist)
pool.close()
pool.join()
print "Done!"
print reslist
现在想象一下,这个玩具示例中的整数列表非常长,我想在这里实现的是:如果某个列表中的列表之一 returns 为真,则杀死所有运行 个进程。
这会引出两个问题(可能还有更多我没有想到的问题):
我怎样才能从完成的进程中“读取”或“收听”结果,而其他进程是 运行?如果例如一个进程正在处理来自 somelist
的 [1,2,3,4,5]
,并且在所有其他进程之前完成,我如何在此时从该进程中读取结果?
假设可以“读出”已完成进程的结果,而其他进程是 运行:我如何使用此结果作为终止所有其他进程的条件运行 个进程?
例如如果一个进程已完成并返回 True
,我如何将其用作终止所有其他(仍然)运行 进程的条件?
使用 pool.imap_unordered
以任意顺序查看结果。
reslist = pool.imap_unordered(unique, somelist)
pool.close()
for res in reslist:
if res: # or set other condition here
pool.terminate()
break
pool.join()
您可以在主进程中迭代 imap
reslist,同时池进程仍在生成结果。
没有花哨的 IPC(进程间通信)技巧,最简单的方法是使用带有回调函数的 Pool
方法。回调在主程序中运行(在 multiprocessing
创建的线程中),并在每个结果可用时使用它。当回调看到您喜欢的结果时,它可以终止 Pool
。例如,
import multiprocessing as mp
def worker(i):
from time import sleep
sleep(i)
return i, (i == 5)
def callback(t):
i, quit = t
result[i] = quit
if quit:
pool.terminate()
if __name__ == "__main__":
N = 50
pool = mp.Pool()
result = [None] * N
for i in range(N):
pool.apply_async(func=worker, args=(i,), callback=callback)
pool.close()
pool.join()
print(result)
几乎肯定会显示以下内容(OS 调度变幻莫测 可能 允许消耗一两个输入):
[False, False, False, False, False, True, None, None, None, None,
None, None, None, None, None, None, None, None, None, None,
None, None, None, None, None, None, None, None, None, None,
None, None, None, None, None, None, None, None, None, None,
None, None, None, None, None, None, None, None, None, None]
给定以下 Python 代码:
import multiprocessing
def unique(somelist):
return len(set(somelist)) == len(somelist)
if __name__ == '__main__':
somelist = [[1,2,3,4,5,6,7,8,9,10,11,12,13,2], [1,2,3,4,5], [1,2,3,4,5,6,7,8,9,1], [0,1,5,1]]
pool = multiprocessing.Pool()
reslist = pool.map(unique, somelist)
pool.close()
pool.join()
print "Done!"
print reslist
现在想象一下,这个玩具示例中的整数列表非常长,我想在这里实现的是:如果某个列表中的列表之一 returns 为真,则杀死所有运行 个进程。
这会引出两个问题(可能还有更多我没有想到的问题):
我怎样才能从完成的进程中“读取”或“收听”结果,而其他进程是 运行?如果例如一个进程正在处理来自
somelist
的[1,2,3,4,5]
,并且在所有其他进程之前完成,我如何在此时从该进程中读取结果?假设可以“读出”已完成进程的结果,而其他进程是 运行:我如何使用此结果作为终止所有其他进程的条件运行 个进程?
例如如果一个进程已完成并返回True
,我如何将其用作终止所有其他(仍然)运行 进程的条件?
使用 pool.imap_unordered
以任意顺序查看结果。
reslist = pool.imap_unordered(unique, somelist)
pool.close()
for res in reslist:
if res: # or set other condition here
pool.terminate()
break
pool.join()
您可以在主进程中迭代 imap
reslist,同时池进程仍在生成结果。
没有花哨的 IPC(进程间通信)技巧,最简单的方法是使用带有回调函数的 Pool
方法。回调在主程序中运行(在 multiprocessing
创建的线程中),并在每个结果可用时使用它。当回调看到您喜欢的结果时,它可以终止 Pool
。例如,
import multiprocessing as mp
def worker(i):
from time import sleep
sleep(i)
return i, (i == 5)
def callback(t):
i, quit = t
result[i] = quit
if quit:
pool.terminate()
if __name__ == "__main__":
N = 50
pool = mp.Pool()
result = [None] * N
for i in range(N):
pool.apply_async(func=worker, args=(i,), callback=callback)
pool.close()
pool.join()
print(result)
几乎肯定会显示以下内容(OS 调度变幻莫测 可能 允许消耗一两个输入):
[False, False, False, False, False, True, None, None, None, None,
None, None, None, None, None, None, None, None, None, None,
None, None, None, None, None, None, None, None, None, None,
None, None, None, None, None, None, None, None, None, None,
None, None, None, None, None, None, None, None, None, None]