在 Python 2.7 中把成员函数交给进程池(Pool)执行
Pooling a member function in python 2.7
这段代码出现了奇怪的错误。我正在尝试把一个工作函数(worker)交给进程池执行,而该函数是创建进程池的那个类的成员方法。虽然我怀疑这样做能否行得通,但我不确定确切的原因是什么。运行这段代码时会抛出 "PicklingError"。有人可以解释一下原因吗?
import multiprocessing
import time
# Pooler: a multiprocessing.Process subclass whose run() builds its own
# multiprocessing.Pool and maps the bound method self.worker over it.
# NOTE(review): indentation was lost in this listing; the three defs below
# are presumably methods of this class (they all take self).
class Pooler(multiprocessing.Process):
# Constructor: only delegates to multiprocessing.Process.__init__.
def __init__(self):
multiprocessing.Process.__init__(self)
# run(): creates a 10-worker pool, prints a start message, then maps
# self.worker over xrange(10) as a single chunk (chunksize=10).
def run(self):
pool = multiprocessing.Pool(10)
print "starting pool"
# self.worker is a bound instance method; the question text reports that
# running this script raises a PicklingError.
pool.map(self.worker, xrange(10), chunksize=10)
# worker(): prints the argument it was called with.
def worker(self, arg):
print "worker - arg - {}".format(arg)
# Script entry point: create and start 5 Pooler processes, keep them in
# `jobs`, join them, and print "...ending".
# NOTE(review): indentation was lost in this listing, so the exact nesting of
# the statements below has to be inferred.
if __name__ == '__main__':
jobs = []
for i in range(5):
proc = Pooler()
jobs.append(proc)
proc.start()
# Wait for the started processes to finish.
for j in jobs:
j.join()
print "...ending"
更新
我将代码更改为如下所示:
import multiprocessing
import time
# Pooler (second attempt): a multiprocessing.Process subclass whose run()
# builds a 1-worker pool and maps the run method of a separate Worker object.
# NOTE(review): indentation was lost in this listing; the defs below are
# presumably methods of this class.
class Pooler(multiprocessing.Process):
# Constructor: only delegates to multiprocessing.Process.__init__.
def __init__(self):
multiprocessing.Process.__init__(self)
# run(): creates the pool, prints a start message, instantiates Worker and
# maps obj.run over range(10) one item per chunk (chunksize=1).
def run(self):
pool = multiprocessing.Pool(1)
print "starting pool"
obj = Worker()
# obj.run is still a bound instance method; the traceback quoted in this
# document shows the PicklingError being raised from this pool.map call.
pool.map(obj.run, range(10), chunksize=1)
# Worker: plain object whose run() method is the callable handed to pool.map.
# NOTE(review): indentation was lost in this listing; the defs below are
# presumably methods of this class.
class Worker(object):
# Constructor: intentionally does nothing.
def __init__(self):
pass
# run(): prints the single argument it was called with.
def run(self, nums):
print "worker - arg - {}".format(nums)
# Script entry point: create and start a single Pooler process (range(1)),
# join it, and print "...ending".
# NOTE(review): indentation was lost in this listing, so the exact nesting of
# the statements below has to be inferred.
if __name__ == '__main__':
jobs = []
for i in range(1):
proc = Pooler()
jobs.append(proc)
proc.start()
# Wait for the started process to finish.
for j in jobs:
j.join()
print "...ending"
但我仍然收到以下错误:
starting pool
Process Pooler-1:
Traceback (most recent call last):
File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
self.run()
File "pool_test.py", line 13, in run
pool.map(obj.run, range(10), chunksize=1)
File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/pool.py", line 251, in map
return self.map_async(func, iterable, chunksize).get()
File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/pool.py", line 567, in get
raise self._value
PicklingError: Can't pickle <type 'instancemethod'>: attribute lookup __builtin__.instancemethod failed
...ending
答案很简单。`multiprocessing` 使用 `pickle` 序列化对象并在不同进程之间传递这些对象——正如错误信息所述,`pickle` 无法序列化 `instancemethod`。如果要序列化 `instancemethod`(请参阅原回答中的链接,该链接在此处已丢失),则需要使用更强大的序列化程序,例如 `dill`。
那么,对于 `multiprocessing`,你能做些什么呢?幸运的是,`multiprocessing` 有一个名为 `multiprocess` 的分支(fork),它使用 `dill` 进行序列化;只要改用它,你的对象就能被序列化,代码也就能正常运行。这只需改动一行(import 语句),还带来额外的好处:可以直接在交互式解释器中运行,并且能够序列化 Python 中几乎所有的对象。(我上面贴的链接是针对 `pathos` 和 `dill` 的,但 `pathos` 是建立在 `multiprocess` 之上的,所以仍然很相关。)
>>> import multiprocess as multiprocessing
>>> import time
>>> class Pooler(multiprocessing.Process):
... def __init__(self):
... multiprocessing.Process.__init__(self)
... def run(self):
... pool = multiprocessing.Pool(1)
... print "starting pool"
... obj = Worker()
... pool.map(obj.run, range(10), chunksize=1)
...
>>> class Worker(object):
... def __init__(self):
... pass
... def run(self, nums):
... print "worker - arg - {}".format(nums)
...
>>> if __name__ == '__main__':
... jobs = []
... for i in range(1):
... proc = Pooler()
... jobs.append(proc)
... proc.start()
... for j in jobs:
... j.join()
... print "...ending"
...
starting pool
worker - arg - 0
worker - arg - 1
worker - arg - 2
worker - arg - 3
worker - arg - 4
worker - arg - 5
worker - arg - 6
worker - arg - 7
worker - arg - 8
worker - arg - 9
...ending
>>>
这段代码出现了奇怪的错误。我正在尝试把一个工作函数(worker)交给进程池执行,而该函数是创建进程池的那个类的成员方法。虽然我怀疑这样做能否行得通,但我不确定确切的原因是什么。运行这段代码时会抛出 "PicklingError"。有人可以解释一下原因吗?
import multiprocessing
import time
# Pooler: a multiprocessing.Process subclass whose run() builds its own
# multiprocessing.Pool and maps the bound method self.worker over it.
# NOTE(review): indentation was lost in this listing; the three defs below
# are presumably methods of this class (they all take self).
class Pooler(multiprocessing.Process):
# Constructor: only delegates to multiprocessing.Process.__init__.
def __init__(self):
multiprocessing.Process.__init__(self)
# run(): creates a 10-worker pool, prints a start message, then maps
# self.worker over xrange(10) as a single chunk (chunksize=10).
def run(self):
pool = multiprocessing.Pool(10)
print "starting pool"
# self.worker is a bound instance method; the question text reports that
# running this script raises a PicklingError.
pool.map(self.worker, xrange(10), chunksize=10)
# worker(): prints the argument it was called with.
def worker(self, arg):
print "worker - arg - {}".format(arg)
# Script entry point: create and start 5 Pooler processes, keep them in
# `jobs`, join them, and print "...ending".
# NOTE(review): indentation was lost in this listing, so the exact nesting of
# the statements below has to be inferred.
if __name__ == '__main__':
jobs = []
for i in range(5):
proc = Pooler()
jobs.append(proc)
proc.start()
# Wait for the started processes to finish.
for j in jobs:
j.join()
print "...ending"
更新
我将代码更改为如下所示:
import multiprocessing
import time
# Pooler (second attempt): a multiprocessing.Process subclass whose run()
# builds a 1-worker pool and maps the run method of a separate Worker object.
# NOTE(review): indentation was lost in this listing; the defs below are
# presumably methods of this class.
class Pooler(multiprocessing.Process):
# Constructor: only delegates to multiprocessing.Process.__init__.
def __init__(self):
multiprocessing.Process.__init__(self)
# run(): creates the pool, prints a start message, instantiates Worker and
# maps obj.run over range(10) one item per chunk (chunksize=1).
def run(self):
pool = multiprocessing.Pool(1)
print "starting pool"
obj = Worker()
# obj.run is still a bound instance method; the traceback quoted in this
# document shows the PicklingError being raised from this pool.map call.
pool.map(obj.run, range(10), chunksize=1)
# Worker: plain object whose run() method is the callable handed to pool.map.
# NOTE(review): indentation was lost in this listing; the defs below are
# presumably methods of this class.
class Worker(object):
# Constructor: intentionally does nothing.
def __init__(self):
pass
# run(): prints the single argument it was called with.
def run(self, nums):
print "worker - arg - {}".format(nums)
# Script entry point: create and start a single Pooler process (range(1)),
# join it, and print "...ending".
# NOTE(review): indentation was lost in this listing, so the exact nesting of
# the statements below has to be inferred.
if __name__ == '__main__':
jobs = []
for i in range(1):
proc = Pooler()
jobs.append(proc)
proc.start()
# Wait for the started process to finish.
for j in jobs:
j.join()
print "...ending"
但我仍然收到以下错误:
starting pool
Process Pooler-1:
Traceback (most recent call last):
File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
self.run()
File "pool_test.py", line 13, in run
pool.map(obj.run, range(10), chunksize=1)
File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/pool.py", line 251, in map
return self.map_async(func, iterable, chunksize).get()
File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/pool.py", line 567, in get
raise self._value
PicklingError: Can't pickle <type 'instancemethod'>: attribute lookup __builtin__.instancemethod failed
...ending
答案很简单。`multiprocessing` 使用 `pickle` 序列化对象并在不同进程之间传递这些对象——正如错误信息所述,`pickle` 无法序列化 `instancemethod`。如果要序列化 `instancemethod`(请参阅原回答中的链接,该链接在此处已丢失),则需要使用更强大的序列化程序,例如 `dill`。
那么,对于 `multiprocessing`,你能做些什么呢?幸运的是,`multiprocessing` 有一个名为 `multiprocess` 的分支(fork),它使用 `dill` 进行序列化;只要改用它,你的对象就能被序列化,代码也就能正常运行。这只需改动一行(import 语句),还带来额外的好处:可以直接在交互式解释器中运行,并且能够序列化 Python 中几乎所有的对象。(我上面贴的链接是针对 `pathos` 和 `dill` 的,但 `pathos` 是建立在 `multiprocess` 之上的,所以仍然很相关。)
>>> import multiprocess as multiprocessing
>>> import time
>>> class Pooler(multiprocessing.Process):
... def __init__(self):
... multiprocessing.Process.__init__(self)
... def run(self):
... pool = multiprocessing.Pool(1)
... print "starting pool"
... obj = Worker()
... pool.map(obj.run, range(10), chunksize=1)
...
>>> class Worker(object):
... def __init__(self):
... pass
... def run(self, nums):
... print "worker - arg - {}".format(nums)
...
>>> if __name__ == '__main__':
... jobs = []
... for i in range(1):
... proc = Pooler()
... jobs.append(proc)
... proc.start()
... for j in jobs:
... j.join()
... print "...ending"
...
starting pool
worker - arg - 0
worker - arg - 1
worker - arg - 2
worker - arg - 3
worker - arg - 4
worker - arg - 5
worker - arg - 6
worker - arg - 7
worker - arg - 8
worker - arg - 9
...ending
>>>