调试 python 多处理中的错误
debugging errors in python multiprocessing
我正在使用 multiprocessing
模块的 Pool
函数,以便 运行 对不同数据并行执行相同的代码。
事实证明,我的代码在某些数据上引发了异常,但未给出发生这种情况的确切行:
Traceback (most recent call last):
File "my_wrapper_script.py", line 366, in <module>
main()
File "my_wrapper_script.py", line 343, in main
results = pool.map(process_function, folders)
File "/usr/lib64/python2.6/multiprocessing/pool.py", line 148, in map
return self.map_async(func, iterable, chunksize).get()
File "/usr/lib64/python2.6/multiprocessing/pool.py", line 422, in get
raise self._value
KeyError: 'some_key'
我知道 multiprocessing.log_to_stderr()
,但它似乎在出现并发问题时很有用,但我不是这样。
有什么想法吗?
如果您使用足够新的 Python 版本,您实际上会看到真正的异常先于该版本打印。例如,这是一个失败的示例:
import multiprocessing
def inner():
raise Exception("FAIL")
def f():
print("HI")
inner()
p = multiprocessing.Pool()
p.apply(f)
p.close()
p.join()
这是 运行 使用 python 3.4 时的例外情况:
multiprocessing.pool.RemoteTraceback:
"""
Traceback (most recent call last):
File "/usr/local/lib/python3.4/multiprocessing/pool.py", line 119, in worker
result = (True, func(*args, **kwds))
File "test.py", line 9, in f
inner()
File "test.py", line 4, in inner
raise Exception("FAIL")
Exception: FAIL
"""
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "test.py", line 13, in <module>
p.apply(f)
File "/usr/local/lib/python3.4/multiprocessing/pool.py", line 253, in apply
return self.apply_async(func, args, kwds).get()
File "/usr/local/lib/python3.4/multiprocessing/pool.py", line 599, in get
raise self._value
Exception: FAIL
如果不能选择使用较新的版本,最简单的方法是将辅助函数包装在一个 try/except 块中,该块将在重新引发异常之前打印异常:
import multiprocessing
import traceback
def inner():
raise Exception("FAIL")
def f():
try:
print("HI")
inner()
except Exception:
print("Exception in worker:")
traceback.print_exc()
raise
p = multiprocessing.Pool()
p.apply(f)
p.close()
p.join()
输出:
HI
Exception in worker:
Traceback (most recent call last):
File "test.py", line 11, in f
inner()
File "test.py", line 5, in inner
raise Exception("FAIL")
Exception: FAIL
Traceback (most recent call last):
File "test.py", line 18, in <module>
p.apply(f)
File "/usr/local/lib/python2.7/multiprocessing/pool.py", line 244, in apply
return self.apply_async(func, args, kwds).get()
File "/usr/local/lib/python2.7/multiprocessing/pool.py", line 558, in get
raise self._value
Exception: FAIL
您需要在工作器中实现您自己的 try/except 块。根据您想要组织代码的方式,您可以像上面提到的那样登录到 stderr,登录到文件等其他地方,return 某种错误代码,甚至用当前回溯标记异常并重新-增加。这是最后一种技术的示例:
import traceback
import multiprocessing as mp
class MyError(Exception):
pass
def worker():
try:
# your real code here
raise MyError("boom")
except Exception, e:
e.traceback = traceback.format_exc()
raise
def main():
pool = mp.Pool()
try:
print "run worker"
result = pool.apply_async(worker)
result.get()
# handle exceptions you expect
except MyError, e:
print e.traceback
# re-raise the rest
except Exception, e:
print e.traceback
raise
if __name__=="__main__":
main()
它returns
run worker
Traceback (most recent call last):
File "doit.py", line 10, in worker
raise MyError("boom")
MyError: boom
我正在使用 multiprocessing
模块的 Pool
函数,以便 运行 对不同数据并行执行相同的代码。
事实证明,我的代码在某些数据上引发了异常,但未给出发生这种情况的确切行:
Traceback (most recent call last):
File "my_wrapper_script.py", line 366, in <module>
main()
File "my_wrapper_script.py", line 343, in main
results = pool.map(process_function, folders)
File "/usr/lib64/python2.6/multiprocessing/pool.py", line 148, in map
return self.map_async(func, iterable, chunksize).get()
File "/usr/lib64/python2.6/multiprocessing/pool.py", line 422, in get
raise self._value
KeyError: 'some_key'
我知道 multiprocessing.log_to_stderr()
,但它似乎在出现并发问题时很有用,但我不是这样。
有什么想法吗?
如果您使用足够新的 Python 版本,您实际上会看到真正的异常先于该版本打印。例如,这是一个失败的示例:
import multiprocessing
def inner():
raise Exception("FAIL")
def f():
print("HI")
inner()
p = multiprocessing.Pool()
p.apply(f)
p.close()
p.join()
这是 运行 使用 python 3.4 时的例外情况:
multiprocessing.pool.RemoteTraceback:
"""
Traceback (most recent call last):
File "/usr/local/lib/python3.4/multiprocessing/pool.py", line 119, in worker
result = (True, func(*args, **kwds))
File "test.py", line 9, in f
inner()
File "test.py", line 4, in inner
raise Exception("FAIL")
Exception: FAIL
"""
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "test.py", line 13, in <module>
p.apply(f)
File "/usr/local/lib/python3.4/multiprocessing/pool.py", line 253, in apply
return self.apply_async(func, args, kwds).get()
File "/usr/local/lib/python3.4/multiprocessing/pool.py", line 599, in get
raise self._value
Exception: FAIL
如果不能选择使用较新的版本,最简单的方法是将辅助函数包装在一个 try/except 块中,该块将在重新引发异常之前打印异常:
import multiprocessing
import traceback
def inner():
raise Exception("FAIL")
def f():
try:
print("HI")
inner()
except Exception:
print("Exception in worker:")
traceback.print_exc()
raise
p = multiprocessing.Pool()
p.apply(f)
p.close()
p.join()
输出:
HI
Exception in worker:
Traceback (most recent call last):
File "test.py", line 11, in f
inner()
File "test.py", line 5, in inner
raise Exception("FAIL")
Exception: FAIL
Traceback (most recent call last):
File "test.py", line 18, in <module>
p.apply(f)
File "/usr/local/lib/python2.7/multiprocessing/pool.py", line 244, in apply
return self.apply_async(func, args, kwds).get()
File "/usr/local/lib/python2.7/multiprocessing/pool.py", line 558, in get
raise self._value
Exception: FAIL
您需要在工作器中实现您自己的 try/except 块。根据您想要组织代码的方式,您可以像上面提到的那样登录到 stderr,登录到文件等其他地方,return 某种错误代码,甚至用当前回溯标记异常并重新-增加。这是最后一种技术的示例:
import traceback
import multiprocessing as mp
class MyError(Exception):
pass
def worker():
try:
# your real code here
raise MyError("boom")
except Exception, e:
e.traceback = traceback.format_exc()
raise
def main():
pool = mp.Pool()
try:
print "run worker"
result = pool.apply_async(worker)
result.get()
# handle exceptions you expect
except MyError, e:
print e.traceback
# re-raise the rest
except Exception, e:
print e.traceback
raise
if __name__=="__main__":
main()
它returns
run worker
Traceback (most recent call last):
File "doit.py", line 10, in worker
raise MyError("boom")
MyError: boom