spacy with joblib library generates _pickle.PicklingError: Could not pickle the task to send it to the workers

spacy with joblib library generates _pickle.PicklingError: Could not pickle the task to send it to the workers

我有一大堆句子(约 7 百万),我想从中提取名词。

我使用 joblib 库来并行化提取过程,如下所示:

import spacy
from tqdm import tqdm
from joblib import Parallel, delayed
nlp = spacy.load('en_core_web_sm')

class nouns:

    def get_nouns(self, text):
        doc = nlp(u"{}".format(text))
        return [token.text for token in doc if token.tag_ in ['NN', 'NNP', 'NNS', 'NNPS']]

    def parallelize(self, sentences):
        results = Parallel(n_jobs=1)(delayed(self.get_nouns)(sent) for sent in tqdm(sentences))
        return results

if __name__ == '__main__':
    sentences = ['we went to the school yesterday',
                 'The weather is really cold',
                 'Can we catch the dog?',
                 'How old are you John?',
                 'I like diving and swimming',
                 'Can the world become united?']
    obj = nouns()
    print(obj.parallelize(sentences))

当 parallelize 函数中的 n_jobs 大于 1 时,我得到这个长错误:

100%|██████████| 6/6 [00:00<00:00, 200.00it/s]
joblib.externals.loky.process_executor._RemoteTraceback: 
"""
Traceback (most recent call last):
  File "C:\Python35\lib\site-packages\joblib\externals\loky\backend\queues.py", line 150, in _feed
    obj_ = dumps(obj, reducers=reducers)
  File "C:\Python35\lib\site-packages\joblib\externals\loky\backend\reduction.py", line 243, in dumps
    dump(obj, buf, reducers=reducers, protocol=protocol)
  File "C:\Python35\lib\site-packages\joblib\externals\loky\backend\reduction.py", line 236, in dump
    _LokyPickler(file, reducers=reducers, protocol=protocol).dump(obj)
  File "C:\Python35\lib\site-packages\joblib\externals\cloudpickle\cloudpickle.py", line 267, in dump
    return Pickler.dump(self, obj)
  File "C:\Python35\lib\pickle.py", line 408, in dump
    self.save(obj)
  File "C:\Python35\lib\pickle.py", line 520, in save
    self.save_reduce(obj=obj, *rv)
  File "C:\Python35\lib\pickle.py", line 623, in save_reduce
    save(state)
  File "C:\Python35\lib\pickle.py", line 475, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Python35\lib\pickle.py", line 810, in save_dict
    self._batch_setitems(obj.items())
  File "C:\Python35\lib\pickle.py", line 836, in _batch_setitems
    save(v)
  File "C:\Python35\lib\pickle.py", line 520, in save
    self.save_reduce(obj=obj, *rv)
  File "C:\Python35\lib\pickle.py", line 623, in save_reduce
    save(state)
  File "C:\Python35\lib\pickle.py", line 475, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Python35\lib\pickle.py", line 810, in save_dict
    self._batch_setitems(obj.items())
  File "C:\Python35\lib\pickle.py", line 841, in _batch_setitems
    save(v)
  File "C:\Python35\lib\pickle.py", line 520, in save
    self.save_reduce(obj=obj, *rv)
  File "C:\Python35\lib\pickle.py", line 623, in save_reduce
    save(state)
  File "C:\Python35\lib\pickle.py", line 475, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Python35\lib\pickle.py", line 810, in save_dict
    self._batch_setitems(obj.items())
  File "C:\Python35\lib\pickle.py", line 836, in _batch_setitems
    save(v)
  File "C:\Python35\lib\pickle.py", line 475, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Python35\lib\pickle.py", line 770, in save_list
    self._batch_appends(obj)
  File "C:\Python35\lib\pickle.py", line 797, in _batch_appends
    save(tmp[0])
  File "C:\Python35\lib\pickle.py", line 475, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Python35\lib\pickle.py", line 725, in save_tuple
    save(element)
  File "C:\Python35\lib\pickle.py", line 475, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Python35\lib\site-packages\joblib\externals\cloudpickle\cloudpickle.py", line 718, in save_instancemethod
    self.save_reduce(types.MethodType, (obj.__func__, obj.__self__), obj=obj)
  File "C:\Python35\lib\pickle.py", line 599, in save_reduce
    save(args)
  File "C:\Python35\lib\pickle.py", line 475, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Python35\lib\pickle.py", line 725, in save_tuple
    save(element)
  File "C:\Python35\lib\pickle.py", line 475, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Python35\lib\site-packages\joblib\externals\cloudpickle\cloudpickle.py", line 395, in save_function
    self.save_function_tuple(obj)
  File "C:\Python35\lib\site-packages\joblib\externals\cloudpickle\cloudpickle.py", line 594, in save_function_tuple
    save(state)
  File "C:\Python35\lib\pickle.py", line 475, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Python35\lib\pickle.py", line 810, in save_dict
    self._batch_setitems(obj.items())
  File "C:\Python35\lib\pickle.py", line 836, in _batch_setitems
    save(v)
  File "C:\Python35\lib\pickle.py", line 475, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Python35\lib\pickle.py", line 810, in save_dict
    self._batch_setitems(obj.items())
  File "C:\Python35\lib\pickle.py", line 841, in _batch_setitems
    save(v)
  File "C:\Python35\lib\pickle.py", line 520, in save
    self.save_reduce(obj=obj, *rv)
  File "C:\Python35\lib\pickle.py", line 623, in save_reduce
    save(state)
  File "C:\Python35\lib\pickle.py", line 475, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Python35\lib\pickle.py", line 810, in save_dict
    self._batch_setitems(obj.items())
  File "C:\Python35\lib\pickle.py", line 836, in _batch_setitems
    save(v)
  File "C:\Python35\lib\pickle.py", line 520, in save
    self.save_reduce(obj=obj, *rv)
  File "C:\Python35\lib\pickle.py", line 599, in save_reduce
    save(args)
  File "C:\Python35\lib\pickle.py", line 475, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Python35\lib\pickle.py", line 740, in save_tuple
    save(element)
  File "C:\Python35\lib\pickle.py", line 520, in save
    self.save_reduce(obj=obj, *rv)
  File "C:\Python35\lib\pickle.py", line 623, in save_reduce
    save(state)
  File "C:\Python35\lib\pickle.py", line 475, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Python35\lib\pickle.py", line 740, in save_tuple
    save(element)
  File "C:\Python35\lib\pickle.py", line 495, in save
    rv = reduce(self.proto)
  File "stringsource", line 2, in preshed.maps.PreshMap.__reduce_cython__
TypeError: self.c_map cannot be converted to a Python object for pickling
"""Exception in thread QueueFeederThread:
Traceback (most recent call last):
  File "C:\Python35\lib\site-packages\joblib\externals\loky\backend\queues.py", line 150, in _feed
    obj_ = dumps(obj, reducers=reducers)
  File "C:\Python35\lib\site-packages\joblib\externals\loky\backend\reduction.py", line 243, in dumps
    dump(obj, buf, reducers=reducers, protocol=protocol)
  File "C:\Python35\lib\site-packages\joblib\externals\loky\backend\reduction.py", line 236, in dump
    _LokyPickler(file, reducers=reducers, protocol=protocol).dump(obj)
  File "C:\Python35\lib\site-packages\joblib\externals\cloudpickle\cloudpickle.py", line 267, in dump
    return Pickler.dump(self, obj)
  File "C:\Python35\lib\pickle.py", line 408, in dump
    self.save(obj)
  File "C:\Python35\lib\pickle.py", line 520, in save
    self.save_reduce(obj=obj, *rv)
  File "C:\Python35\lib\pickle.py", line 623, in save_reduce
    save(state)
  File "C:\Python35\lib\pickle.py", line 475, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Python35\lib\pickle.py", line 810, in save_dict
    self._batch_setitems(obj.items())
  File "C:\Python35\lib\pickle.py", line 836, in _batch_setitems
    save(v)
  File "C:\Python35\lib\pickle.py", line 520, in save
    self.save_reduce(obj=obj, *rv)
  File "C:\Python35\lib\pickle.py", line 623, in save_reduce
    save(state)
  File "C:\Python35\lib\pickle.py", line 475, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Python35\lib\pickle.py", line 810, in save_dict
    self._batch_setitems(obj.items())
  File "C:\Python35\lib\pickle.py", line 841, in _batch_setitems
    save(v)
  File "C:\Python35\lib\pickle.py", line 520, in save
    self.save_reduce(obj=obj, *rv)
  File "C:\Python35\lib\pickle.py", line 623, in save_reduce
    save(state)
  File "C:\Python35\lib\pickle.py", line 475, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Python35\lib\pickle.py", line 810, in save_dict
    self._batch_setitems(obj.items())
  File "C:\Python35\lib\pickle.py", line 836, in _batch_setitems
    save(v)
  File "C:\Python35\lib\pickle.py", line 475, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Python35\lib\pickle.py", line 770, in save_list
    self._batch_appends(obj)
  File "C:\Python35\lib\pickle.py", line 797, in _batch_appends
    save(tmp[0])
  File "C:\Python35\lib\pickle.py", line 475, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Python35\lib\pickle.py", line 725, in save_tuple
    save(element)
  File "C:\Python35\lib\pickle.py", line 475, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Python35\lib\site-packages\joblib\externals\cloudpickle\cloudpickle.py", line 718, in save_instancemethod
    self.save_reduce(types.MethodType, (obj.__func__, obj.__self__), obj=obj)
  File "C:\Python35\lib\pickle.py", line 599, in save_reduce
    save(args)
  File "C:\Python35\lib\pickle.py", line 475, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Python35\lib\pickle.py", line 725, in save_tuple
    save(element)
  File "C:\Python35\lib\pickle.py", line 475, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Python35\lib\site-packages\joblib\externals\cloudpickle\cloudpickle.py", line 395, in save_function
    self.save_function_tuple(obj)
  File "C:\Python35\lib\site-packages\joblib\externals\cloudpickle\cloudpickle.py", line 594, in save_function_tuple
    save(state)
  File "C:\Python35\lib\pickle.py", line 475, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Python35\lib\pickle.py", line 810, in save_dict
    self._batch_setitems(obj.items())
  File "C:\Python35\lib\pickle.py", line 836, in _batch_setitems
    save(v)
  File "C:\Python35\lib\pickle.py", line 475, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Python35\lib\pickle.py", line 810, in save_dict
    self._batch_setitems(obj.items())
  File "C:\Python35\lib\pickle.py", line 841, in _batch_setitems
    save(v)
  File "C:\Python35\lib\pickle.py", line 520, in save
    self.save_reduce(obj=obj, *rv)
  File "C:\Python35\lib\pickle.py", line 623, in save_reduce
    save(state)
  File "C:\Python35\lib\pickle.py", line 475, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Python35\lib\pickle.py", line 810, in save_dict
    self._batch_setitems(obj.items())
  File "C:\Python35\lib\pickle.py", line 836, in _batch_setitems
    save(v)
  File "C:\Python35\lib\pickle.py", line 520, in save
    self.save_reduce(obj=obj, *rv)
  File "C:\Python35\lib\pickle.py", line 599, in save_reduce
    save(args)
  File "C:\Python35\lib\pickle.py", line 475, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Python35\lib\pickle.py", line 740, in save_tuple
    save(element)
  File "C:\Python35\lib\pickle.py", line 520, in save
    self.save_reduce(obj=obj, *rv)
  File "C:\Python35\lib\pickle.py", line 623, in save_reduce
    save(state)
  File "C:\Python35\lib\pickle.py", line 475, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Python35\lib\pickle.py", line 740, in save_tuple
    save(element)
  File "C:\Python35\lib\pickle.py", line 495, in save
    rv = reduce(self.proto)
  File "stringsource", line 2, in preshed.maps.PreshMap.__reduce_cython__
