Matching vocabulary elements to indices from LDA Model using PySpark

I would like to take the term indices from a Spark LDA model's .describeTopics() output and match them to the appropriate terms in the count vectorizer vocabulary. Here is the point of friction:

terms = ['fuzzy', 'wuzzy', 'bear', 'seashells', 'chuck', 'wood', 'big', 'black', 'woodchuck', 'sell', 'hair', 'rug', 'sat', 'seashore', 'much', 'sells', 'many']
+-----+--------------+------------------------------------------------------------------------------------+
|topic|termIndices   |termWeights                                                                         |
+-----+--------------+------------------------------------------------------------------------------------+
|0    |[6, 16, 4, 13]|[0.07759153026889895, 0.07456018590515792, 0.06590443764744822, 0.06529979905841589]|
|1    |[0, 8, 1, 12] |[0.08460924078697102, 0.06935412981755526, 0.06803462316387827, 0.06505150660960128]|
|2    |[8, 14, 5, 3] |[0.07473487407268035, 0.06999332154185754, 0.06923579179113146, 0.06673236538997057]|
|3    |[2, 15, 10, 9]|[0.07990489772171691, 0.07352818255574894, 0.0725564301639141, 0.0705481456715216]  |
+-----+--------------+------------------------------------------------------------------------------------+

The output I want is the DataFrame above with an additional array column, terms, containing the appropriate terms based on termIndices.
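
Mapping the indices through the vocabulary list by hand, the desired terms column would look roughly like this (termWeights omitted for brevity):

+-----+--------------+----------------------------------+
|topic|termIndices   |terms                             |
+-----+--------------+----------------------------------+
|0    |[6, 16, 4, 13]|[big, many, chuck, seashore]      |
|1    |[0, 8, 1, 12] |[fuzzy, woodchuck, wuzzy, sat]    |
|2    |[8, 14, 5, 3] |[woodchuck, much, wood, seashells]|
|3    |[2, 15, 10, 9]|[bear, sells, hair, sell]         |
+-----+--------------+----------------------------------+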

Here is the code that sets up the problem:

    from pyspark.sql.functions import udf
    import pyspark.sql.functions as F
    from pyspark.sql.types import ArrayType, StringType
    from pyspark.ml.feature import CountVectorizer, StopWordsRemover, RegexTokenizer, NGram
    from pyspark.ml import Pipeline
    from pyspark.ml.clustering import LDA
    import numpy as np
    
    text_df = spark.createDataFrame([
        (0, "How much WOOD could a woodchuck chuck if a woodchuck could chuck wood?"),
        (1, "She sells SEASHELLS by the seashore. How many seashells did she sell?"),
        (2, "Fuzzy Wuzzy was a bear. Fuzzy Wuzzy had no hair. Fuzzy Wuzzy wasn't very fuzzy, was he?"),
        (3, "A BIG BLACK bear sat on a big black rug.")
        ], ['id','text'])
        
    # Arguments to be passed to functions    
    input_data = text_df
    text_col = "text"
    n_topics = 4
    n_gram = 1
    min_doc_freq = 0.1
    max_doc_freq = 0.9
    
    # Model pipeline; RegexTokenizer allows us to tokenize on the regex, forgoing an explicit symbol removal step
    tokenizer = RegexTokenizer(inputCol=text_col, outputCol="tokens", pattern=r"[\s{2,}&;!\.\(\)-<>/,\?]+")
    stopwords = StopWordsRemover(inputCol="tokens", outputCol="tokens_clean")
    ngrams = NGram(inputCol = "tokens_clean", n=n_gram, outputCol = "ngram")
    count_vec = CountVectorizer(inputCol="ngram", outputCol="features", minDF = min_doc_freq, maxDF = max_doc_freq)
    lda = LDA(k=n_topics, seed=477)
    pipeline = Pipeline(stages=[tokenizer, stopwords, ngrams, count_vec, lda])

    # Fitting and transforming
    model = pipeline.fit(input_data)

Here is what I tried:

    # This section to experiment with matching vocabulary indices to words in terms
    topics = model.stages[-1].describeTopics(4) # This yields the desired topic table above
    terms = model.stages[-2].vocabulary # This yields the vocabulary above
    
    # Defining a UDF to try and match indices to terms
    @udf
    def indices_to_terms(indices):
        terms_subset = [terms[index] for index in indices]
        return(np.array(terms_subset))
    # Attempting to use UDF to add a terms column
    topics = (
        topics
       .withColumn("terms", indices_to_terms(F.col("termIndices")))
    )
    topics.show()

When I run this code, I don't actually get an error, but it doesn't display anything. Maybe a UDF isn't the right approach. How can I match the indices in termIndices to the terms in the model vocabulary and make the result an array column?

The solution was simply to define my UDF more carefully: the bare @udf decorator defaults to a StringType return, and handing it a NumPy array doesn't convert into anything useful, so the fix is to declare an ArrayType(StringType()) return type and return a plain Python list. The code below solved my problem.

    # This section to experiment with matching vocabulary indices to words in terms
    topics = model.stages[-1].describeTopics(4)
    terms = model.stages[-2].vocabulary
    
    # Plain Python function for matching indices to terms in the vocabulary
    def indices_to_terms(indices, terms=terms):
        terms_subset = [terms[index] for index in indices]
        return terms_subset

    # Defining a Spark UDF from the above function, with an explicit array-of-strings return type
    udf_indices_to_terms = F.udf(indices_to_terms, ArrayType(StringType()))

    topics = (
        topics
        .withColumn("terms", udf_indices_to_terms(F.col("termIndices")))
    )
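
If you would rather avoid a Python UDF altogether, a similar result can be reached with higher-order array functions. This is just a sketch, assuming Spark 3.1+ (where pyspark.sql.functions.transform accepts a Python lambda) and the same topics DataFrame and terms list as above; it builds an array literal from the vocabulary and looks each index up in it:

    # Sketch of a UDF-free variant, assuming Spark 3.1+
    import pyspark.sql.functions as F

    # Turn the Python vocabulary list into an array literal column
    vocab_array = F.array(*[F.lit(t) for t in terms])

    topics = topics.withColumn(
        "terms",
        # element_at is 1-based while termIndices are 0-based, hence the + 1
        F.transform("termIndices", lambda i: F.element_at(vocab_array, i + 1))
    )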

A simplified solution using only Spark operations (no pandas), outside the pipeline wrapper:

    from pyspark.sql.types import StringType, ArrayType
    import pyspark.sql.functions as f

    # ... your CountVectorizer implementation

    # get (tokenized) vocabulary mapping to trace back topics later
    terms = cv_model.vocabulary

    # ... your LDA model implementation

    # get topics
    topics_df = model.describeTopics()

    # define udf to map term indices to the original tokens from the count vectorizer vocabulary
    @f.udf(returnType=ArrayType(StringType()))
    def indices_to_terms(indices, terms=terms):
        terms_mapping = [terms[index] for index in indices]
        return terms_mapping

    # apply udf to generate the topic index to token mapping
    topics_df = topics_df.withColumn("termValues", indices_to_terms(f.col("termIndices")))
    print("The topics described by their top-weighted terms:")
    topics_df.show(truncate=False)

A simple, elegant, and efficient solution, short of anything built into Spark itself, which I couldn't find... thanks @Chris Aguilar for saving me a few hours of headbanging heuristics :)