Python namedtuple 作为 apply_async(..) 回调的参数
Python namedtuple as argument to apply_async(..) callback
我正在编写一个简短的程序,我想在其中异步调用一个函数,这样它就不会阻塞调用者。为此,我使用了 python 的 multiprocessing
模块中的 Pool
。
在异步调用的函数中,我想 return a namedtuple
以符合我程序其余部分的逻辑,但我发现 namedtuple
似乎不是从派生进程传递到回调的受支持类型(可能是因为它不能被腌制)。这是问题的最小重现。
from multiprocessing import Pool
from collections import namedtuple
logEntry = namedtuple("LogEntry", ['logLev', 'msg'])
def doSomething(x):
# Do actual work here
logCode = 1
statusStr = "Message Here"
return logEntry(logLev=logCode, msg=statusStr)
def callbackFunc(result):
print(result.logLev)
print(result.msg)
def userAsyncCall():
pool = Pool()
pool.apply_async(doSomething, [1,2], callback=callbackFunc)
if __name__ == "__main__":
userAsyncCall() # Nothing is printed
# If this is uncommented, the logLev and status are printed as expected:
# y = logEntry(logLev=2, msg="Hello World")
# callbackFunc(y)
有谁知道是否有办法将 namedtuple
return 值从异步进程传递到回调?我正在做的事情有 better/more pythonic 方法吗?
未打印任何内容的原因是 apply_async
静静地失败了。顺便说一句,我认为这是一种只会让人感到困惑的不良行为。您可以通过 error_callback
来处理错误。
def errorCallback(exception):
print(exception)
def userAsyncCall():
pool = Pool()
pool.apply_async(doSomething, [1], callback=callbackFunc, error_callback=errorCallback)
# You passed wrong arguments. doSomething() takes 1 positional argument.
# I replace [1,2] with [1].
if __name__ == "__main__":
userAsyncCall()
import time
time.sleep(3) # You need this, otherwise you will never see the output.
当你来到这里时,输出是
Error sending result: 'LogEntry(logLev=1, msg='Message Here')'. Reason: 'PicklingError("Can't pickle <class '__mp_main__.LogEntry'>: attribute lookup LogEntry on __mp_main__ failed",)'
酸洗错误! 你说得对,namedtuple
无法从派生进程传递到回调。
也许这不是更容易接受的方式,但您可以发送 dict
作为结果而不是 namedtuple
。
正如 Dag Høidahl 更正的那样,可以传递 namedtuple。以下行有效。
LogEntry = namedtuple("LogEntry", ['logLev', 'msg'])
问题是 namedtuple()
的 return 值及其 typename
参数的大小写不同。也就是说,命名元组的 class 定义与您给它的变量名不匹配。您需要两者匹配:
LogEntry = namedtuple("LogEntry", ['logLev', 'msg'])
并相应更新doSomething()
中的return
语句。
完整代码:
from multiprocessing import Pool
from collections import namedtuple
LogEntry = namedtuple("LogEntry", ['logLev', 'msg'])
def doSomething(x):
# Do actual work here
logCode = 1
statusStr = "Message Here"
return LogEntry(logLev=logCode, msg=statusStr)
def callbackFunc(result):
print(result.logLev)
print(result.msg)
def userAsyncCall():
pool = Pool()
return pool.apply_async(doSomething, [1], callback=callbackFunc)
if __name__ == "__main__":
c = userAsyncCall()
# To see whether there was an exception, you can attempt to get() the AsyncResult object.
# print c.get()
(要查看 class 定义,请将 verbose=True
添加到 namedtuple()
。)
我正在编写一个简短的程序,我想在其中异步调用一个函数,这样它就不会阻塞调用者。为此,我使用了 python 的 multiprocessing
模块中的 Pool
。
在异步调用的函数中,我想 return a namedtuple
以符合我程序其余部分的逻辑,但我发现 namedtuple
似乎不是从派生进程传递到回调的受支持类型(可能是因为它不能被腌制)。这是问题的最小重现。
from multiprocessing import Pool
from collections import namedtuple
logEntry = namedtuple("LogEntry", ['logLev', 'msg'])
def doSomething(x):
# Do actual work here
logCode = 1
statusStr = "Message Here"
return logEntry(logLev=logCode, msg=statusStr)
def callbackFunc(result):
print(result.logLev)
print(result.msg)
def userAsyncCall():
pool = Pool()
pool.apply_async(doSomething, [1,2], callback=callbackFunc)
if __name__ == "__main__":
userAsyncCall() # Nothing is printed
# If this is uncommented, the logLev and status are printed as expected:
# y = logEntry(logLev=2, msg="Hello World")
# callbackFunc(y)
有谁知道是否有办法将 namedtuple
return 值从异步进程传递到回调?我正在做的事情有 better/more pythonic 方法吗?
未打印任何内容的原因是 apply_async
静静地失败了。顺便说一句,我认为这是一种只会让人感到困惑的不良行为。您可以通过 error_callback
来处理错误。
def errorCallback(exception):
print(exception)
def userAsyncCall():
pool = Pool()
pool.apply_async(doSomething, [1], callback=callbackFunc, error_callback=errorCallback)
# You passed wrong arguments. doSomething() takes 1 positional argument.
# I replace [1,2] with [1].
if __name__ == "__main__":
userAsyncCall()
import time
time.sleep(3) # You need this, otherwise you will never see the output.
当你来到这里时,输出是
Error sending result: 'LogEntry(logLev=1, msg='Message Here')'. Reason: 'PicklingError("Can't pickle <class '__mp_main__.LogEntry'>: attribute lookup LogEntry on __mp_main__ failed",)'
酸洗错误! 你说得对,namedtuple
无法从派生进程传递到回调。
也许这不是更容易接受的方式,但您可以发送 dict
作为结果而不是 namedtuple
。
正如 Dag Høidahl 更正的那样,可以传递 namedtuple。以下行有效。
LogEntry = namedtuple("LogEntry", ['logLev', 'msg'])
问题是 namedtuple()
的 return 值及其 typename
参数的大小写不同。也就是说,命名元组的 class 定义与您给它的变量名不匹配。您需要两者匹配:
LogEntry = namedtuple("LogEntry", ['logLev', 'msg'])
并相应更新doSomething()
中的return
语句。
完整代码:
from multiprocessing import Pool
from collections import namedtuple
LogEntry = namedtuple("LogEntry", ['logLev', 'msg'])
def doSomething(x):
# Do actual work here
logCode = 1
statusStr = "Message Here"
return LogEntry(logLev=logCode, msg=statusStr)
def callbackFunc(result):
print(result.logLev)
print(result.msg)
def userAsyncCall():
pool = Pool()
return pool.apply_async(doSomething, [1], callback=callbackFunc)
if __name__ == "__main__":
c = userAsyncCall()
# To see whether there was an exception, you can attempt to get() the AsyncResult object.
# print c.get()
(要查看 class 定义,请将 verbose=True
添加到 namedtuple()
。)