TypeError: self.c_map cannot be converted to a Python object for pickling

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "C:\Python35\lib\threading.py", line 914, in _bootstrap_inner
    self.run()
  File "C:\Python35\lib\threading.py", line 862, in run
    self._target(*self._args, **self._kwargs)
  File "C:\Python35\lib\site-packages\joblib\externals\loky\backend\queues.py", line 175, in _feed
    onerror(e, obj)
  File "C:\Python35\lib\site-packages\joblib\externals\loky\process_executor.py", line 310, in _on_queue_feeder_error
    self.thread_wakeup.wakeup()
  File "C:\Python35\lib\site-packages\joblib\externals\loky\process_executor.py", line 155, in wakeup
    self._writer.send_bytes(b"")
  File "C:\Python35\lib\multiprocessing\connection.py", line 183, in send_bytes
    self._check_closed()
  File "C:\Python35\lib\multiprocessing\connection.py", line 136, in _check_closed
    raise OSError("handle is closed")
OSError: handle is closed



The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File ".../playground.py", line 43, in <module>
    print(obj.Paralize(sentences))
  File ".../playground.py", line 32, in Paralize
    results = Parallel(n_jobs=2)(delayed(self.get_nouns)(sent) for sent in tqdm(sentences))
  File "C:\Python35\lib\site-packages\joblib\parallel.py", line 934, in __call__
    self.retrieve()
  File "C:\Python35\lib\site-packages\joblib\parallel.py", line 833, in retrieve
    self._output.extend(job.get(timeout=self.timeout))
  File "C:\Python35\lib\site-packages\joblib\_parallel_backends.py", line 521, in wrap_future_result
    return future.result(timeout=timeout)
  File "C:\Python35\lib\concurrent\futures\_base.py", line 405, in result
    return self.__get_result()
  File "C:\Python35\lib\concurrent\futures\_base.py", line 357, in __get_result
    raise self._exception
