Python class 方法的多处理错误
Python multiprocessing error with class methods
我正在编写一个程序,其中包含面向对象的代码,我正在尝试进行多进程处理。我收到 pickle 错误,因为默认情况下 python 可以序列化函数但不能序列化 class 方法。所以我使用了 Can't pickle <type 'instancemethod'> when using python's multiprocessing Pool.map() 上的建议,但问题是如果我的方法中有一些 lambda 表达式,它就不起作用。
我的示例代码如下:
import numpy as np
from copy_reg import pickle
from types import MethodType
from multiprocessing.pool import ApplyResult
from _functools import partial
from _collections import defaultdict
class test(object):
def __init__(self,words):
self.words=words
# self.testLambda = defaultdict(lambda : 1.)
def parallel_function(self,f):
def easy_parallize(f,sequence):
from multiprocessing import Pool
pool = Pool(processes=50) # depends on available cores
result = pool.map(f, sequence) # for i in sequence: result[i] = f(i)
cleaned = [x for x in result if not x is None] # getting results
cleaned = np.asarray(cleaned)
pool.close() # not optimal! but easy
pool.join()
return cleaned
from functools import partial
return partial(easy_parallize, f)
def dummy(self):
self.t=defaultdict(lambda:1.)
def test(self,a,b,x):
print x
print a
return x*x
def testit(self):
sequence=[1,2,3,4,5]
f1=partial(self.test,'a','b')
f_p=self.parallel_function(f1)
results=f_p(sequence)
def _pickle_method(method):
    """Reduce a bound method to a picklable (name, instance, class) triple.

    Registered with copy_reg so pickle can handle instancemethod objects
    (Python 2: im_func/im_self/im_class are Py2-only attributes)."""
    func_name = method.im_func.__name__
    obj = method.im_self
    cls = method.im_class
    return _unpickle_method, (func_name, obj, cls)
def _unpickle_method(func_name, obj, cls):
for cls in cls.mro():
try:
func = cls.__dict__[func_name]
except KeyError:
pass
else:
break
return func.__get__(obj, cls)
if __name__ == "__main__":
    # Register the custom (un)pickler for bound methods with copy_reg.
    pickle(MethodType, _pickle_method, _unpickle_method)
    t = test('fdfs')
    t.dummy()   # installs the defaultdict(lambda: 1.) that breaks pickling
    t.testit()
但由于 lambda 表达式,我得到以下错误:
Traceback (most recent call last):
File "/home/ngoyal/work/nlp_source/language-change/test.py", line 76, in <module>
t.testit()
File "/home/ngoyal/work/nlp_source/language-change/test.py", line 51, in testit
results=f_p(sequence)
File "/home/ngoyal/work/nlp_source/language-change/test.py", line 28, in easy_parallize
result = pool.map(f, sequence) # for i in sequence: result[i] = f(i)
File "/usr/lib/python2.7/multiprocessing/pool.py", line 251, in map
return self.map_async(func, iterable, chunksize).get()
File "/usr/lib/python2.7/multiprocessing/pool.py", line 558, in get
raise self._value
cPickle.PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed
有没有什么直接的方法可以解决这个问题,而无需转移到其他使用 dill 或别的东西的包?这可以用普通的 python 库来完成吗? (我正在使用 python 2.7)
pickle 模块无法序列化 lambda 函数,因为它们都具有相同的名称(<lambda>)。只需改用常规函数即可。
如果您进一步查看您发布的链接中我的回答,您会发现您确实可以做您想做的事——即使您使用 lambda、defaultdict 和各种其他 python 结构。您所要做的就是将 multiprocessing
替换为 pathos.multiprocessing
... 并且它有效。请注意,我甚至是在解释器中直接运行的。
>>> import numpy as np
>>> from functools import partial
>>> from pathos.multiprocessing import ProcessingPool as Pool
>>> from collections import defaultdict
>>>
>>> class test(object):
... def __init__(self, words):
... self.words = words
... def parallel_function(self, f):
... def easy_parallelize(f, sequence):
... p = Pool()
... result = p.map(f, sequence)
... cleaned = [x for x in result if not x is None]
... cleaned = np.asarray(cleaned)
... return cleaned
... return partial(easy_parallelize, f)
... def dummy(self):
... self.t = defaultdict(lambda: 1.)
... def test(self, a, b, x):
... print x
... print a
... print x*x
... def testit(self):
... sequence = [1,2,3,4,5]
... f1 = partial(self.test, 'a','b')
... f_p = self.parallel_function(f1)
... results = f_p(sequence)
... return results
...
>>> t = test('fdfs')
>>> t.dummy()
>>> t.testit()
1
a
1
2
a
4
3
a
9
4
a
16
5
a
25
array([], dtype=float64)
"It works" 因为 pathos
使用 dill
,它是一个序列化程序,几乎可以 pickle python 中的任何内容。您甚至可以动态替换该方法,它仍然有效。
>>> def parallel_funtion(self, f):
... def easy_parallelize(f, sequence):
... p = Pool()
... return p.map(f, sequence)
... return partial(easy_parallelize, f)
...
>>> test.parallel_function = parallel_funtion
>>>
>>> t.testit()
1
a
1
2
a
4
3
a
9
4
a
16
5
a
25
[None, None, None, None, None]
在此处获取 pathos
和 dill
:https://github.com/uqfoundation
我正在编写一个程序,其中包含面向对象的代码,我正在尝试进行多进程处理。我收到 pickle 错误,因为默认情况下 python 可以序列化函数但不能序列化 class 方法。所以我使用了 Can't pickle <type 'instancemethod'> when using python's multiprocessing Pool.map() 上的建议,但问题是如果我的方法中有一些 lambda 表达式,它就不起作用。 我的示例代码如下:
import numpy as np
from copy_reg import pickle
from types import MethodType
from multiprocessing.pool import ApplyResult
from _functools import partial
from _collections import defaultdict
class test(object):
def __init__(self,words):
self.words=words
# self.testLambda = defaultdict(lambda : 1.)
def parallel_function(self,f):
def easy_parallize(f,sequence):
from multiprocessing import Pool
pool = Pool(processes=50) # depends on available cores
result = pool.map(f, sequence) # for i in sequence: result[i] = f(i)
cleaned = [x for x in result if not x is None] # getting results
cleaned = np.asarray(cleaned)
pool.close() # not optimal! but easy
pool.join()
return cleaned
from functools import partial
return partial(easy_parallize, f)
def dummy(self):
self.t=defaultdict(lambda:1.)
def test(self,a,b,x):
print x
print a
return x*x
def testit(self):
sequence=[1,2,3,4,5]
f1=partial(self.test,'a','b')
f_p=self.parallel_function(f1)
results=f_p(sequence)
def _pickle_method(method):
    """Reduce a bound method to a picklable (name, instance, class) triple.

    Registered with copy_reg so pickle can handle instancemethod objects
    (Python 2: im_func/im_self/im_class are Py2-only attributes)."""
    func_name = method.im_func.__name__
    obj = method.im_self
    cls = method.im_class
    return _unpickle_method, (func_name, obj, cls)
