终止多处理池中的所有进程
Terminating all processes in Multiprocessing Pool
我有一个脚本,它本质上是一个 API scraper,它会永久运行。我给它绑了一个 map_async 池,它很漂亮,池隐藏了一些错误,我了解到这些错误很常见。所以我合并了这个包装的辅助函数。
helper.py
def trace_unhandled_exceptions(func):
@functools.wraps(func)
def wrapped_func(*args, **kwargs):
try:
return func(*args, **kwargs)
except:
print('Exception in '+func.__name__)
traceback.print_exc()
return wrapped_func
我的主脚本看起来像
scraper.py
import multiprocessing as mp
from helper import trace_unhandled_exceptions
start_block = 100
end_block = 50000
@trace_unhandled_exceptions
def main(block_num):
block = blah_blah(block_num)
return block
if __name__ == "__main__":
cpus = min(8, mp.cpu_count()-1 or 1)
pool = mp.Pool(cpus)
pool.map_async(main, range(start_block - 20, end_block), chunksize=cpus)
pool.close()
pool.join()
效果很好,我收到异常:
Exception in main
Traceback (most recent call last):
.....
如何让脚本在出现异常时结束,我试过像这样将 os.exit 或 sys.exit 合并到辅助函数中
def trace_unhandled_exceptions(func):
@functools.wraps(func)
def wrapped_func(*args, **kwargs):
try:
return func(*args, **kwargs)
except:
print('Exception in '+func.__name__)
traceback.print_exc()
os._exit(1)
return wrapped_func
但我相信它只会终止子进程而不是整个脚本,有什么建议吗?
我不确定池隐藏错误是什么意思。我的经验是,当辅助函数(即 Pool
方法的目标)引发未捕获的异常时,它不会被忽视。无论如何,...
我建议:
- 您不使用您的
trace_unhandled_exception
装饰器并允许您的工作函数main
引发异常并且
- 而不是使用方法
map_async
(为什么不是 map
?),您使用方法 imap
,它允许您迭代单个 return 值和任何main
可能抛出的异常,因为它们变得可用 因此,一旦您检测到异常,您就可以调用 multiprocessing.Pool.terminate()
来 (1) 取消任何已提交但尚未开始的任务或 (2) 运行ning 任务尚未完成。顺便说一句,即使您不调用terminate
,一旦提交的任务中出现未捕获的异常,处理池也会刷新输入任务队列。
一旦主进程检测到异常,它就可以。当然是清理池子后调用sys.exit()
import multiprocessing as mp
start_block = 100
end_block = 50000
def main(block_num):
if block_num == 1000:
raise ValueError("I don't like 1000.")
return block_num * block_num
if __name__ == "__main__":
cpus = min(8, mp.cpu_count()-1 or 1)
pool = mp.Pool(cpus)
it = pool.imap(main, range(start_block - 20, end_block), chunksize=cpus)
results = []
while True:
try:
result = next(it)
except StopIteration:
break
except Exception as e:
print(e)
# Kill remaining tasks
pool.terminate()
break
else:
results.append(result)
pool.close()
pool.join()
打印:
I don't like 1000.
或者,您可以保留装饰器函数,但将其修改为 return 它捕获的 Exception
实例(目前,它隐式 returns None
)。那么你可以修改while True
循环如下:
while True:
try:
result = next(it)
except StopIteration:
break
else:
if isinstance(result, Exception):
pool.terminate()
break
results.append(result)
由于没有引发实际的异常,如果您想继续执行而不允许剩余的已提交任务 运行,那么对 terminate
的调用就变得绝对必要。即使您只想立即退出,终止并清理池仍然是一个好主意,以确保在您调用退出时没有任何挂起。
我认为你不需要 trace_unhandled_exception
装饰器来做你想做的事,至少如果你使用 pool.apply_async()
而不是 pool.map_async()
则不需要,因为你可以使用 error_callback=
选项它支持在目标函数失败时得到通知。请注意,map_async()
也支持类似的东西,但在 entire 可迭代对象被消耗之前它不会被调用——所以它不适合你想做的事情。
我从@Tim Peters 那里得到了这种方法的想法 to a similar question titled
import multiprocessing as mp
import random
import time
START_BLOCK = 100
END_BLOCK = 1000
def blah_blah(block_num):
if block_num % 10 == 0:
print(f'Processing block {block_num}')
time.sleep(random.uniform(.01, .1))
return block_num
def main(block_num):
if random.randint(0, 100) == 42:
print(f'Raising radom exception')
raise RuntimeError('RANDOM TEST EXCEPTION')
block = blah_blah(block_num)
return block
def error_handler(exception):
print(f'{exception} occurred, terminating pool.')
pool.terminate()
if __name__ == "__main__":
processes = min(8, mp.cpu_count()-1 or 1)
pool = mp.Pool(processes)
for i in range(START_BLOCK-20, END_BLOCK):
pool.apply_async(main, (i,), error_callback=error_handler)
pool.close()
pool.join()
print('-fini-')
我有一个脚本,它本质上是一个 API scraper,它会永久运行。我给它绑了一个 map_async 池,它很漂亮,池隐藏了一些错误,我了解到这些错误很常见。所以我合并了这个包装的辅助函数。
helper.py
def trace_unhandled_exceptions(func):
@functools.wraps(func)
def wrapped_func(*args, **kwargs):
try:
return func(*args, **kwargs)
except:
print('Exception in '+func.__name__)
traceback.print_exc()
return wrapped_func
我的主脚本看起来像
scraper.py
import multiprocessing as mp
from helper import trace_unhandled_exceptions
start_block = 100
end_block = 50000
@trace_unhandled_exceptions
def main(block_num):
block = blah_blah(block_num)
return block
if __name__ == "__main__":
cpus = min(8, mp.cpu_count()-1 or 1)
pool = mp.Pool(cpus)
pool.map_async(main, range(start_block - 20, end_block), chunksize=cpus)
pool.close()
pool.join()
效果很好,我收到异常:
Exception in main
Traceback (most recent call last):
.....
如何让脚本在出现异常时结束,我试过像这样将 os.exit 或 sys.exit 合并到辅助函数中
def trace_unhandled_exceptions(func):
@functools.wraps(func)
def wrapped_func(*args, **kwargs):
try:
return func(*args, **kwargs)
except:
print('Exception in '+func.__name__)
traceback.print_exc()
os._exit(1)
return wrapped_func
但我相信它只会终止子进程而不是整个脚本,有什么建议吗?
我不确定池隐藏错误是什么意思。我的经验是,当辅助函数(即 Pool
方法的目标)引发未捕获的异常时,它不会被忽视。无论如何,...
我建议:
- 您不使用您的
trace_unhandled_exception
装饰器并允许您的工作函数main
引发异常并且 - 而不是使用方法
map_async
(为什么不是map
?),您使用方法imap
,它允许您迭代单个 return 值和任何main
可能抛出的异常,因为它们变得可用 因此,一旦您检测到异常,您就可以调用multiprocessing.Pool.terminate()
来 (1) 取消任何已提交但尚未开始的任务或 (2) 运行ning 任务尚未完成。顺便说一句,即使您不调用terminate
,一旦提交的任务中出现未捕获的异常,处理池也会刷新输入任务队列。
一旦主进程检测到异常,它就可以。当然是清理池子后调用sys.exit()
import multiprocessing as mp
start_block = 100
end_block = 50000
def main(block_num):
if block_num == 1000:
raise ValueError("I don't like 1000.")
return block_num * block_num
if __name__ == "__main__":
cpus = min(8, mp.cpu_count()-1 or 1)
pool = mp.Pool(cpus)
it = pool.imap(main, range(start_block - 20, end_block), chunksize=cpus)
results = []
while True:
try:
result = next(it)
except StopIteration:
break
except Exception as e:
print(e)
# Kill remaining tasks
pool.terminate()
break
else:
results.append(result)
pool.close()
pool.join()
打印:
I don't like 1000.
或者,您可以保留装饰器函数,但将其修改为 return 它捕获的 Exception
实例(目前,它隐式 returns None
)。那么你可以修改while True
循环如下:
while True:
try:
result = next(it)
except StopIteration:
break
else:
if isinstance(result, Exception):
pool.terminate()
break
results.append(result)
由于没有引发实际的异常,如果您想继续执行而不允许剩余的已提交任务 运行,那么对 terminate
的调用就变得绝对必要。即使您只想立即退出,终止并清理池仍然是一个好主意,以确保在您调用退出时没有任何挂起。
我认为你不需要 trace_unhandled_exception
装饰器来做你想做的事,至少如果你使用 pool.apply_async()
而不是 pool.map_async()
则不需要,因为你可以使用 error_callback=
选项它支持在目标函数失败时得到通知。请注意,map_async()
也支持类似的东西,但在 entire 可迭代对象被消耗之前它不会被调用——所以它不适合你想做的事情。
我从@Tim Peters 那里得到了这种方法的想法
import multiprocessing as mp
import random
import time
START_BLOCK = 100
END_BLOCK = 1000
def blah_blah(block_num):
if block_num % 10 == 0:
print(f'Processing block {block_num}')
time.sleep(random.uniform(.01, .1))
return block_num
def main(block_num):
if random.randint(0, 100) == 42:
print(f'Raising radom exception')
raise RuntimeError('RANDOM TEST EXCEPTION')
block = blah_blah(block_num)
return block
def error_handler(exception):
print(f'{exception} occurred, terminating pool.')
pool.terminate()
if __name__ == "__main__":
processes = min(8, mp.cpu_count()-1 or 1)
pool = mp.Pool(processes)
for i in range(START_BLOCK-20, END_BLOCK):
pool.apply_async(main, (i,), error_callback=error_handler)
pool.close()
pool.join()
print('-fini-')