Python class 方法的多处理错误

Python multiprocessing error with class methods

我正在编写一个程序,其中包含面向对象的代码,我正在尝试进行多处理。我收到泡菜错误,因为默认情况下 python 可以序列化函数但不能序列化 class 方法。所以我在 Can't pickle <type 'instancemethod'> when using python's multiprocessing Pool.map() 上使用了建议,但问题是如果我的方法中有一些 lambda 表达式,它就不起作用。 我的示例代码如下:

import numpy as np

from copy_reg import pickle
from types import MethodType
from multiprocessing.pool import ApplyResult
from _functools import partial
from _collections import defaultdict


class test(object):
    def __init__(self,words):
        self.words=words
#         self.testLambda = defaultdict(lambda : 1.)

    def parallel_function(self,f):
        def easy_parallize(f,sequence):
            from multiprocessing import Pool
            pool = Pool(processes=50) # depends on available cores
            result = pool.map(f, sequence) # for i in sequence: result[i] = f(i)
            cleaned = [x for x in result if not x is None] # getting results
            cleaned = np.asarray(cleaned)
            pool.close() # not optimal! but easy
            pool.join()
            return cleaned
        from functools import partial


        return partial(easy_parallize, f)

    def dummy(self):
        self.t=defaultdict(lambda:1.)

    def test(self,a,b,x):
        print x
        print a
        return x*x

    def testit(self):
        sequence=[1,2,3,4,5]
        f1=partial(self.test,'a','b')
        f_p=self.parallel_function(f1)
        results=f_p(sequence)


def _pickle_method(method):
    func_name = method.im_func.__name__
    obj = method.im_self
    cls = method.im_class
    return _unpickle_method, (func_name, obj, cls)

def _unpickle_method(func_name, obj, cls):
    for cls in cls.mro():
        try:
            func = cls.__dict__[func_name]
        except KeyError:
            pass
        else:
            break
    return func.__get__(obj, cls)



if __name__ ==   "__main__":
    pickle(MethodType, _pickle_method, _unpickle_method)
    t=test('fdfs')
    t.dummy()
    t.testit()

但由于 lambda 表达式,我得到以下错误:

Traceback (most recent call last):
  File "/home/ngoyal/work/nlp_source/language-change/test.py", line 76, in <module>
    t.testit()
  File "/home/ngoyal/work/nlp_source/language-change/test.py", line 51, in testit
    results=f_p(sequence)
  File "/home/ngoyal/work/nlp_source/language-change/test.py", line 28, in easy_parallize
    result = pool.map(f, sequence) # for i in sequence: result[i] = f(i)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 251, in map
    return self.map_async(func, iterable, chunksize).get()
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 558, in get
    raise self._value
cPickle.PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

有没有什么直接的方法可以解决这个问题,而无需转移到其他使用莳萝或其他东西的包?这可以用普通的 python 库来完成吗? (我正在使用 python 2.7)

pickle 模块无法序列化 lambda 函数,因为它们都具有相同的名称 (<lambda>)。只需使用常规功能即可。

如果您进一步查看您发布的 link 我的回答 (),您会发现您确实可以做您想做的事……即使您使用 lambdas和默认字典和各种其他 python 结构。您所要做的就是将 multiprocessing 替换为 pathos.multiprocessing... 并且它有效。请注意,我什至在解释器中工作。

>>> import numpy as np
>>> from functools import partial
>>> from pathos.multiprocessing import ProcessingPool as Pool
>>> from collections import defaultdict
>>> 
>>> class test(object):
...   def __init__(self, words):
...     self.words = words
...   def parallel_function(self, f):
...     def easy_parallelize(f, sequence):
...       p = Pool()
...       result = p.map(f, sequence)
...       cleaned = [x for x in result if not x is None]
...       cleaned = np.asarray(cleaned)
...       return cleaned
...     return partial(easy_parallelize, f)
...   def dummy(self):
...     self.t = defaultdict(lambda: 1.)
...   def test(self, a, b, x):
...     print x
...     print a
...     print x*x
...   def testit(self):
...     sequence = [1,2,3,4,5]
...     f1 = partial(self.test, 'a','b')
...     f_p = self.parallel_function(f1)
...     results = f_p(sequence)
...     return results
... 
>>> t = test('fdfs')
>>> t.dummy()
>>> t.testit()
1
a
1
2
a
4
3
a
9
4
a
16
5
a
25
array([], dtype=float64)

"It works" 因为 pathos 使用 dill,它是一个序列化程序,几乎可以 pickle python 中的任何内容。您甚至可以动态替换该方法,它仍然有效。

>>> def parallel_funtion(self, f):
...   def easy_parallelize(f, sequence):
...     p = Pool()
...     return p.map(f, sequence)
...   return partial(easy_parallelize, f)
... 
>>> test.parallel_function = parallel_funtion 
>>> 
>>> t.testit()
1
a
1
2
a
4
3
a
9
4
a
16
5
a
25
[None, None, None, None, None]

在此处获取 pathosdillhttps://github.com/uqfoundation