python 多处理池中的异常处理
Exception handling in python multiprocessing pool
我试图在程序中处理 KeyboardInterrupt 异常,但不知道在使用多处理池时该如何处理。即使我把池操作放在 try-except 块中并处理了异常,仍然会收到 4 个 KeyboardInterrupt 异常。
import time
import multiprocessing as mp
def calc(i):
    """Return the square of ``i``."""
    squared = i * i
    return squared
def main():
    """Print the squares of 0..9 once per second until interrupted.

    The squares are computed by a pool of four worker processes.  A
    KeyboardInterrupt raised in this (parent) process is reported with a
    shutdown message; for any other exception only its message is printed.
    """
    try:
        # The pool is created without an initializer, so the workers keep
        # their default Ctrl-C handling.
        with mp.Pool(4) as pool:
            while True:
                squares = pool.map(calc, range(10))
                print(squares)
                time.sleep(1)
    except KeyboardInterrupt:
        print("\nShutting down.")
    except Exception as exc:
        print(exc)


if __name__ == '__main__':
    main()
我知道这些进程是在相互隔离的环境中运行的,但我也想以某种方式处理这些异常。
编辑:这是我运行代码时得到的输出:
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
^CProcess ForkPoolWorker-3:
Process ForkPoolWorker-5:
Process ForkPoolWorker-4:
Traceback (most recent call last):
File "/usr/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap
self.run()
Traceback (most recent call last):
File "/usr/lib/python3.6/multiprocessing/process.py", line 93, in run
self._target(*self._args, **self._kwargs)
File "/usr/lib/python3.6/multiprocessing/pool.py", line 108, in worker
task = get()
File "/usr/lib/python3.6/multiprocessing/queues.py", line 334, in get
with self._rlock:
File "/usr/lib/python3.6/multiprocessing/synchronize.py", line 95, in __enter__
return self._semlock.__enter__()
File "/usr/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap
self.run()
KeyboardInterrupt
File "/usr/lib/python3.6/multiprocessing/process.py", line 93, in run
self._target(*self._args, **self._kwargs)
File "/usr/lib/python3.6/multiprocessing/pool.py", line 108, in worker
task = get()
File "/usr/lib/python3.6/multiprocessing/queues.py", line 335, in get
res = self._reader.recv_bytes()
File "/usr/lib/python3.6/multiprocessing/connection.py", line 216, in recv_bytes
buf = self._recv_bytes(maxlength)
File "/usr/lib/python3.6/multiprocessing/connection.py", line 407, in _recv_bytes
buf = self._recv(4)
File "/usr/lib/python3.6/multiprocessing/connection.py", line 379, in _recv
chunk = read(handle, remaining)
KeyboardInterrupt
Traceback (most recent call last):
File "/usr/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap
self.run()
File "/usr/lib/python3.6/multiprocessing/process.py", line 93, in run
self._target(*self._args, **self._kwargs)
File "/usr/lib/python3.6/multiprocessing/pool.py", line 108, in worker
task = get()
File "/usr/lib/python3.6/multiprocessing/queues.py", line 334, in get
with self._rlock:
File "/usr/lib/python3.6/multiprocessing/synchronize.py", line 95, in __enter__
return self._semlock.__enter__()
KeyboardInterrupt
Process ForkPoolWorker-6:
Traceback (most recent call last):
File "/usr/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap
self.run()
File "/usr/lib/python3.6/multiprocessing/process.py", line 93, in run
self._target(*self._args, **self._kwargs)
File "/usr/lib/python3.6/multiprocessing/pool.py", line 108, in worker
task = get()
File "/usr/lib/python3.6/multiprocessing/queues.py", line 334, in get
with self._rlock:
File "/usr/lib/python3.6/multiprocessing/synchronize.py", line 95, in __enter__
return self._semlock.__enter__()
KeyboardInterrupt
Shutting down.
由于您似乎是在类 Linux 平台下运行(您真的应该为多处理相关的问题加上平台标签),您需要让池中的进程忽略 CTRL-C。最简单的方法是在创建池时使用 initializer 参数:
import time
import multiprocessing as mp
def init_pool_processes():
    """
    Pool initializer: executed by each pool process while it starts up.

    Sets the SIGINT disposition of the calling process to "ignore".
    """
    from signal import SIGINT, SIG_IGN, signal as install_handler

    install_handler(SIGINT, SIG_IGN)