_pickle.PicklingError: Could not pickle the task to send it to the workers.

我的代码有什么问题?

Q: What is the problem in my code?

好吧,问题很可能不是来自代码,而是来自 "hidden" 处理,一旦 n_jobs 指示 ( 和 joblib 内部编排 ) 准备主进程的许多精确副本,以便让它们彼此独立工作(从而有效地从 GIL-locking 中逃脱并将多个 process-flows 映射到物理上硬件资源)

此步骤负责制作所有 pythonic 对象的副本,并且已知使用 Pickle 来执行此操作。 Pickle 模块以其对什么可以 pickle 和什么不能 pickle 的历史主要限制而闻名。

错误消息证实了这一点:

TypeError: self.c_map cannot be converted to a Python object for pickling

有人可能会尝试提供 Mike McKearns dill 模块而不是 Pickle 并测试,如果您的 "problematic" python 对象将被此模块腌制而不会抛出此错误。

dill 具有相同的 API 签名,因此纯 import dill as pickle 可能有助于保持所有其他代码相同。

我遇到了同样的问题,大型模型要分布到多个进程中并从多个进程返回,dill 是一种可行的方法。性能也有所提高。

Bonus: dill allows to save / restore the full python interpreter state!

这是一个很酷的 side-effect 发现 dill,一旦 import dill as pickle 完成,pickle.dump_session( <aFile> ) 将保存 state-full 的完整副本 python 口译会话。如果需要,这可以恢复(post-crash 恢复,训练和优化 ML-model state-fully 保存/恢复,增量学习 ML-model state-fully 保存和 re-distributed 用于已部署的远程恢复 user-bases,等等。)

