在 Python 的多进程中硬杀死挂起的子进程
Hard-kill hanging sub-processes in Python's multiprocessing
我有一个 Python 函数调用 C 函数的包装器(我无法更改)。大多数时候 C 函数非常快,但是当它失败时,调用就会永远挂起。为了缓解这种情况,我使用 multiprocessing
:
使调用超时
pool = multiprocessing.Pool(processes=4)
try:
res = pool.apply_async(my_dangerous_cpp_function, args=(bunch, of, vars))
return res.get(timeout=1.)
except multiprocessing.TimeoutError:
terminate_pool(pool)
pool = multiprocessing.Pool(processes=4)
当被调用的函数没有响应任何信号时,如何终止池?
如果我用 pool.terminate()
替换 terminate_pool(pool)
,那么对 pool.terminate()
的调用也会挂起。相反,我目前正在向所有子进程发送 SIGKILL:
def terminate_pool(pool):
for p in pool._pool:
os.kill(p.pid, 9)
pool.close() # ok, doesn't hang
#pool.join() # not ok, hangs forever
这样,挂起的子进程停止吃 100% CPU,但是我不能调用 pool.terminate()
或 pool.join()
(它们挂起),所以我就离开池后面的对象并创建一个新对象。即使他们收到了 SIGKILL,子流程仍然打开,所以我的 Python 流程数量从未停止增加...
有没有办法一劳永逸地消灭池及其所有子进程?
标准 multiprocessing.Pool
不是为处理工人超时而设计的。
Pebble处理池支持超时任务
from pebble import process, TimeoutError
with process.Pool() as pool:
task = pool.schedule(function, args=[1,2], timeout=5)
try:
result = task.get()
except TimeoutError:
print "Task: %s took more than 5 seconds to complete" % task
我有一个 Python 函数调用 C 函数的包装器(我无法更改)。大多数时候 C 函数非常快,但是当它失败时,调用就会永远挂起。为了缓解这种情况,我使用 multiprocessing
:
pool = multiprocessing.Pool(processes=4)
try:
res = pool.apply_async(my_dangerous_cpp_function, args=(bunch, of, vars))
return res.get(timeout=1.)
except multiprocessing.TimeoutError:
terminate_pool(pool)
pool = multiprocessing.Pool(processes=4)
当被调用的函数没有响应任何信号时,如何终止池?
如果我用 pool.terminate()
替换 terminate_pool(pool)
,那么对 pool.terminate()
的调用也会挂起。相反,我目前正在向所有子进程发送 SIGKILL:
def terminate_pool(pool):
for p in pool._pool:
os.kill(p.pid, 9)
pool.close() # ok, doesn't hang
#pool.join() # not ok, hangs forever
这样,挂起的子进程停止吃 100% CPU,但是我不能调用 pool.terminate()
或 pool.join()
(它们挂起),所以我就离开池后面的对象并创建一个新对象。即使他们收到了 SIGKILL,子流程仍然打开,所以我的 Python 流程数量从未停止增加...
有没有办法一劳永逸地消灭池及其所有子进程?
标准 multiprocessing.Pool
不是为处理工人超时而设计的。
Pebble处理池支持超时任务
from pebble import process, TimeoutError
with process.Pool() as pool:
task = pool.schedule(function, args=[1,2], timeout=5)
try:
result = task.get()
except TimeoutError:
print "Task: %s took more than 5 seconds to complete" % task