将 dask 数据框中的列转换为 Doc2Vec 的 TaggedDocument
Convert a column in a dask dataframe to a TaggedDocument for Doc2Vec
简介
目前我正在尝试将 dask 与 gensim 结合使用来进行 NLP 文档计算,但在将我的语料库转换为“TaggedDocument”时我遇到了问题运行。
因为我尝试了很多不同的方法来解决这个问题,所以我将列出我的尝试。
处理这个问题的每一次尝试都会遇到略有不同的困难。
首先给出一些初始信息。
数据
df.info()
<class 'dask.dataframe.core.DataFrame'>
Columns: 5 entries, claim_no to litigation
dtypes: object(2), int64(3)
claim_no claim_txt I CL ICC lit
0 8697278-17 battery comprising interior battery active ele... 106 2 0
期望的输出
>>tagged_document[0]
>>TaggedDocument(words=['battery', 'comprising', 'interior', 'battery', 'active', 'elements', 'battery', 'cell', 'casing', 'said', 'cell', 'casing', 'comprising', 'first', 'casing', 'element', 'first', 'contact', 'surface', 'second', 'casing', 'element', 'second', 'contact', 'surface', 'wherein', 'assembled', 'position', 'first', 'second', 'contact', 'surfaces', 'contact', 'first', 'second', 'casing', 'elements', 'encase', 'active', 'materials', 'battery', 'cell', 'interior', 'space', 'wherein', 'least', 'one', 'gas', 'tight', 'seal', 'layer', 'arranged', 'first', 'second', 'contact', 'surfaces', 'seal', 'interior', 'space', 'characterized', 'one', 'first', 'second', 'contact', 'surfaces', 'comprises', 'electrically', 'insulating', 'void', 'volume', 'layer', 'first', 'second', 'contact', 'surfaces', 'comprises', 'formable', 'material', 'layer', 'fills', 'voids', 'surface', 'void', 'volume', 'layer', 'hermetically', 'assembled', 'position', 'form', 'seal', 'layer'], tags=['8697278-17'])
>>len(tagged_document) == len(df['claim_txt'])
错误编号 1 不允许使用发电机
def read_corpus_tag_sub(df,corp='claim_txt',tags=['claim_no']):
for i, line in enumerate(df[corp]):
yield gensim.models.doc2vec.TaggedDocument(gensim.utils.simple_preprocess(line), (list(df.loc[i,tags].values)))
tagged_document = df.map_partitions(read_corpus_tag_sub,meta=TaggedDocument)
tagged_document = tagged_document.compute()
类型错误:无法序列化生成器类型的对象。
我发现在仍然使用生成器的情况下无法解决这个问题。解决这个问题会很棒!因为这对于常规 pandas.
非常有效
错误号 2 只有每个分区的第一个元素
def read_corpus_tag_sub(df,corp='claim_txt',tags=['claim_no']):
for i, line in enumerate(df[corp]):
return gensim.models.doc2vec.TaggedDocument(gensim.utils.simple_preprocess(line), (list(df.loc[i,tags].values)))
tagged_document = df.map_partitions(read_corpus_tag_sub,meta=TaggedDocument)
tagged_document = tagged_document.compute()
这个有点笨,因为函数不会迭代(我知道)但会给出所需的格式,但每个分区的第一行只有 returns。
错误编号 3 函数调用在 100% 时挂起 cpu
def read_corpus_tag_sub(df,corp='claim_txt',tags=['claim_no']):
tagged_list = []
for i, line in enumerate(df[corp]):
tagged = gensim.models.doc2vec.TaggedDocument(gensim.utils.simple_preprocess(line), (list(df.loc[i,tags].values)))
tagged_list.append(tagged)
return tagged_list
据我所知,在循环外重构 return 时,此函数挂起在 dask 客户端中构建内存,我的 CPU 利用率达到 100%,但未计算任何任务。请记住,我以相同的方式调用函数。
Pandas 解决方案
def tag_corp(corp,tag):
return gensim.models.doc2vec.TaggedDocument(gensim.utils.simple_preprocess(corp), ([tag]))
tagged_document = [tag_corp(x,y) for x,y in list(zip(df_smple['claim_txt'],df_smple['claim_no']))]
列出 comp 我还没有对这个解决方案进行时间测试
其他Pandas解决方案
tagged_document = list(read_corpus_tag_sub(df))
这个解决方案会持续好几个小时。但是,完成后我没有足够的内存来处理这件事。
结论(?)
我现在感觉超级迷茫。这是我看过的线程列表。我承认我是 dask 的新手,我刚刚花了这么多时间,我觉得我在做一件傻事。
我对 Dask 不熟悉 APIs/limitations,但总的来说:
如果您可以将数据作为(单词、标签)元组进行迭代——甚至忽略 Doc2Vec
/TaggedDocument
步骤——那么 Dask 端将得到处理,并将这些元组转换为 TaggedDocument
实例应该是微不足道的
一般来说,对于大型数据集,您不想(并且可能没有足够的 RAM)将整个数据集实例化为内存中的 list
- 所以您的尝试涉及list()
或 .append()
可能在一定程度上工作,但会耗尽本地内存(导致严重交换)and/or 只是没有到达数据的末尾。
大型数据集的首选方法是创建一个可迭代对象,每次要求它迭代数据时(因为 Doc2Vec
训练需要多次传递),可以提供每个项目反过来——但从不将整个数据集读入 in-memory 对象。
关于此模式的一篇不错的博文是:Data streaming in Python: generators, iterators, iterables
鉴于您显示的代码,我怀疑适合您的方法可能是:
from gensim.utils import simple_preprocess
class MyDataframeCorpus(object):
def __init__(self, source_df, text_col, tag_col):
self.source_df = source_df
self.text_col = text_col
self.tag_col = tag_col
def __iter__(self):
for i, row in self.source_df.iterrows():
yield TaggedDocument(words=simple_preprocess(row[self.text_col]),
tags=[row[self.tag_col]])
corpus_for_doc2vec = MyDataframeCorpus(df, 'claim_txt', 'claim_no')
好的,你已经接近这个代码了
def read_corpus_tag_sub(df,corp='claim_txt',tags=['claim_no']):
for i, line in enumerate(df[corp]):
yield gensim.models.doc2vec.TaggedDocument(gensim.utils.simple_preprocess(line), (list(df.loc[i,tags].values)))
tagged_document = df.map_partitions(read_corpus_tag_sub,meta=TaggedDocument)
但是正如您所见,生成生成器对 Dask 不是很有帮助。相反,你可以让你的函数 return 成为一个系列
def myfunc(df, *args, **kwargs):
output = []
for i, line in enumerate(df["my_series"])
result = ...
output.append([])
return pd.Series(output)
或者,您可能只使用 df.apply
方法,该方法采用将单行转换为单行的函数。
您可能还想切换到 Dask Bag,它确实比 Pandas/Dask DataFrame 更自然地处理列表和生成器之类的事情。
简介
目前我正在尝试将 dask 与 gensim 结合使用来进行 NLP 文档计算,但在将我的语料库转换为“TaggedDocument”时我遇到了问题运行。
因为我尝试了很多不同的方法来解决这个问题,所以我将列出我的尝试。
处理这个问题的每一次尝试都会遇到略有不同的困难。
首先给出一些初始信息。
数据
df.info()
<class 'dask.dataframe.core.DataFrame'>
Columns: 5 entries, claim_no to litigation
dtypes: object(2), int64(3)
claim_no claim_txt I CL ICC lit
0 8697278-17 battery comprising interior battery active ele... 106 2 0
期望的输出
>>tagged_document[0]
>>TaggedDocument(words=['battery', 'comprising', 'interior', 'battery', 'active', 'elements', 'battery', 'cell', 'casing', 'said', 'cell', 'casing', 'comprising', 'first', 'casing', 'element', 'first', 'contact', 'surface', 'second', 'casing', 'element', 'second', 'contact', 'surface', 'wherein', 'assembled', 'position', 'first', 'second', 'contact', 'surfaces', 'contact', 'first', 'second', 'casing', 'elements', 'encase', 'active', 'materials', 'battery', 'cell', 'interior', 'space', 'wherein', 'least', 'one', 'gas', 'tight', 'seal', 'layer', 'arranged', 'first', 'second', 'contact', 'surfaces', 'seal', 'interior', 'space', 'characterized', 'one', 'first', 'second', 'contact', 'surfaces', 'comprises', 'electrically', 'insulating', 'void', 'volume', 'layer', 'first', 'second', 'contact', 'surfaces', 'comprises', 'formable', 'material', 'layer', 'fills', 'voids', 'surface', 'void', 'volume', 'layer', 'hermetically', 'assembled', 'position', 'form', 'seal', 'layer'], tags=['8697278-17'])
>>len(tagged_document) == len(df['claim_txt'])
错误编号 1 不允许使用发电机
def read_corpus_tag_sub(df,corp='claim_txt',tags=['claim_no']):
for i, line in enumerate(df[corp]):
yield gensim.models.doc2vec.TaggedDocument(gensim.utils.simple_preprocess(line), (list(df.loc[i,tags].values)))
tagged_document = df.map_partitions(read_corpus_tag_sub,meta=TaggedDocument)
tagged_document = tagged_document.compute()
类型错误:无法序列化生成器类型的对象。
我发现在仍然使用生成器的情况下无法解决这个问题。解决这个问题会很棒!因为这对于常规 pandas.
非常有效错误号 2 只有每个分区的第一个元素
def read_corpus_tag_sub(df,corp='claim_txt',tags=['claim_no']):
for i, line in enumerate(df[corp]):
return gensim.models.doc2vec.TaggedDocument(gensim.utils.simple_preprocess(line), (list(df.loc[i,tags].values)))
tagged_document = df.map_partitions(read_corpus_tag_sub,meta=TaggedDocument)
tagged_document = tagged_document.compute()
这个有点笨,因为函数不会迭代(我知道)但会给出所需的格式,但每个分区的第一行只有 returns。
错误编号 3 函数调用在 100% 时挂起 cpu
def read_corpus_tag_sub(df,corp='claim_txt',tags=['claim_no']):
tagged_list = []
for i, line in enumerate(df[corp]):
tagged = gensim.models.doc2vec.TaggedDocument(gensim.utils.simple_preprocess(line), (list(df.loc[i,tags].values)))
tagged_list.append(tagged)
return tagged_list
据我所知,在循环外重构 return 时,此函数挂起在 dask 客户端中构建内存,我的 CPU 利用率达到 100%,但未计算任何任务。请记住,我以相同的方式调用函数。
Pandas 解决方案
def tag_corp(corp,tag):
return gensim.models.doc2vec.TaggedDocument(gensim.utils.simple_preprocess(corp), ([tag]))
tagged_document = [tag_corp(x,y) for x,y in list(zip(df_smple['claim_txt'],df_smple['claim_no']))]
列出 comp 我还没有对这个解决方案进行时间测试
其他Pandas解决方案
tagged_document = list(read_corpus_tag_sub(df))
这个解决方案会持续好几个小时。但是,完成后我没有足够的内存来处理这件事。
结论(?)
我现在感觉超级迷茫。这是我看过的线程列表。我承认我是 dask 的新手,我刚刚花了这么多时间,我觉得我在做一件傻事。
我对 Dask 不熟悉 APIs/limitations,但总的来说:
如果您可以将数据作为(单词、标签)元组进行迭代——甚至忽略
Doc2Vec
/TaggedDocument
步骤——那么 Dask 端将得到处理,并将这些元组转换为TaggedDocument
实例应该是微不足道的一般来说,对于大型数据集,您不想(并且可能没有足够的 RAM)将整个数据集实例化为内存中的
list
- 所以您的尝试涉及list()
或.append()
可能在一定程度上工作,但会耗尽本地内存(导致严重交换)and/or 只是没有到达数据的末尾。
大型数据集的首选方法是创建一个可迭代对象,每次要求它迭代数据时(因为 Doc2Vec
训练需要多次传递),可以提供每个项目反过来——但从不将整个数据集读入 in-memory 对象。
关于此模式的一篇不错的博文是:Data streaming in Python: generators, iterators, iterables
鉴于您显示的代码,我怀疑适合您的方法可能是:
from gensim.utils import simple_preprocess
class MyDataframeCorpus(object):
def __init__(self, source_df, text_col, tag_col):
self.source_df = source_df
self.text_col = text_col
self.tag_col = tag_col
def __iter__(self):
for i, row in self.source_df.iterrows():
yield TaggedDocument(words=simple_preprocess(row[self.text_col]),
tags=[row[self.tag_col]])
corpus_for_doc2vec = MyDataframeCorpus(df, 'claim_txt', 'claim_no')
好的,你已经接近这个代码了
def read_corpus_tag_sub(df,corp='claim_txt',tags=['claim_no']):
for i, line in enumerate(df[corp]):
yield gensim.models.doc2vec.TaggedDocument(gensim.utils.simple_preprocess(line), (list(df.loc[i,tags].values)))
tagged_document = df.map_partitions(read_corpus_tag_sub,meta=TaggedDocument)
但是正如您所见,生成生成器对 Dask 不是很有帮助。相反,你可以让你的函数 return 成为一个系列
def myfunc(df, *args, **kwargs):
output = []
for i, line in enumerate(df["my_series"])
result = ...
output.append([])
return pd.Series(output)
或者,您可能只使用 df.apply
方法,该方法采用将单行转换为单行的函数。
您可能还想切换到 Dask Bag,它确实比 Pandas/Dask DataFrame 更自然地处理列表和生成器之类的事情。