在异常或键盘中断时退出多处理池?

Exit from multiprocessing pool upon exception or KeyboardInterrupt?

我希望我的程序在按下 Ctrl+C:

时立即退出
import multiprocessing
import os
import time

def sqr(a):
    time.sleep(0.2)
    print 'local {}'.format(os.getpid())
    #raise Exception()
    return a * a

pool = multiprocessing.Pool(processes=4)

try:
    r = [pool.apply_async(sqr, (x,)) for x in range(100)]
    pool.close()
    pool.join()
except:
    print 121312313
    pool.terminate()
    pool.join()

print 'main {}'.format(os.getpid())

这段代码没有按预期工作:当我按下 Ctrl+C 时程序没有退出。相反,它每次都会打印一些 KeyboardInterrupt,然后永远卡住。

此外,如果我在 sqr 中取消注释 #raise ...,我希望它尽快退出。 Exception thrown in multiprocessing Pool not detected 中的解决方案似乎没有帮助。

更新

我想我终于得到了这个:(如果有错请告诉我)

def sqr(a):
    time.sleep(0.2)
    print 'local {}'.format(os.getpid())
    if a == 20:
        raise Exception('fff')
    return a * a

pool = Pool(processes=4)


r = [pool.apply_async(sqr, (x,)) for x in range(100)]

pool.close()

# Without timeout, cannot respond to KeyboardInterrupt.
# Also need get to raise the exceptions workers may throw.
for item in r:
    item.get(timeout=999999)

# I don't think I need join since I already get everything.
pool.join()

print 'main {}'.format(os.getpid())

这是因为 Python 2.x 错误使得对 pool.join() 的调用无法中断。它在 Python 3.x 中运行良好。通常解决方法是将一个非常大的 timeout 传递给 join,但 multiprocessing.Pool.join 不采用 timeout 参数,因此您根本无法使用它。相反,您需要等待池中的每个任务完成,然后将 timeout 传递给 wait() 方法:

import multiprocessing
import time
import os

Pool = multiprocessing.Pool

def sqr(a):
    time.sleep(0.2)
    print('local {}'.format(os.getpid()))
    #raise Exception()
    return a * a

pool = Pool(processes=4)

try:
    r = [pool.apply_async(sqr, (x,)) for x in range(100)]
    pool.close()
    for item in r:
        item.wait(timeout=9999999) # Without a timeout, you can't interrupt this.
except KeyboardInterrupt:
    pool.terminate()
finally:
    pool.join()

print('main {}'.format(os.getpid()))

这可以在 Python 2 和 3 上中断。