无法使用 setParseAction() 方法来腌制 Pyparsing 表达式。需要多处理

Can't pickle Pyparsing expression with setParseAction() method. Needed for multiprocessing

我最初的问题是我正在尝试执行以下操作:

def submit_decoder_process(decoder, input_line):
    decoder.process_line(input_line)
    return decoder

self.pool = Pool(processes=num_of_processes)
self.pool.apply_async(submit_decoder_process, [decoder, input_line]).get()

这里描述decoder有点牵扯,但重要的是decoder是一个用调用setParseAction()的PyParsing表达式初始化的对象。这使 multiprocessing 使用的 pickle 失败,这又使上述代码失败。

现在,这是我分离和简化的 pickle/PyParsing 问题。 由于 pickle 失败,以下代码会产生错误消息。

import pickle
from pyparsing import *

def my_pa_func():
    pass

pickle.dumps(Word(nums).setParseAction(my_pa_func))

错误信息:

pickle.PicklingError: Can't pickle <function wrapper at 0x00000000026534A8>: it's not found as pyparsing.wrapper

现在,如果您删除调用 .setParseAction(my_pa_func),它将正常工作:

pickle.dumps(Word(nums))

我该如何解决?多处理使用泡菜,所以我猜我无法避免它。据说使用 dill 的 pathos 包还不够成熟,至少,我在 Windows-64bit 上安装它时遇到了问题。看得我真是摸不着头脑。

https://docs.python.org/2/library/pickle.html#what-can-be-pickled-and-unpickled

multiprocessing.Pool 使用 Pickle 的协议序列化通过 Pipe 传递给子进程的函数和模块名称(在您的示例中为 setParseAction 和 pyparse)。

子进程,一旦收到它们,它就会导入模块并尝试调用函数。问题是你传递的不是函数而是方法。为了解决这个问题,Pickle 协议应该足够聪明,用 'user' 参数构建 'Word' 对象,然后调用 setParseAction 方法。由于处理这些情况太复杂,Pickle 协议阻止您序列化非顶级函数。

要解决您的问题,您可以指导 Pickle 模块如何序列化 setParseAction 方法 (https://docs.python.org/2/library/pickle.html#pickle-protocol),或者重构您的代码,使传递给 Pool.apply_async 的内容是可序列化的。

如果将 Word 对象传递给子进程并让它调用 Word().setParseAction() 会怎么样?

好的,这是受 rocksportrocker 启发的解决方案:Python multiprocessing pickling error

想法是在进程之间来回传递时将无法 pickle 的对象进行 dill,然后在传递之后 "undill" 它:

from multiprocessing import Pool
import dill

def submit_decoder_process(decoder_dill, input_line):
    decoder = dill.loads(decoder_dill)  # undill after it was passed to a pool process
    decoder.process_line(input_line)
    return dill.dumps(decoder)  # dill before passing back to parent process

self.pool = Pool(processes=num_of_processes)

# Dill before sending to a pool process
decoder_processed = dill.loads(self.pool.apply_async(submit_decoder_process, [dill.dumps(decoder), input_line]).get())

如您所述,我建议 pathos.multiprocessing。当然,我是 pathos 的作者,所以我想这并不奇怪。看起来可能存在 distutils 您 运行 遇到的错误,如此处所述:https://github.com/uqfoundation/pathos/issues/49.

您使用 dill 的解决方案是一个很好的解决方法。您也可以放弃安装整个 pathos 包,而只安装 multiprocessing 包的 pathos 分支(它使用 dill 而不是 pickle ).您可以在这里找到它:http://dev.danse.us/packages or here: https://github.com/uqfoundation/pathos/tree/master/external,