multiprocessing with textacy or spacy
I am trying to speed up the processing of a large list of texts by parallelizing textacy. When I use Pool from multiprocessing, the resulting textacy corpus comes out empty. I am not sure whether the problem lies in the way I use textacy or in the multiprocessing paradigm?
Here is an example that illustrates my issue:
import spacy
import textacy
from multiprocessing import Pool
texts_dict = {
    "key1": "First text 1.",
    "key2": "Second text 2.",
    "key3": "Third text 3.",
    "key4": "Fourth text 4.",
}
model = spacy.load('en_core_web_lg')

# this works
corpus = textacy.corpus.Corpus(lang=model)
corpus.add((value, {'key': key}) for key, value in texts_dict.items())
print(corpus)  # prints Corpus(4 docs, 8 tokens)
print([doc for doc in corpus])

# now the same thing with a worker pool returns an empty corpus
corpus2 = textacy.corpus.Corpus(lang=model)
pool = Pool(processes=2)
pool.map(corpus2.add, ((value, {'key': key}) for key, value in texts_dict.items()))
print(corpus2)  # prints Corpus(0 docs, 0 tokens)
print([doc for doc in corpus2])

# to make sure we get the right data into corpus.add
pool.map(print, ((value, {'key': key}) for key, value in texts_dict.items()))
Textacy is built on top of spaCy. spaCy does not support multithreading, but it should be possible to run it in multiple processes: https://github.com/explosion/spaCy/issues/2075
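(For reference: newer spaCy releases expose multi-process parsing directly through Language.pipe. A minimal sketch, assuming spaCy >= 2.2.2 for the n_process argument and a textacy version whose Corpus.add accepts spaCy Doc objects; building the corpus itself stays in the parent process:)

import spacy
import textacy

texts_dict = {
    "key1": "First text 1.",
    "key2": "Second text 2.",
    "key3": "Third text 3.",
    "key4": "Fourth text 4.",
}

if __name__ == '__main__':
    model = spacy.load('en_core_web_lg')
    corpus = textacy.corpus.Corpus(lang=model)
    # (text, context) pairs; as_tuples=True passes the context dict through untouched
    pairs = ((text, {'key': key}) for key, text in texts_dict.items())
    for doc, meta in model.pipe(pairs, as_tuples=True, n_process=2):
        # attaching the metadata to the corpus is omitted here for brevity
        corpus.add(doc)
    print(corpus)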
Update: following the great suggestion from @constt, collecting the results into the corpus works for n_docs=10273, n_sentences=302510, n_tokens=2053129.
For a larger dataset (16000 docs, 3MM tokens) I get the following error:
result_corpus=corpus.get()
File "<string>", line 2, in get
File "/usr/lib/python3.6/multiprocessing/managers.py", line 772, in _callmethod
raise convert_to_error(kind, result)
multiprocessing.managers.RemoteError:
---------------------------------------------------------------------------
Unserializable message: Traceback (most recent call last):
File "/usr/lib/python3.6/multiprocessing/managers.py", line 283, in serve_client
send(msg)
File "/usr/lib/python3.6/multiprocessing/connection.py", line 206, in send
self._send_bytes(_ForkingPickler.dumps(obj))
File "/usr/lib/python3.6/multiprocessing/connection.py", line 393, in _send_bytes
header = struct.pack("!i", n)
struct.error: 'i' format requires -2147483648 <= number <= 2147483647
I will look into it, but if you have a direct solution, it would be much appreciated!
Since Python processes run in separate memory spaces, you have to share your corpus object between the processes in the pool. To do this, you have to wrap the corpus object in a shareable class which you register with the BaseManager class. Here is how to refactor the code to make it work:
#!/usr/bin/python3
from multiprocessing import Pool
from multiprocessing.managers import BaseManager

import spacy
import textacy

texts = {
    'key1': 'First text 1.',
    'key2': 'Second text 2.',
    'key3': 'Third text 3.',
    'key4': 'Fourth text 4.',
}


class PoolCorpus(object):

    def __init__(self):
        model = spacy.load('en_core_web_sm')
        self.corpus = textacy.corpus.Corpus(lang=model)

    def add(self, data):
        self.corpus.add(data)

    def get(self):
        return self.corpus


BaseManager.register('PoolCorpus', PoolCorpus)

if __name__ == '__main__':
    with BaseManager() as manager:
        corpus = manager.PoolCorpus()

        with Pool(processes=2) as pool:
            pool.map(corpus.add, ((v, {'key': k}) for k, v in texts.items()))

        print(corpus.get())
Output:
Corpus(4 docs, 16 tokens)
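A note on the struct.error reported in the update above: the manager keeps the corpus in a separate server process, so corpus.get() has to pickle the entire corpus and push it through a pipe in one message. In Python 3.6, multiprocessing writes the message length as a signed 32-bit integer (the struct.pack("!i", n) in the traceback), so any single message of 2 GiB or more fails; Python 3.8 lifted this limit. One way around it is to stop sharing the corpus altogether: let the workers only parse, send each document back as a small serialized payload, and assemble the corpus in the parent. A minimal sketch, assuming spaCy >= 2.2 for spacy.tokens.DocBin; init_worker and parse are hypothetical helper names:

#!/usr/bin/python3
from multiprocessing import Pool

import spacy
import textacy
from spacy.tokens import DocBin

texts = {
    'key1': 'First text 1.',
    'key2': 'Second text 2.',
    'key3': 'Third text 3.',
    'key4': 'Fourth text 4.',
}

NLP = None  # one model per worker process, loaded once by the pool initializer


def init_worker():
    global NLP
    NLP = spacy.load('en_core_web_sm')


def parse(item):
    key, text = item
    doc = NLP(text)
    doc.user_data['key'] = key  # carry the metadata inside the Doc itself
    doc_bin = DocBin(store_user_data=True)
    doc_bin.add(doc)
    return doc_bin.to_bytes()  # small per-document payload, far below the 2 GiB cap


if __name__ == '__main__':
    model = spacy.load('en_core_web_sm')
    corpus = textacy.corpus.Corpus(lang=model)
    with Pool(processes=2, initializer=init_worker) as pool:
        for payload in pool.imap_unordered(parse, texts.items()):
            doc_bin = DocBin(store_user_data=True).from_bytes(payload)
            for doc in doc_bin.get_docs(model.vocab):
                corpus.add(doc)
    print(corpus)

This keeps every inter-process message tiny regardless of the total corpus size, at the cost of loading the model once per worker.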