如何在不处于顶层的情况下解决 python 多处理的酸洗错误?
How to get around the pickling error of python multiprocessing without being in the top-level?
我已经多次研究过这个问题,但还没有找到适合我的情况或我理解的解决方法,所以请耐心等待。
基本上,我有一个层次结构的功能组织,这阻止了我在顶层进行多处理。不幸的是,我不相信我可以改变程序的布局——因为我需要在初始输入后创建的所有变量。
例如,假设我有这个:
import multiprocessing
def calculate(x):
# here is where I would take this input x (and maybe a couple more inputs)
# and build a larger library of variables that I use further down the line
def domath(y):
return x * y
pool = multiprocessing.Pool(3)
final= pool.map(domath, range(3))
calculate(2)
这会产生以下错误:
Can't pickle <type 'function'>: attribute lookup __builtin__.function failed
我在考虑全局变量,但我担心我必须定义太多,这可能会大大降低我的程序速度。
有没有无需重构整个程序的解决方法?
您遇到的问题其实是一个功能。 pickle 源实际上是为了防止这种行为而设计的,以防止恶意代码被执行。在处理任何适用的安全实施时请考虑这一点。
首先我们有一些进口。
import marshal
import pickle
import types
这里我们有一个函数,它接受一个函数作为参数,pickle 对象的各个部分,然后 returns 一个包含所有部分的元组:
def pack(fn):
code = marshal.dumps(fn.__code__)
name = pickle.dumps(fn.__name__)
defs = pickle.dumps(fn.__defaults__)
clos = pickle.dumps(fn.__closure__)
return (code, name, defs, clos)
接下来我们有一个函数,它采用我们转换函数的四个部分。它翻译这四个部分,然后 returns 从这些部分创建一个函数。您应该注意这里重新引入了全局变量,因为我们的流程不处理这些变量:
def unpack(code, name, defs, clos):
code = marshal.loads(code)
glob = globals()
name = pickle.loads(name)
defs = pickle.loads(defs)
clos = pickle.loads(clos)
return types.FunctionType(code, glob, name, defs, clos)
这里我们有一个测试函数。请注意,我在函数范围内放置了一个导入。我们的酸洗过程不处理全局变量:
def test_function(a, b):
from random import randint
return randint(a, b)
最后我们打包测试对象并打印结果以确保一切正常:
packed = pack(test_function)
print((packed))
最后,我们解包我们的函数,将它赋值给一个变量,调用它,并打印它的输出:
unpacked = unpack(*packed)
print((unpacked(2, 20)))
如有任何问题,请发表评论。
您可以使用 pathos.multiprocessing
,它是 multiprocessing
的一个分支,它使用 dill
序列化器而不是 pickle
。 dill
几乎可以序列化 python 中的任何内容。然后,无需编辑您的代码。
>>> from pathos.multiprocessing import ProcessingPool as Pool
>>>
>>> def calculate(x):
... def domath(y):
... return x*y
... return Pool().map(domath, range(3))
...
>>> calculate(2)
[0, 2, 4]
你甚至可以疯狂地使用它……因为大多数东西都是腌制的。无需使用纯 multiprocessing
.
烹制的奇数非 pythonic 解决方案
>>> class Foo(object):
... def __init__(self, x):
... self.x = x
... def doit(self, y):
... return ProcessingPool().map(self.squared, calculate(y+self.x))
... def squared(self, z):
... return z*z
...
>>> def thing(obj, y):
... return getattr(obj, 'doit')(y)
...
>>> ProcessingPool().map(thing, ProcessingPool().map(Foo, range(3)), range(3))
[[0, 0, 0], [0, 4, 16], [0, 16, 64]]
在此处获取 pathos
:https://github.com/uqfoundation
把内嵌函数去掉怎么样?
这在我看来是最清晰的解决方案(因为你没有给出预期的输出,我不得不猜测):
$ cat /tmp/tmp.py
import multiprocessing
def calculate(x):
# here is where I would take this input x (and maybe a couple more inputs)
# and build a larger library of variables that I use further down the line
pool = multiprocessing.Pool(3)
_lst = [(x, y) for x in (x,) for y in range(3)]
final= pool.map(domath, _lst)
print(final)
def domath(l):
return l[0] * l[1]
calculate(2)
$ python /tmp/tmp.py
[0, 2, 4]
$
我已经多次研究过这个问题,但还没有找到适合我的情况或我理解的解决方法,所以请耐心等待。
基本上,我有一个层次结构的功能组织,这阻止了我在顶层进行多处理。不幸的是,我不相信我可以改变程序的布局——因为我需要在初始输入后创建的所有变量。
例如,假设我有这个:
import multiprocessing
def calculate(x):
# here is where I would take this input x (and maybe a couple more inputs)
# and build a larger library of variables that I use further down the line
def domath(y):
return x * y
pool = multiprocessing.Pool(3)
final= pool.map(domath, range(3))
calculate(2)
这会产生以下错误:
Can't pickle <type 'function'>: attribute lookup __builtin__.function failed
我在考虑全局变量,但我担心我必须定义太多,这可能会大大降低我的程序速度。 有没有无需重构整个程序的解决方法?
您遇到的问题其实是一个功能。 pickle 源实际上是为了防止这种行为而设计的,以防止恶意代码被执行。在处理任何适用的安全实施时请考虑这一点。
首先我们有一些进口。
import marshal
import pickle
import types
这里我们有一个函数,它接受一个函数作为参数,pickle 对象的各个部分,然后 returns 一个包含所有部分的元组:
def pack(fn):
code = marshal.dumps(fn.__code__)
name = pickle.dumps(fn.__name__)
defs = pickle.dumps(fn.__defaults__)
clos = pickle.dumps(fn.__closure__)
return (code, name, defs, clos)
接下来我们有一个函数,它采用我们转换函数的四个部分。它翻译这四个部分,然后 returns 从这些部分创建一个函数。您应该注意这里重新引入了全局变量,因为我们的流程不处理这些变量:
def unpack(code, name, defs, clos):
code = marshal.loads(code)
glob = globals()
name = pickle.loads(name)
defs = pickle.loads(defs)
clos = pickle.loads(clos)
return types.FunctionType(code, glob, name, defs, clos)
这里我们有一个测试函数。请注意,我在函数范围内放置了一个导入。我们的酸洗过程不处理全局变量:
def test_function(a, b):
from random import randint
return randint(a, b)
最后我们打包测试对象并打印结果以确保一切正常:
packed = pack(test_function)
print((packed))
最后,我们解包我们的函数,将它赋值给一个变量,调用它,并打印它的输出:
unpacked = unpack(*packed)
print((unpacked(2, 20)))
如有任何问题,请发表评论。
您可以使用 pathos.multiprocessing
,它是 multiprocessing
的一个分支,它使用 dill
序列化器而不是 pickle
。 dill
几乎可以序列化 python 中的任何内容。然后,无需编辑您的代码。
>>> from pathos.multiprocessing import ProcessingPool as Pool
>>>
>>> def calculate(x):
... def domath(y):
... return x*y
... return Pool().map(domath, range(3))
...
>>> calculate(2)
[0, 2, 4]
你甚至可以疯狂地使用它……因为大多数东西都是腌制的。无需使用纯 multiprocessing
.
>>> class Foo(object):
... def __init__(self, x):
... self.x = x
... def doit(self, y):
... return ProcessingPool().map(self.squared, calculate(y+self.x))
... def squared(self, z):
... return z*z
...
>>> def thing(obj, y):
... return getattr(obj, 'doit')(y)
...
>>> ProcessingPool().map(thing, ProcessingPool().map(Foo, range(3)), range(3))
[[0, 0, 0], [0, 4, 16], [0, 16, 64]]
在此处获取 pathos
:https://github.com/uqfoundation
把内嵌函数去掉怎么样?
这在我看来是最清晰的解决方案(因为你没有给出预期的输出,我不得不猜测):
$ cat /tmp/tmp.py
import multiprocessing
def calculate(x):
# here is where I would take this input x (and maybe a couple more inputs)
# and build a larger library of variables that I use further down the line
pool = multiprocessing.Pool(3)
_lst = [(x, y) for x in (x,) for y in range(3)]
final= pool.map(domath, _lst)
print(final)
def domath(l):
return l[0] * l[1]
calculate(2)
$ python /tmp/tmp.py
[0, 2, 4]
$