将 dask 数据框中的列转换为 Doc2Vec 的 TaggedDocument

Convert a column in a dask dataframe to a TaggedDocument for Doc2Vec

简介

目前我正在尝试将 dask 与 gensim 结合使用来进行 NLP 文档计算,但在将我的语料库转换为“TaggedDocument”时我遇到了问题运行。

因为我尝试了很多不同的方法来解决这个问题,所以我将列出我的尝试。

处理这个问题的每一次尝试都会遇到略有不同的困难。

首先给出一些初始信息。

数据

df.info()
<class 'dask.dataframe.core.DataFrame'>
Columns: 5 entries, claim_no to litigation
dtypes: object(2), int64(3)
  claim_no   claim_txt I                                    CL ICC lit
0 8697278-17 battery comprising interior battery active ele... 106 2 0

期望的输出

>>tagged_document[0]
>>TaggedDocument(words=['battery', 'comprising', 'interior', 'battery', 'active', 'elements', 'battery', 'cell', 'casing', 'said', 'cell', 'casing', 'comprising', 'first', 'casing', 'element', 'first', 'contact', 'surface', 'second', 'casing', 'element', 'second', 'contact', 'surface', 'wherein', 'assembled', 'position', 'first', 'second', 'contact', 'surfaces', 'contact', 'first', 'second', 'casing', 'elements', 'encase', 'active', 'materials', 'battery', 'cell', 'interior', 'space', 'wherein', 'least', 'one', 'gas', 'tight', 'seal', 'layer', 'arranged', 'first', 'second', 'contact', 'surfaces', 'seal', 'interior', 'space', 'characterized', 'one', 'first', 'second', 'contact', 'surfaces', 'comprises', 'electrically', 'insulating', 'void', 'volume', 'layer', 'first', 'second', 'contact', 'surfaces', 'comprises', 'formable', 'material', 'layer', 'fills', 'voids', 'surface', 'void', 'volume', 'layer', 'hermetically', 'assembled', 'position', 'form', 'seal', 'layer'], tags=['8697278-17'])
>>len(tagged_document) == len(df['claim_txt'])

错误编号 1 不允许使用发电机

def read_corpus_tag_sub(df,corp='claim_txt',tags=['claim_no']):
    for i, line in enumerate(df[corp]):
        yield gensim.models.doc2vec.TaggedDocument(gensim.utils.simple_preprocess(line), (list(df.loc[i,tags].values)))

tagged_document = df.map_partitions(read_corpus_tag_sub,meta=TaggedDocument)
tagged_document = tagged_document.compute()

类型错误:无法序列化生成器类型的对象。

我发现在仍然使用生成器的情况下无法解决这个问题。解决这个问题会很棒!因为这对于常规 pandas.

非常有效

错误号 2 只有每个分区的第一个元素

def read_corpus_tag_sub(df,corp='claim_txt',tags=['claim_no']):
    for i, line in enumerate(df[corp]):
        return gensim.models.doc2vec.TaggedDocument(gensim.utils.simple_preprocess(line), (list(df.loc[i,tags].values)))

tagged_document = df.map_partitions(read_corpus_tag_sub,meta=TaggedDocument)
tagged_document = tagged_document.compute()

这个有点笨,因为函数不会迭代(我知道)但会给出所需的格式,但每个分区的第一行只有 returns。

错误编号 3 函数调用在 100% 时挂起 cpu

def read_corpus_tag_sub(df,corp='claim_txt',tags=['claim_no']):
    tagged_list = []
    for i, line in enumerate(df[corp]):
        tagged = gensim.models.doc2vec.TaggedDocument(gensim.utils.simple_preprocess(line), (list(df.loc[i,tags].values)))
        tagged_list.append(tagged)
    return tagged_list

据我所知,在循环外重构 return 时,此函数挂起在 dask 客户端中构建内存,我的 CPU 利用率达到 100%,但未计算任何任务。请记住,我以相同的方式调用函数。

Pandas 解决方案

def tag_corp(corp,tag):
    return gensim.models.doc2vec.TaggedDocument(gensim.utils.simple_preprocess(corp), ([tag]))

tagged_document = [tag_corp(x,y) for x,y in list(zip(df_smple['claim_txt'],df_smple['claim_no']))]

列出 comp 我还没有对这个解决方案进行时间测试

其他Pandas解决方案

tagged_document = list(read_corpus_tag_sub(df))

这个解决方案会持续好几个小时。但是,完成后我没有足够的内存来处理这件事。

结论(?)

我现在感觉超级迷茫。这是我看过的线程列表。我承认我是 dask 的新手,我刚刚花了这么多时间,我觉得我在做一件傻事。

  1. Processing Text With Dask
  2. Speed up Pandas apply using Dask
  3. The Docs

我对 Dask 不熟悉 APIs/limitations,但总的来说:

  • 如果您可以将数据作为(单词、标签)​​元组进行迭代——甚至忽略 Doc2Vec/TaggedDocument 步骤——那么 Dask 端将得到处理,并将这些元组转换为 TaggedDocument 实例应该是微不足道的

  • 一般来说,对于大型数据集,您不想(并且可能没有足够的 RAM)将整个数据集实例化为内存中的 list - 所以您的尝试涉及list().append() 可能在一定程度上工作,但会耗尽本地内存(导致严重交换)and/or 只是没有到达数据的末尾。

大型数据集的首选方法是创建一个可迭代对象,每次要求它迭代数据时(因为 Doc2Vec 训练需要多次传递),可以提供每个项目反过来——但从不将整个数据集读入 in-memory 对象。

关于此模式的一篇不错的博文是:Data streaming in Python: generators, iterators, iterables

鉴于您显示的代码,我怀疑适合您的方法可能是:

from gensim.utils import simple_preprocess

class MyDataframeCorpus(object):
    def __init__(self, source_df, text_col, tag_col):
        self.source_df = source_df
        self.text_col = text_col
        self.tag_col = tag_col

    def __iter__(self):
        for i, row in self.source_df.iterrows():
            yield TaggedDocument(words=simple_preprocess(row[self.text_col]), 
                                 tags=[row[self.tag_col]])

corpus_for_doc2vec = MyDataframeCorpus(df, 'claim_txt', 'claim_no')

好的,你已经接近这个代码了

def read_corpus_tag_sub(df,corp='claim_txt',tags=['claim_no']):
    for i, line in enumerate(df[corp]):
        yield gensim.models.doc2vec.TaggedDocument(gensim.utils.simple_preprocess(line), (list(df.loc[i,tags].values)))

tagged_document = df.map_partitions(read_corpus_tag_sub,meta=TaggedDocument)

但是正如您所见,生成生成器对 Dask 不是很有帮助。相反,你可以让你的函数 return 成为一个系列

def myfunc(df, *args, **kwargs):
    output = []
    for i, line in enumerate(df["my_series"])
        result = ...
        output.append([])
    return pd.Series(output)

或者,您可能只使用 df.apply 方法,该方法采用将单行转换为单行的函数。

您可能还想切换到 Dask Bag,它确实比 Pandas/Dask DataFrame 更自然地处理列表和生成器之类的事情。