def calc(i):
    """Return the square of ``i``."""
    squared = i * i
    return squared
def main():
    """Print the squares of 0..9 once per second until interrupted.

    The squares are computed by a pool of four worker processes, each of
    which runs ``init_pool_processes`` when it starts.  A KeyboardInterrupt
    raised in this (parent) process is reported with a shutdown message;
    for any other exception only its message is printed.
    """
    try:
        with mp.Pool(4, initializer=init_pool_processes) as pool:
            while True:
                squares = pool.map(calc, range(10))
                print(squares)
                time.sleep(1)
    except KeyboardInterrupt:
        print("\nShutting down.")
    except Exception as exc:
        print(exc)


if __name__ == '__main__':
    main()
更新
要处理工作函数抛出的异常,您应该使用 imap 方法,它返回一个迭代器;对其进行迭代时,要么得到下一个返回值,要么在相应的任务引发了异常时重新引发该异常。通过这种方式,您可以捕获已提交的单个任务的异常。例如:
import multiprocessing as mp
def calc(i):
    """Return the square of ``i``, deliberately failing for the value 3.

    Raises:
        ValueError: if ``i`` equals 3.
    """
    if i != 3:
        return i*i
    raise ValueError(f'bad i value {i}')
def main():
    """Run ``calc`` over 0..9 in a four-process pool and print the outcomes.

    For every task, either its return value or the exception it raised is
    stored, in iteration order, and the resulting list is printed at the
    end.
    """
    collected = []
    with mp.Pool(4) as pool:
        pending = pool.imap(calc, range(10))
        # next() is called by hand so that the loop can keep going after a
        # task's exception has been re-raised here.
        while True:
            try:
                value = next(pending)
            except StopIteration:
                # No more results:
                break
            except Exception as exc:
                # worker function raised an exception
                print('Got exception:', exc)
                # Let's also append the exception as the return value:
                collected.append(exc)
            else:
                collected.append(value)
    print(collected)


if __name__ == '__main__':
    main()
打印:
Got exception: bad i value 3
[0, 1, 4, ValueError('bad i value 3'), 16, 25, 36, 49, 64, 81]
我试图在程序中处理 KeyboardInterrupt 异常,但不知道在使用多处理池时该如何处理。即使我把池操作放在 try-except 块中并处理了异常,仍然会收到 4 个 KeyboardInterrupt 异常。
import time
import multiprocessing as mp
def calc(i):
    """Return the square of ``i``."""
    squared = i * i
    return squared
def main():
    """Print the squares of 0..9 once per second until interrupted.

    The squares are computed by a pool of four worker processes.  A
    KeyboardInterrupt raised in this (parent) process is reported with a
    shutdown message; for any other exception only its message is printed.
    """
    try:
        # The pool is created without an initializer, so the workers keep
        # their default Ctrl-C handling.
        with mp.Pool(4) as pool:
            while True:
                squares = pool.map(calc, range(10))
                print(squares)
                time.sleep(1)
    except KeyboardInterrupt:
        print("\nShutting down.")
    except Exception as exc:
        print(exc)


if __name__ == '__main__':
    main()
