error_callback 在 multiprocessing.Pool apply_async 在 Python 2?

error_callback in multiprocessing.Pool apply_async in Python 2?

multiprocessing.Pool class 的函数 apply_asyncPython 3 中有参数 error_callback。但是Python2中缺少这个论点。

在 Python 2 中是否有任何技巧可以实现相同的功能?理想情况下,我想编写在 Python 2 和 3 中运行的代码。

我还没有尝试过 python3。但对我来说,为了捕获子进程中的错误,我将在子进程中运行的函数放在

import traceback


try:
    your code that can make error
except Exception as e:
    print e
    return False, traceback.format_exc()
else:
    return True, result

这样我就知道是否出了问题。

编辑:我将 return 格式更改为 OP 的注释,以便子进程 return 是一个元组 (is_success, result or error traceback message )

因此主进程将首先读取标志 is_success 然后相应地处理第二个参数。

TL;DR

不要向 return 添加额外的参数,而是使用 successful() method that AnycResult objects 为您提供的。这就是它的用途。

说明

调用 apply_async return AsyncResult 对象。这些对象有一个名为 successful() 的方法,它会告诉您目标进程是否通过异常退出。调用 successful() 将 return true 或 false 取决于子进程是否异常退出。这是一个例子:

import multiprocessing

def good_func(arg):
    arg += 1
    return arg

def bad_func(arg):
    arg += 1
    raise Exception # force an exception to be raised
    return arg

pool = multiprocessing.Pool()

good_result = pool.apply_async(good_func, (1,))
bad_result = pool.apply_async(bad_func, (1,))

pool.close()
pool.join()

print(good_result.successful()) # prints True
print(bad_result.successful())  # prints False

此代码returns:

True
False

然后您可以添加一个条件表达式,如果对 successfull() return 的调用为假,则调用您的清理或错误处理代码。

此外,如果您需要回溯,您可以将 subprocess 函数的主体包装在 try/except 块中,并 return 一个字符串版本的回溯,以防出现异常。它可能看起来像这个版本的 bad_func:

import sys
import traceback
import multiprocessing

def good_func(arg):
    arg += 1
    return arg

def bad_func(arg):
    try:
        arg += 1
        raise Exception
        return a
    except Exception as error:
        # capture the exception and bundle the traceback
        # in a string, then raise new exception with the string traceback
        raise Exception("".join(traceback.format_exception(*sys.exc_info())))

pool = multiprocessing.Pool()

good_result = pool.apply_async(good_func, (1,))
bad_result = pool.apply_async(bad_func, (1,))

pool.close()
pool.join()

print(good_result.successful()) # prints True
print(good_result.get())        # prints good result
print(bad_result.successful())  # prints False
print(bad_result.get())         # prints traceback

此代码产生此输出:

True
2
False
Traceback (most recent call last):
  File "async_err.py", line 29, in
    print(bad_result.get())
  File "/user/peteoss/encap/Python-2.7.6/lib/python2.7/multiprocessing /pool.py", line 554, in get
    raise self._value
Exception: Traceback (most recent call last):
  File "async_err.py", line 13, in bad_func
    raise Exception
Exception

您应该注意以下几点:

  1. 您将获得两个回溯:一个来自对 get() 的失败调用,第二个是您在子进程中捆绑到字符串中的回溯。

  2. 如果将subprocess函数包裹在try/except块中,处理时需要重新抛出异常,否则get()会return false .

  3. AsyncResult.successful()AsyncResult.get() 方法在所有具有多处理库的 CPython 版本中都可用,因此此方法满足您编写与版本无关的代码的要求。