如何在不处于顶层的情况下解决 python 多处理的酸洗错误?

How to get around the pickling error of python multiprocessing without being in the top-level?

我已经多次研究过这个问题,但还没有找到适合我的情况或我理解的解决方法,所以请耐心等待。

基本上,我有一个层次结构的功能组织,这阻止了我在顶层进行多处理。不幸的是,我不相信我可以改变程序的布局——因为我需要在初始输入后创建的所有变量。

例如,假设我有这个:

import multiprocessing

  def calculate(x):
    # here is where I would take this input x (and maybe a couple more inputs)
    # and build a larger library of variables that I use further down the line

    def domath(y):
      return x * y

    pool = multiprocessing.Pool(3)
    final= pool.map(domath, range(3))

calculate(2)

这会产生以下错误:

Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

我在考虑全局变量,但我担心我必须定义太多,这可能会大大降低我的程序速度。 有没有无需重构整个程序的解决方法?

您遇到的问题其实是一个功能。 pickle 源实际上是为了防止这种行为而设计的,以防止恶意代码被执行。在处理任何适用的安全实施时请考虑这一点。

首先我们有一些进口。

import marshal
import pickle
import types

这里我们有一个函数,它接受一个函数作为参数,pickle 对象的各个部分,然后 returns 一个包含所有部分的元组:

def pack(fn):
    code = marshal.dumps(fn.__code__)
    name = pickle.dumps(fn.__name__)
    defs = pickle.dumps(fn.__defaults__)
    clos = pickle.dumps(fn.__closure__)
    return (code, name, defs, clos)

接下来我们有一个函数,它采用我们转换函数的四个部分。它翻译这四个部分,然后 returns 从这些部分创建一个函数。您应该注意这里重新引入了全局变量,因为我们的流程不处理这些变量:

def unpack(code, name, defs, clos):
    code = marshal.loads(code)
    glob = globals()
    name = pickle.loads(name)
    defs = pickle.loads(defs)
    clos = pickle.loads(clos)
    return types.FunctionType(code, glob, name, defs, clos)

这里我们有一个测试函数。请注意,我在函数范围内放置了一个导入。我们的酸洗过程不处理全局变量:

def test_function(a, b):
    from random import randint
    return randint(a, b)

最后我们打包测试对象并打印结果以确保一切正常:

packed = pack(test_function)
print((packed))

最后,我们解包我们的函数,将它赋值给一个变量,调用它,并打印它的输出:

unpacked = unpack(*packed)
print((unpacked(2, 20)))

如有任何问题,请发表评论。

您可以使用 pathos.multiprocessing,它是 multiprocessing 的一个分支,它使用 dill 序列化器而不是 pickledill 几乎可以序列化 python 中的任何内容。然后,无需编辑您的代码。

>>> from pathos.multiprocessing import ProcessingPool as Pool
>>> 
>>> def calculate(x):
...   def domath(y):
...     return x*y
...   return Pool().map(domath, range(3))
... 
>>> calculate(2)
[0, 2, 4]

你甚至可以疯狂地使用它……因为大多数东西都是腌制的。无需使用纯 multiprocessing.

烹制的奇数非 pythonic 解决方案
>>> class Foo(object):
...   def __init__(self, x):
...     self.x = x
...   def doit(self, y):
...     return ProcessingPool().map(self.squared, calculate(y+self.x))
...   def squared(self, z):
...     return z*z
... 
>>> def thing(obj, y):
...   return getattr(obj, 'doit')(y)
... 
>>> ProcessingPool().map(thing, ProcessingPool().map(Foo, range(3)), range(3))
[[0, 0, 0], [0, 4, 16], [0, 16, 64]]

在此处获取 pathoshttps://github.com/uqfoundation

把内嵌函数去掉怎么样?

这在我看来是最清晰的解决方案(因为你没有给出预期的输出,我不得不猜测):

$ cat /tmp/tmp.py
import multiprocessing

def calculate(x):
    # here is where I would take this input x (and maybe a couple more inputs)
    # and build a larger library of variables that I use further down the line

    pool = multiprocessing.Pool(3)
    _lst = [(x, y) for x in (x,) for y in range(3)]
    final= pool.map(domath, _lst)
    print(final)

def domath(l):
    return l[0] * l[1]

calculate(2)

$ python /tmp/tmp.py
[0, 2, 4]

$