joblib 结果因 return 值而异

joblib results vary wildly depending on return value

我必须使用 Spacy 分析大型文本数据集。该数据集包含大约 120000 条记录,典型文本长度约为 1000 个单词。对文本进行词形还原需要相当长的时间,所以我寻找减少时间的方法。 This arcicle 描述了如何使用 joblib 加速计算。这相当有效:16 个内核将 CPU 时间减少了 10 倍,超线程将其额外减少了 7%。

最近我意识到我想计算文档之间的相似性,以后可能会用文档进行更多分析。因此,我决定为所有文档生成一个 Spacy 文档实例 (),并在稍后将其用于分析(词形还原、向量化,可能还有更多)。这就是麻烦开始的地方。

并行词形还原器的分析发生在下面的函数中:

def lemmatize_pipe(doc):
    lemma_list = [str(tok.lemma_).lower() for tok in doc
                  if tok.is_alpha]

    return lemma_list

(完整的演示代码可以在post末尾找到)。我所要做的就是返回 doc 而不是 lemma_list 并且我准备好了。我想。

def lemmatize_pipe(doc):

    return doc

顺序版本运行 73 秒,返回 lemma_list 的并行版本需要 7 秒,而返回 doc 的版本运行 127 秒:是顺序版本的两倍。完整代码如下。

import time
import pandas as pd
from joblib import Parallel, delayed
import gensim.downloader as api
import spacy

from pdb import set_trace as breakpoint

# Initialize spacy with the small english language model
nlp = spacy.load('en', disable=['parser', 'ner', 'tagger'])
nlp.add_pipe(nlp.create_pipe('sentencizer'))

# Import the dataset and get the text
dataset = api.load("text8")
data = [d for d in dataset]
doc_requested = False

print(len(data), 'documents in original data')

df_data = pd.DataFrame(columns=['content'])
df_data['content'] = df_data['content'].astype(str)

# Content is a list of words, convert is to strings
for doc in data:
    sentence = ' '.join([word for word in doc])
    df_data.loc[len(df_data)] = [sentence]

### === Sequential processing ===
def lemmatize(text):
    doc = nlp(text)
    lemma_list = [str(tok.lemma_).lower() for tok in doc
                  if tok.is_alpha]

    return doc if doc_requested else lemma_list

cpu = time.time()

df_data['sequential'] = df_data['content'].apply(lemmatize)
print('\nSequential processing in {:.0f} seconds'.format(time.time() - cpu))
df_data.head(3)

### === Parallel processing ===
def lemmatize_pipe(doc):
    lemma_list = [str(tok.lemma_).lower() for tok in doc
                  if tok.is_alpha]

    return doc if doc_requested else lemma_list

def chunker(iterable, total_length, chunksize):
    return (iterable[pos: pos + chunksize] for pos in range(0, total_length,
                                                           chunksize))
def process_chunk(texts):
    preproc_pipe = []
    for doc in nlp.pipe(texts, batch_size=20):
        preproc_pipe.append(lemmatize_pipe(doc))

    return preproc_pipe

def preprocess_parallel(data, chunksize):
    executor = Parallel(n_jobs=31, backend='multiprocessing', prefer="processes")
    do = delayed(process_chunk)
    tasks = (do(chunk) for chunk in chunker(data, len(data), chunksize=chunksize))
    result = executor(tasks)
    flattened = [item for sublist in result for item in sublist]

    return flattened

cpu = time.time()
df_data['parallel'] = preprocess_parallel(df_data['content'], chunksize=1)
print('\nParallel processing in {:.0f} seconds'.format(time.time() - cpu))

我搜索并尝试了各种方法,但找不到解决方案。最后,我决定与引理一起计算相似性,但这是一种解决方法。时间增加的真正原因是什么?有没有办法在不浪费那么多时间的情况下获取文档?

pickled doc 非常大,包含大量重建文档本身不需要的数据,包括整个模型词汇表。使用 doc.to_bytes() 将是一个重大改进,您可以通过使用 exclude 排除不需要的数据来进一步改进它,例如 doc.tensor:

data = doc.to_bytes(exclude=["tensor"])
...
reloaded_doc = Doc(nlp.vocab)
reloaded_doc.from_bytes(data)

比较:

doc = nlp("test")
len(pickle.dumps(doc))                              # 1749721
len(pickle.dumps(doc.to_bytes()))                   # 750
len(pickle.dumps(doc.to_bytes(exclude=["tensor"]))) # 316

您也可以使用 doc.to_array() 而不是 doc.to_bytes() 来仅导出您需要的注释层,但是从数组重新加载文档稍微复杂一些。

参见: