Matching vocabulary elements to indices from LDA Model using PySpark
I want to take the term indices from the output of a Spark LDA model's .describeTopics() and match them to the appropriate terms in the count vectorizer's vocabulary. Here is the point of friction:
terms = ['fuzzy', 'wuzzy', 'bear', 'seashells', 'chuck', 'wood', 'big', 'black', 'woodchuck', 'sell', 'hair', 'rug', 'sat', 'seashore', 'much', 'sells', 'many']
+-----+--------------+------------------------------------------------------------------------------------+
|topic|termIndices |termWeights |
+-----+--------------+------------------------------------------------------------------------------------+
|0 |[6, 16, 4, 13]|[0.07759153026889895, 0.07456018590515792, 0.06590443764744822, 0.06529979905841589]|
|1 |[0, 8, 1, 12] |[0.08460924078697102, 0.06935412981755526, 0.06803462316387827, 0.06505150660960128]|
|2 |[8, 14, 5, 3] |[0.07473487407268035, 0.06999332154185754, 0.06923579179113146, 0.06673236538997057]|
|3 |[2, 15, 10, 9]|[0.07990489772171691, 0.07352818255574894, 0.0725564301639141, 0.0705481456715216] |
+-----+--------------+------------------------------------------------------------------------------------+
My desired output is the DataFrame above with an added array column terms containing the appropriate terms based on termIndices.
Here is the code that sets up the problem:
from pyspark.sql.functions import udf
import pyspark.sql.functions as F
from pyspark.ml.feature import CountVectorizer, StopWordsRemover, RegexTokenizer, NGram
from pyspark.ml import Pipeline
from pyspark.ml.clustering import LDA
import numpy as np
text_df = spark.createDataFrame([
    (0, "How much WOOD could a woodchuck chuck if a woodchuck could chuck wood?"),
    (1, "She sells SEASHELLS by the seashore. How many seashells did she sell?"),
    (2, "Fuzzy Wuzzy was a bear. Fuzzy Wuzzy had no hair. Fuzzy Wuzzy wasn't very fuzzy, was he?"),
    (3, "A BIG BLACK bear sat on a big black rug.")
], ['id', 'text'])
# Arguments to be passed to functions
input_data = text_df
text_col = "text"
n_topics = 4
n_gram = 1
min_doc_freq = 0.1
max_doc_freq = 0.9
# Model pipeline; RegexTokenizer allows us to tokenize on the regex, forgoing an explicit symbol removal step
tokenizer = RegexTokenizer(inputCol=text_col, outputCol="tokens", pattern=r"[\s{2,}&;!\.\(\)-<>/,\?]+")
stopwords = StopWordsRemover(inputCol="tokens", outputCol="tokens_clean")
ngrams = NGram(inputCol="tokens_clean", n=n_gram, outputCol="ngram")
count_vec = CountVectorizer(inputCol="ngram", outputCol="features", minDF=min_doc_freq, maxDF=max_doc_freq)
lda = LDA(k=n_topics, seed=477)
pipeline = Pipeline(stages=[tokenizer, stopwords, ngrams, count_vec, lda])
# Fitting the pipeline
model = pipeline.fit(input_data)
Here is what I have tried:
# This section to experiment with matching vocabulary indices to words in terms
topics = model.stages[-1].describeTopics(4) # This yields the desired topic table above
terms = model.stages[-2].vocabulary # This yields the vocabulary above
# Defining a UDF to try and match indices to terms
@udf
def indices_to_terms(indices):
    terms_subset = [terms[index] for index in indices]
    return np.array(terms_subset)
# Attempting to use UDF to add a terms column
topics = (
    topics
    .withColumn("terms", indices_to_terms(F.col("termIndices")))
)
topics.show()
When I run this code I don't actually get an error, but it doesn't display anything. Maybe a UDF isn't the right approach here. How do I match the indices in termIndices to the terms in the model vocabulary and make that an array column?
The solution was simply to define my UDF more carefully. The code below solved my problem.
import pyspark.sql.functions as F
from pyspark.sql.types import ArrayType, StringType

# This section to experiment with matching vocabulary indices to words in terms
topics = model.stages[-1].describeTopics(4)
terms = model.stages[-2].vocabulary

# Plain Python function for matching indices to terms in the vocabulary
def indices_to_terms(indices, terms=terms):
    terms_subset = [terms[index] for index in indices]
    return terms_subset

# Defining a Spark UDF from the above function, with an explicit array-of-strings return type
udf_indices_to_terms = F.udf(indices_to_terms, ArrayType(StringType()))

topics = (
    topics
    .withColumn("terms", udf_indices_to_terms(F.col("termIndices")))
)
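Showing the result now works as expected. With the toy data above, the mapping one would expect (derived by hand from the termIndices and vocabulary shown earlier) looks like this:
topics.show(truncate=False)
# topic 0: termIndices [6, 16, 4, 13] -> terms [big, many, chuck, seashore]
# topic 1: termIndices [0, 8, 1, 12]  -> terms [fuzzy, woodchuck, wuzzy, sat]
# topic 2: termIndices [8, 14, 5, 3]  -> terms [woodchuck, much, wood, seashells]
# topic 3: termIndices [2, 15, 10, 9] -> terms [bear, sells, hair, sell]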
A simplified solution using only Spark operations outside a pipeline wrapper (no pandas):
from pyspark.sql.types import StringType, ArrayType
import pyspark.sql.functions as f

# ... your CountVectorizer implementation (fitted model cv_model) ...

# get (tokenized) vocabulary mapping to trace back topics later
terms = cv_model.vocabulary

# ... your LDA model implementation (fitted model `model`) ...

# get topics
topics_df = model.describeTopics()

# define udf to map term indices to the original tokens from the count vectorizer vocabulary
@f.udf(returnType=ArrayType(StringType()))
def indices_to_terms(indices, terms=terms):
    terms_mapping = [terms[index] for index in indices]
    return terms_mapping

# apply udf to generate the topic-index-to-token mapping
topics_df = topics_df.withColumn("termValues", indices_to_terms(f.col("termIndices")))

print("The topics described by their top-weighted terms:")
topics_df.show(truncate=False)
A simple, elegant and efficient solution, short of anything built into Spark that I couldn't find... thanks @Chris Aguilar for saving me a few hours of head-banging heuristics :)
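As an aside, on newer Spark versions (3.1+, where pyspark.sql.functions.transform accepts a Python lambda) the same lookup can also be expressed without a UDF by indexing into an array literal built from the vocabulary. A minimal sketch, assuming the terms list and topics DataFrame from above:
import pyspark.sql.functions as F

# Build an array-literal column holding the whole vocabulary
terms_array = F.array(*[F.lit(t) for t in terms])

# element_at uses 1-based indexing for arrays, so shift each 0-based term index by one
topics = topics.withColumn(
    "terms",
    F.transform(F.col("termIndices"), lambda i: F.element_at(terms_array, i + 1))
)
topics.show(truncate=False)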