我知道这些进程是在相互隔离的环境中运行的,但我也想以某种方式处理这些异常。
编辑:这是我运行代码时得到的输出:
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
^CProcess ForkPoolWorker-3:
Process ForkPoolWorker-5:
Process ForkPoolWorker-4:
Traceback (most recent call last):
File "/usr/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap
self.run()
Traceback (most recent call last):
File "/usr/lib/python3.6/multiprocessing/process.py", line 93, in run
self._target(*self._args, **self._kwargs)
File "/usr/lib/python3.6/multiprocessing/pool.py", line 108, in worker
task = get()
File "/usr/lib/python3.6/multiprocessing/queues.py", line 334, in get
with self._rlock:
File "/usr/lib/python3.6/multiprocessing/synchronize.py", line 95, in __enter__
return self._semlock.__enter__()
File "/usr/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap
self.run()
KeyboardInterrupt
File "/usr/lib/python3.6/multiprocessing/process.py", line 93, in run
self._target(*self._args, **self._kwargs)
File "/usr/lib/python3.6/multiprocessing/pool.py", line 108, in worker
task = get()
File "/usr/lib/python3.6/multiprocessing/queues.py", line 335, in get
res = self._reader.recv_bytes()
File "/usr/lib/python3.6/multiprocessing/connection.py", line 216, in recv_bytes
buf = self._recv_bytes(maxlength)
File "/usr/lib/python3.6/multiprocessing/connection.py", line 407, in _recv_bytes
buf = self._recv(4)
File "/usr/lib/python3.6/multiprocessing/connection.py", line 379, in _recv
chunk = read(handle, remaining)
KeyboardInterrupt
Traceback (most recent call last):
File "/usr/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap
self.run()
File "/usr/lib/python3.6/multiprocessing/process.py", line 93, in run
self._target(*self._args, **self._kwargs)
File "/usr/lib/python3.6/multiprocessing/pool.py", line 108, in worker
task = get()
File "/usr/lib/python3.6/multiprocessing/queues.py", line 334, in get
with self._rlock:
File "/usr/lib/python3.6/multiprocessing/synchronize.py", line 95, in __enter__
return self._semlock.__enter__()
KeyboardInterrupt
Process ForkPoolWorker-6:
Traceback (most recent call last):
File "/usr/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap
self.run()
File "/usr/lib/python3.6/multiprocessing/process.py", line 93, in run
self._target(*self._args, **self._kwargs)
File "/usr/lib/python3.6/multiprocessing/pool.py", line 108, in worker
task = get()
File "/usr/lib/python3.6/multiprocessing/queues.py", line 334, in get
with self._rlock:
File "/usr/lib/python3.6/multiprocessing/synchronize.py", line 95, in __enter__
return self._semlock.__enter__()
KeyboardInterrupt
Shutting down.
由于您似乎是在类 Linux 平台下运行(您真的应该为多处理相关的问题加上平台标签),您需要让池中的进程忽略 CTRL-C。最简单的方法是在创建池时使用 initializer 参数:
import time
import multiprocessing as mp
def init_pool_processes():
    """
    Pool initializer: executed by each pool process while it starts up.

    Sets the SIGINT disposition of the calling process to "ignore".
    """
    from signal import SIGINT, SIG_IGN, signal as install_handler

    install_handler(SIGINT, SIG_IGN)
def calc(i):
    """Return the square of ``i``."""
    squared = i * i
    return squared
def main():
    """Print the squares of 0..9 once per second until interrupted.

    The squares are computed by a pool of four worker processes, each of
    which runs ``init_pool_processes`` when it starts.  A KeyboardInterrupt
    raised in this (parent) process is reported with a shutdown message;
    for any other exception only its message is printed.
    """
    try:
        with mp.Pool(4, initializer=init_pool_processes) as pool:
            while True:
                squares = pool.map(calc, range(10))
                print(squares)
                time.sleep(1)
    except KeyboardInterrupt:
        print("\nShutting down.")
    except Exception as exc:
        print(exc)


if __name__ == '__main__':
    main()
更新
要处理工作函数抛出的异常,您应该使用 imap 方法,它返回一个迭代器;对其进行迭代时,要么得到下一个返回值,要么在相应的任务引发了异常时重新引发该异常。通过这种方式,您可以捕获已提交的单个任务的异常。例如:
import multiprocessing as mp
def calc(i):
    """Return the square of ``i``, deliberately failing for the value 3.

    Raises:
        ValueError: if ``i`` equals 3.
    """
    if i != 3:
        return i*i
    raise ValueError(f'bad i value {i}')
def main():
    """Run ``calc`` over 0..9 in a four-process pool and print the outcomes.

    For every task, either its return value or the exception it raised is
    stored, in iteration order, and the resulting list is printed at the
    end.
    """
    collected = []
    with mp.Pool(4) as pool:
        pending = pool.imap(calc, range(10))
        # next() is called by hand so that the loop can keep going after a
        # task's exception has been re-raised here.
        while True:
            try:
                value = next(pending)
            except StopIteration:
                # No more results:
                break
            except Exception as exc:
                # worker function raised an exception
                print('Got exception:', exc)
                # Let's also append the exception as the return value:
                collected.append(exc)
            else:
                collected.append(value)
    print(collected)


if __name__ == '__main__':
    main()
打印:
Got exception: bad i value 3
[0, 1, 4, ValueError('bad i value 3'), 16, 25, 36, 49, 64, 81]