Multiprocessing initializer and pickling

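I've been experimenting with multiprocessing.Pool and trying to understand exactly how the initializer argument works. As far as I can tell, the initializer function is called once in each worker process, so I assumed that its arguments (i.e. initargs) would have to be pickled across the process boundary. I know that the pool's map method also pickles its arguments, so I assumed that anything that works as an argument to the initializer should also work as an argument to map.

However, when I run the following piece of code, initialize gets called just fine, but then map throws an exception about not being able to pickle the module. (There's nothing special about using the current module as the argument; it was just the first non-picklable object that came to mind.) Does anyone know what is behind this difference?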

from __future__ import print_function
import multiprocessing
import sys


def get_pid():
    return multiprocessing.current_process().pid


def initialize(module):
    print('Got module {} in PID {}'.format(module, get_pid()))


def worker(module):
    print('Got module {} in PID {}'.format(module, get_pid()))


current_module = sys.modules[__name__]
work = [current_module]

print('Main process has PID {}'.format(get_pid()))
pool = multiprocessing.Pool(None, initialize, work)
pool.map(worker, work)

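The initializer doesn't require pickling, but the map call does. With the fork start method used here, each worker process inherits initargs from the parent's memory, so nothing has to be serialized, whereas map sends every task to an already running worker through a queue. Maybe this will shed some light... (here I'm using multiprocess instead of multiprocessing to get better pickling and interactivity).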

>>> from __future__ import print_function
>>> import multiprocess as multiprocessing
>>> import sys
>>> 
>>> def get_pid():
...     return multiprocessing.current_process().pid
... 
>>> 
>>> def initialize(module):
...     print('Got module {} in PID {}'.format(module, get_pid()))
... 
>>> 
>>> def worker(module):
...     print('Got module {} in PID {}'.format(module, get_pid()))
... 
>>> 
>>> current_module = sys.modules[__name__]
>>> work = [current_module]
>>> 
>>> print('Main process has PID {}'.format(get_pid()))
Main process has PID 34866
>>> pool = multiprocessing.dummy.Pool(None, initialize, work)
Got module <module '__main__' (built-in)> in PID 34866
Got module <module '__main__' (built-in)> in PID 34866
Got module <module '__main__' (built-in)> in PID 34866
Got module <module '__main__' (built-in)> in PID 34866
Got module <module '__main__' (built-in)> in PID 34866
Got module <module '__main__' (built-in)> in PID 34866
Got module <module '__main__' (built-in)> in PID 34866
Got module <module '__main__' (built-in)> in PID 34866
>>> pool.map(worker, work)
Got module <module '__main__' (built-in)> in PID 34866
[None]

Cool. The thread pool works... (because it doesn't need to pickle anything). What happens when serialization is used to ship worker and work to other processes?

>>> pool = multiprocessing.Pool(None, initialize, work)
Got module <module '__main__' (built-in)> in PID 34875
Got module <module '__main__' (built-in)> in PID 34876
Got module <module '__main__' (built-in)> in PID 34877
Got module <module '__main__' (built-in)> in PID 34878
Got module <module '__main__' (built-in)> in PID 34879
Got module <module '__main__' (built-in)> in PID 34880
Got module <module '__main__' (built-in)> in PID 34881
Got module <module '__main__' (built-in)> in PID 34882
>>> pool.map(worker, work)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/Users/mmckerns/lib/python2.7/site-packages/multiprocess-0.70.4.dev0-py2.7-macosx-10.8-x86_64.egg/multiprocess/pool.py", line 251, in map
    return self.map_async(func, iterable, chunksize).get()
  File "/Users/mmckerns/lib/python2.7/site-packages/multiprocess-0.70.4.dev0-py2.7-macosx-10.8-x86_64.egg/multiprocess/pool.py", line 567, in get
    raise self._value
NotImplementedError: pool objects cannot be passed between processes or pickled
>>> 

So let's look at pickling work:

>>> import pickle
>>> import sys            
>>> pickle.dumps(sys.modules[__name__])
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/opt/local/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py", line 1374, in dumps
    Pickler(file, protocol).dump(obj)
  File "/opt/local/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py", line 224, in dump
    self.save(obj)
  File "/opt/local/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py", line 306, in save
    rv = reduce(self.proto)
  File "/opt/local/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/copy_reg.py", line 70, in _reduce_ex
    raise TypeError, "can't pickle %s objects" % base.__name__
TypeError: can't pickle module objects
>>> 

So, you can't pickle a module... OK, can we do better with dill?

>>> import dill
>>> dill.detect.trace(True)
>>> dill.pickles(work)
M1: <module '__main__' (built-in)>
F2: <function _import_module at 0x10c017cf8>
# F2
D2: <dict object at 0x10d9a8168>
M2: <module 'dill' from '/Users/mmckerns/lib/python2.7/site-packages/dill-0.2.5.dev0-py2.7.egg/dill/__init__.pyc'>
# M2
F1: <function worker at 0x10c07fed8>
F2: <function _create_function at 0x10c017488>
# F2
Co: <code object worker at 0x10b053cb0, file "<stdin>", line 1>
F2: <function _unmarshal at 0x10c017320>
# F2
# Co
D1: <dict object at 0x10af68168>
# D1
D2: <dict object at 0x10c0e4a28>
# D2
# F1
M2: <module 'sys' (built-in)>
# M2
F1: <function initialize at 0x10c07fe60>
Co: <code object initialize at 0x10b241f30, file "<stdin>", line 1>
# Co
D1: <dict object at 0x10af68168>
# D1
D2: <dict object at 0x10c0ea398>
# D2
# F1
M2: <module 'pathos' from '/Users/mmckerns/lib/python2.7/site-packages/pathos-0.2a1.dev0-py2.7.egg/pathos/__init__.pyc'>
# M2
C2: __future__._Feature
# C2
D2: <dict object at 0x10b05b7f8>
# D2
M2: <module 'multiprocess' from '/Users/mmckerns/lib/python2.7/site-packages/multiprocess-0.70.4.dev0-py2.7-macosx-10.8-x86_64.egg/multiprocess/__init__.pyc'>
# M2
T4: <class 'pathos.threading.ThreadPool'>
# T4
D2: <dict object at 0x10c0ea5c8>
# D2
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/Users/mmckerns/lib/python2.7/site-packages/dill-0.2.5.dev0-py2.7.egg/dill/dill.py", line 1209, in pickles
    pik = copy(obj, **kwds)
  File "/Users/mmckerns/lib/python2.7/site-packages/dill-0.2.5.dev0-py2.7.egg/dill/dill.py", line 161, in copy
    return loads(dumps(obj, *args, **kwds))
  File "/Users/mmckerns/lib/python2.7/site-packages/dill-0.2.5.dev0-py2.7.egg/dill/dill.py", line 197, in dumps
    dump(obj, file, protocol, byref, fmode, recurse)#, strictio)
  File "/Users/mmckerns/lib/python2.7/site-packages/dill-0.2.5.dev0-py2.7.egg/dill/dill.py", line 190, in dump
    pik.dump(obj)
  File "/opt/local/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py", line 224, in dump
    self.save(obj)
  File "/opt/local/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/opt/local/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py", line 600, in save_list
    self._batch_appends(iter(obj))
  File "/opt/local/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py", line 636, in _batch_appends
    save(tmp[0])
  File "/opt/local/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/Users/mmckerns/lib/python2.7/site-packages/dill-0.2.5.dev0-py2.7.egg/dill/dill.py", line 1116, in save_module
    state=_main_dict)
  File "/opt/local/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py", line 419, in save_reduce
    save(state)
  File "/opt/local/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/Users/mmckerns/lib/python2.7/site-packages/dill-0.2.5.dev0-py2.7.egg/dill/dill.py", line 768, in save_module_dict
    StockPickler.save_dict(pickler, obj)
  File "/opt/local/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py", line 649, in save_dict
    self._batch_setitems(obj.iteritems())
  File "/opt/local/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py", line 681, in _batch_setitems
    save(v)
  File "/opt/local/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py", line 306, in save
    rv = reduce(self.proto)
  File "/Users/mmckerns/lib/python2.7/site-packages/multiprocess-0.70.4.dev0-py2.7-macosx-10.8-x86_64.egg/multiprocess/pool.py", line 452, in __reduce__
    'pool objects cannot be passed between processes or pickled'
NotImplementedError: pool objects cannot be passed between processes or pickled
>>> 

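The answer is: the module starts to pickle, but then fails because of what the module contains. So it looks like pickling works for everything in __main__ unless there is an instance of a pool in __main__, in which case it fails.

So, if the last two lines of your code are replaced with this single line (so that no pool instance is ever bound to a name in __main__), it will work: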
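The point that a pool instance refuses to be pickled can be checked in isolation. The following is a minimal sketch using the standard library on Python 3, not the multiprocess fork shown in the session above. It uses ThreadPool, which shares its base class with the process pool, purely to avoid starting extra processes, and try_pickle is a hypothetical helper name introduced for illustration.

```python
import pickle
from multiprocessing.pool import ThreadPool


def try_pickle(obj):
    """Return None if obj pickles, otherwise the exception that was raised.

    Hypothetical helper, introduced here for illustration only.
    """
    try:
        pickle.dumps(obj)
    except Exception as exc:
        return exc
    return None


# ThreadPool inherits from multiprocessing.pool.Pool, whose __reduce__
# raises NotImplementedError to forbid pickling.
pool = ThreadPool(1)
try:
    error = try_pickle(pool)
finally:
    pool.close()
    pool.join()

print(type(error).__name__, error)
```

Any serializer that walks the contents of __main__ will hit the same exception as soon as it reaches a name bound to a pool.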

>>> multiprocessing.Pool(None, initialize, work).map(worker, work)
Got module <module '__main__' (built-in)> in PID 34922
Got module <module '__main__' (built-in)> in PID 34923
Got module <module '__main__' (built-in)> in PID 34924
Got module <module '__main__' (built-in)> in PID 34925
Got module <module '__main__' (built-in)> in PID 34926
Got module <module '__main__' (built-in)> in PID 34927
Got module <module '__main__' (built-in)> in PID 34928
Got module <module '__main__' (built-in)> in PID 34929
Got module <module '__main__' (built-in)> in PID 34922
[None]
>>> 

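The one-liner works because this is multiprocess, which uses dill under the covers. With the standard multiprocessing module, pickling would still fail here, because pickle can't serialize a module. Serialization is needed because the object has to be sent to another Python instance running in another process.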
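If only the standard pickle is available, one possible workaround (an assumption on my part, not something from the session above) is to send the module's name instead of the module object and import it again on the receiving side. The sketch below, written for Python 3, simulates the trip across the process boundary with pickle.dumps and pickle.loads. The helper names module_to_name and name_to_module are hypothetical, and json merely stands in for any importable module.

```python
import importlib
import json
import pickle


def module_to_name(module):
    """Reduce a module to its importable name, which is a plain string."""
    return module.__name__


def name_to_module(name):
    """Re-import the module by name on the receiving side."""
    return importlib.import_module(name)


# Simulate sending across a process boundary: only the string is pickled.
payload = pickle.dumps(module_to_name(json))
restored = name_to_module(pickle.loads(payload))

print(restored is json)
```

In a real pool, the worker function would receive the name and call name_to_module itself. This only suits modules that the worker process can import by name.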