def _unpickle_method(func_name, obj, cls):
for cls in cls.mro():
try:
func = cls.__dict__[func_name]
except KeyError:
pass
else:
break
return func.__get__(obj, cls)
if __name__ == "__main__":
    # Register the custom (un)pickler for bound methods with copy_reg.
    pickle(MethodType, _pickle_method, _unpickle_method)
    t = test('fdfs')
    t.dummy()   # installs the defaultdict(lambda: 1.) that breaks pickling
    t.testit()
但由于 lambda 表达式,我得到以下错误:
Traceback (most recent call last):
File "/home/ngoyal/work/nlp_source/language-change/test.py", line 76, in <module>
t.testit()
File "/home/ngoyal/work/nlp_source/language-change/test.py", line 51, in testit
results=f_p(sequence)
File "/home/ngoyal/work/nlp_source/language-change/test.py", line 28, in easy_parallize
result = pool.map(f, sequence) # for i in sequence: result[i] = f(i)
File "/usr/lib/python2.7/multiprocessing/pool.py", line 251, in map
return self.map_async(func, iterable, chunksize).get()
File "/usr/lib/python2.7/multiprocessing/pool.py", line 558, in get
raise self._value
cPickle.PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed
有没有什么直接的方法可以解决这个问题,而无需转移到其他使用 dill 或别的东西的包?这可以用普通的 python 库来完成吗? (我正在使用 python 2.7)
pickle 模块无法序列化 lambda 函数,因为它们都具有相同的名称(<lambda>)。只需改用常规函数即可。
如果您进一步查看您发布的链接中我的回答,您会发现您确实可以做您想做的事——即使您使用 lambda、defaultdict 和各种其他 python 结构。您所要做的就是将 multiprocessing
替换为 pathos.multiprocessing
... 并且它有效。请注意,我甚至是在解释器中直接运行的。
>>> import numpy as np
>>> from functools import partial
>>> from pathos.multiprocessing import ProcessingPool as Pool
>>> from collections import defaultdict
>>>
>>> class test(object):
... def __init__(self, words):
... self.words = words
... def parallel_function(self, f):
... def easy_parallelize(f, sequence):
... p = Pool()
... result = p.map(f, sequence)
... cleaned = [x for x in result if not x is None]
... cleaned = np.asarray(cleaned)
... return cleaned
... return partial(easy_parallelize, f)
... def dummy(self):
... self.t = defaultdict(lambda: 1.)
... def test(self, a, b, x):
... print x
... print a
... print x*x
... def testit(self):
... sequence = [1,2,3,4,5]
... f1 = partial(self.test, 'a','b')
... f_p = self.parallel_function(f1)
... results = f_p(sequence)
... return results
...
>>> t = test('fdfs')
>>> t.dummy()
>>> t.testit()
1
a
1
2
a
4
3
a
9
4
a
16
5
a
25
array([], dtype=float64)
"It works" 因为 pathos
使用 dill
,它是一个序列化程序,几乎可以 pickle python 中的任何内容。您甚至可以动态替换该方法,它仍然有效。
>>> def parallel_funtion(self, f):
... def easy_parallelize(f, sequence):
... p = Pool()
... return p.map(f, sequence)
... return partial(easy_parallelize, f)
...
>>> test.parallel_function = parallel_funtion
>>>
>>> t.testit()
1
a
1
2
a
4
3
a
9
4
a
16
5
a
25
[None, None, None, None, None]
在此处获取 pathos
和 dill
:https://github.com/uqfoundation