终止多处理池中的所有进程

Terminating all processes in Multiprocessing Pool

我有一个脚本,它本质上是一个 API scraper,它会永久运行。我给它绑了一个 map_async 池,它很漂亮,池隐藏了一些错误,我了解到这些错误很常见。所以我合并了这个包装的辅助函数。

helper.py

def trace_unhandled_exceptions(func):
    @functools.wraps(func)
    def wrapped_func(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except:
            print('Exception in '+func.__name__)
            traceback.print_exc()
    return wrapped_func

我的主脚本看起来像

scraper.py

import multiprocessing as mp
from helper import trace_unhandled_exceptions

start_block = 100
end_block = 50000

@trace_unhandled_exceptions
def main(block_num):
    block = blah_blah(block_num)
    return block

if __name__ == "__main__":
    cpus = min(8, mp.cpu_count()-1 or 1)

    pool = mp.Pool(cpus)
    pool.map_async(main, range(start_block - 20, end_block), chunksize=cpus)
    pool.close()
    pool.join()

效果很好,我收到异常:

Exception in main
Traceback (most recent call last):
.....

如何让脚本在出现异常时结束,我试过像这样将 os.exit 或 sys.exit 合并到辅助函数中

def trace_unhandled_exceptions(func):
    @functools.wraps(func)
    def wrapped_func(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except:
            print('Exception in '+func.__name__)
            traceback.print_exc()
            os._exit(1)
    return wrapped_func

但我相信它只会终止子进程而不是整个脚本,有什么建议吗?

我不确定池隐藏错误是什么意思。我的经验是,当辅助函数(即 Pool 方法的目标)引发未捕获的异常时,它不会被忽视。无论如何,...

我建议:

  1. 使用您的trace_unhandled_exception装饰器并允许您的工作函数main引发异常并且
  2. 而不是使用方法 map_async(为什么不是 map?),您使用方法 imap,它允许您迭代单个 return 值和任何main 可能抛出的异常,因为它们变得可用 因此,一旦您检测到异常,您就可以调用 multiprocessing.Pool.terminate() 来 (1) 取消任何已提交但尚未开始的任务或 (2) 运行ning 任务尚未完成。顺便说一句,即使您不调用terminate,一旦提交的任务中出现未捕获的异常,处理池也会刷新输入任务队列。

一旦主进程检测到异常,它就可以。当然是清理池子后调用sys.exit()

import multiprocessing as mp

start_block = 100
end_block = 50000

def main(block_num):
    if block_num == 1000:
        raise ValueError("I don't like 1000.")
    return block_num * block_num

if __name__ == "__main__":
    cpus = min(8, mp.cpu_count()-1 or 1)

    pool = mp.Pool(cpus)
    it = pool.imap(main, range(start_block - 20, end_block), chunksize=cpus)
    results = []
    while True:
        try:
            result = next(it)
        except StopIteration:
            break
        except Exception as e:
            print(e)
            # Kill remaining tasks
            pool.terminate()
            break
        else:
            results.append(result)
    pool.close()
    pool.join()

打印:

I don't like 1000.

或者,您可以保留装饰器函数,但将其修改为 return 它捕获的 Exception 实例(目前,它隐式 returns None)。那么你可以修改while True循环如下:

    while True:
        try:
            result = next(it)
        except StopIteration:
            break
        else:
            if isinstance(result, Exception):
                pool.terminate()
                break
            results.append(result)

由于没有引发实际的异常,如果您想继续执行而不允许剩余的已提交任务 运行,那么对 terminate 的调用就变得绝对必要。即使您只想立即退出,终止并清理池仍然是一个好主意,以确保在您调用退出时没有任何挂起。

我认为你不需要 trace_unhandled_exception 装饰器来做你想做的事,至少如果你使用 pool.apply_async() 而不是 pool.map_async() 则不需要,因为你可以使用 error_callback= 选项它支持在目标函数失败时得到通知。请注意,map_async() 也支持类似的东西,但在 entire 可迭代对象被消耗之前它不会被调用——所以它不适合你想做的事情。

我从@Tim Peters 那里得到了这种方法的想法 to a similar question titled

import multiprocessing as mp
import random
import time


START_BLOCK = 100
END_BLOCK = 1000

def blah_blah(block_num):
    if block_num % 10 == 0:
        print(f'Processing block {block_num}')
    time.sleep(random.uniform(.01, .1))
    return block_num

def main(block_num):
    if random.randint(0, 100) == 42:
        print(f'Raising radom exception')
        raise RuntimeError('RANDOM TEST EXCEPTION')
    block = blah_blah(block_num)
    return block

def error_handler(exception):
    print(f'{exception} occurred, terminating pool.')
    pool.terminate()

if __name__ == "__main__":
    processes = min(8, mp.cpu_count()-1 or 1)
    pool = mp.Pool(processes)
    for i in range(START_BLOCK-20, END_BLOCK):
        pool.apply_async(main, (i,), error_callback=error_handler)
    pool.close()
    pool.join()
    print('-fini-')