调试 python 多处理中的错误

debugging errors in python multiprocessing

我正在使用 multiprocessing 模块的 Pool 函数,以便 运行 对不同数据并行执行相同的代码。

事实证明,我的代码在某些数据上引发了异常,但未给出发生这种情况的确切行:

Traceback (most recent call last):
  File "my_wrapper_script.py", line 366, in <module>
    main()
  File "my_wrapper_script.py", line 343, in main
    results = pool.map(process_function, folders)
  File "/usr/lib64/python2.6/multiprocessing/pool.py", line 148, in map
    return self.map_async(func, iterable, chunksize).get()
  File "/usr/lib64/python2.6/multiprocessing/pool.py", line 422, in get
    raise self._value
KeyError: 'some_key'

我知道 multiprocessing.log_to_stderr() ,但它似乎在出现并发问题时很有用,但我不是这样。

有什么想法吗?

如果您使用足够新的 Python 版本,您实际上会看到真正的异常先于该版本打印。例如,这是一个失败的示例:

import multiprocessing

def inner():
    raise Exception("FAIL")

def f():
    print("HI")
    inner()

p = multiprocessing.Pool()
p.apply(f)
p.close()
p.join()

这是 运行 使用 python 3.4 时的例外情况:

multiprocessing.pool.RemoteTraceback: 
"""
Traceback (most recent call last):
  File "/usr/local/lib/python3.4/multiprocessing/pool.py", line 119, in worker
    result = (True, func(*args, **kwds))
  File "test.py", line 9, in f
    inner()
  File "test.py", line 4, in inner
    raise Exception("FAIL")
Exception: FAIL
"""

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "test.py", line 13, in <module>
    p.apply(f)
  File "/usr/local/lib/python3.4/multiprocessing/pool.py", line 253, in apply
    return self.apply_async(func, args, kwds).get()
  File "/usr/local/lib/python3.4/multiprocessing/pool.py", line 599, in get
    raise self._value
Exception: FAIL

如果不能选择使用较新的版本,最简单的方法是将辅助函数包装在一个 try/except 块中,该块将在重新引发异常之前打印异常:

import multiprocessing
import traceback

def inner():
    raise Exception("FAIL")

def f():
    try:
        print("HI")
        inner()
    except Exception:
        print("Exception in worker:")
        traceback.print_exc()
        raise

p = multiprocessing.Pool()
p.apply(f)
p.close()
p.join()

输出:

HI
Exception in worker:
Traceback (most recent call last):
  File "test.py", line 11, in f
    inner()
  File "test.py", line 5, in inner
    raise Exception("FAIL")
Exception: FAIL
Traceback (most recent call last):
  File "test.py", line 18, in <module>
    p.apply(f)
  File "/usr/local/lib/python2.7/multiprocessing/pool.py", line 244, in apply
    return self.apply_async(func, args, kwds).get()
  File "/usr/local/lib/python2.7/multiprocessing/pool.py", line 558, in get
    raise self._value
Exception: FAIL

您需要在工作器中实现您自己的 try/except 块。根据您想要组织代码的方式,您可以像上面提到的那样登录到 stderr,登录到文件等其他地方,return 某种错误代码,甚至用当前回溯标记异常并重新-增加。这是最后一种技术的示例:

import traceback
import multiprocessing as mp

class MyError(Exception):
    pass

def worker():
    try:
        # your real code here
        raise MyError("boom")
    except Exception, e:
        e.traceback = traceback.format_exc()
        raise

def main():
    pool = mp.Pool()
    try:
        print "run worker"
        result = pool.apply_async(worker)
        result.get()
    # handle exceptions you expect
    except MyError, e:
        print e.traceback
    # re-raise the rest
    except Exception, e:
        print e.traceback
        raise


if __name__=="__main__":
    main()

它returns

run worker
Traceback (most recent call last):
  File "doit.py", line 10, in worker
    raise MyError("boom")
MyError: boom