Python namedtuple 作为 apply_async(..) 回调的参数

Python namedtuple as argument to apply_async(..) callback

我正在编写一个简短的程序,我想在其中异步调用一个函数,这样它就不会阻塞调用者。为此,我使用了 python 的 multiprocessing 模块中的 Pool

在异步调用的函数中,我想 return a namedtuple 以符合我程序其余部分的逻辑,但我发现 namedtuple似乎不是从派生进程传递到回调的受支持类型(可能是因为它不能被腌制)。这是问题的最小重现。

from multiprocessing import Pool
from collections import namedtuple

logEntry = namedtuple("LogEntry", ['logLev', 'msg'])

def doSomething(x):
    # Do actual work here
    logCode = 1
    statusStr = "Message Here"
    return logEntry(logLev=logCode, msg=statusStr)

def callbackFunc(result):
    print(result.logLev)
    print(result.msg)

def userAsyncCall():
    pool = Pool()
    pool.apply_async(doSomething, [1,2], callback=callbackFunc)

if __name__ == "__main__":
    userAsyncCall() # Nothing is printed

    # If this is uncommented, the logLev and status are printed as expected:
    # y = logEntry(logLev=2, msg="Hello World")
    # callbackFunc(y)

有谁知道是否有办法将 namedtuple return 值从异步进程传递到回调?我正在做的事情有 better/more pythonic 方法吗?

未打印任何内容的原因是 apply_async 静静地失败了。顺便说一句,我认为这是一种只会让人感到困惑的不良行为。您可以通过 error_callback 来处理错误。

def errorCallback(exception):
    print(exception)

def userAsyncCall():
    pool = Pool()
    pool.apply_async(doSomething, [1], callback=callbackFunc,  error_callback=errorCallback)
    # You passed wrong arguments. doSomething() takes 1 positional argument.
    # I replace [1,2] with [1].

if __name__ == "__main__":
    userAsyncCall()
    import time
    time.sleep(3) # You need this, otherwise you will never see the output.

当你来到这里时,输出是

Error sending result: 'LogEntry(logLev=1, msg='Message Here')'. Reason: 'PicklingError("Can't pickle <class '__mp_main__.LogEntry'>: attribute lookup LogEntry on __mp_main__ failed",)'

酸洗错误! 你说得对,namedtuple 无法从派生进程传递到回调。

也许这不是更容易接受的方式,但您可以发送 dict 作为结果而不是 namedtuple

正如 Dag Høidahl 更正的那样,可以传递 namedtuple。以下行有效。

LogEntry = namedtuple("LogEntry", ['logLev', 'msg'])

问题是 namedtuple() 的 return 值及其 typename 参数的大小写不同。也就是说,命名元组的 class 定义与您给它的变量名不匹配。您需要两者匹配:

LogEntry = namedtuple("LogEntry", ['logLev', 'msg'])

并相应更新doSomething()中的return语句。

完整代码:

from multiprocessing import Pool
from collections import namedtuple

LogEntry = namedtuple("LogEntry", ['logLev', 'msg'])

def doSomething(x):
    # Do actual work here
    logCode = 1
    statusStr = "Message Here"
    return LogEntry(logLev=logCode, msg=statusStr)

def callbackFunc(result):
    print(result.logLev)
    print(result.msg)

def userAsyncCall():
    pool = Pool()
    return pool.apply_async(doSomething, [1], callback=callbackFunc)

if __name__ == "__main__":
    c = userAsyncCall()

    # To see whether there was an exception, you can attempt to get() the AsyncResult object.
    # print c.get()

(要查看 class 定义,请将 verbose=True 添加到 namedtuple()。)