我的问题的补充答案:

我没有找到使用 Spacy 的 Joblib 解决方案,而是为了并行化该过程,我发现 Spacy 发布了一个名为 Pipeline 的东西,您可以在其中使用 [=18= 解析大量文档].

我用上面的相同示例应用它:

class nouns:

    def get_nouns(self, sentences):
        start = time.time()
        docs = nlp.pipe(sentences, n_threads=-1)
        result = [ ' '.join([token.text for token in doc if token.tag_ in ['NN', 'NNP', 'NNS', 'NNPS']]) for doc in docs]
        print('Time Elapsed {} ms'.format((time.time() - start) * 1000))
        print(result)


if __name__ == '__main__':
    sentences = ['we went to the school yesterday',
                 'The weather is really cold',
                 'Can we catch the dog?',
                 'How old are you John?',
                 'I like diving and swimming',
                 'Can the world become united?']
    obj = nouns()
    obj.get_nouns(sentences)

我在并行词形还原方面遇到了类似的问题,但在另一个库中 pymystem3

from pymystem3 import Mystem
mystem = Mystem()
    
def preprocess_text(text):
   ...
   tokens = mystem.lemmatize(text)
   ...
   text = " ".join(tokens)
   return text

data_set = Parallel(n_jobs=-1)(delayed(preprocess_text)(article) for article in tqdm(articles))

解决方案是将初始化放入函数中。

def preprocess_text(text):
   ...
   mystem = Mystem()
   tokens = mystem.lemmatize(text)
   ...
   text = " ".join(tokens)
   return text

我想你可以用 nlp = spacy.load

做同样的尝试

同样的问题。我通过在 Parallel.

中将后端从 loky 更改为 threading 来解决

只想加上我的两分钱。在您的 class 方法上使用 @staticmethod 并保留自动注入的自对象,以防止意外序列化整个框架,就像我的情况(烧瓶)一样。由于框架做了很多幕后注入并破坏了序列化依赖关系。

关于我如何解决这个问题的案例研究:

环境

  • Windows 10 x64
  • Python 3.9 或 3.10
  • joblib v1.1

解决方案

# Examine the stack trace very carefully, you will see a line something like this:
TypeError: self.c_map cannot be converted to a Python object for pickling

这会告诉您哪些变量不能序列化。

要修复,请选择一个选项:

  1. 从函数中删除变量。
  2. 从头开始初始化函数中的变量。

就我而言,我不得不混合使用#1 和#2:

  1. 删除了指向 class 的变量,该变量具有打开文件的句柄(它不能用打开的文件 pickle 任何东西)。
  2. 一个class变量无法被pickle,所以我在函数中再次初始化它(这样就不需要序列化这个class并将它传递给新进程)。

示例代码

# [Bugfix]. Add next line to initialize this again to eliminate pmap pickle error. Stacktrace is your friend!
hdb = HivedbApi(base_dir=hivedb_base_dir, table_name=table_name, partition_type=PartitionType.HiveFilePerDate)
hdb.write(df_trades)

在 OP 的示例中,我会根据堆栈跟踪在 get_nouns() 中寻找一些无法序列化的变量(它会准确地告诉您它遇到了什么变量)。

无效的解决方案

此页面上的任何其他内容均无效,包括将后端更改为 threading、将 pickler 更改为 dill、注释函数、更改 Python 版本等。

底线:

Sometimes, nothing can serialize a class, especially if it has handles to open files. In this case, the only solution is to (a) remove these variables from the target function, or (b) reinitialize these variables inside the target function.