Pyspark mllib LDA error: Object cannot be cast to java.util.List

Pyspark mllib LDA error: Object cannot be cast to java.util.List

我目前正在尝试在 spark 集群上执行 LDA。我有一个 RDD

>>> myRdd.take(2)
[(218603, [0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0]), (95680, [0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0])]

但是打电话

model = LDA.train(myRdd, k=5, seed=42)

给出了工人的以下错误:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 5874.0 failed 4 times, most recent failure: Lost task 0.3 in stage 5874.0): java.lang.ClassCastException: [Ljava.lang.Object; cannot be cast to java.util.List

除了显而易见的错误之外,我不知道如何解释这个错误,所以任何建议都将不胜感激;关于 mllib 的 LDA 的文档相当稀疏

我从以下过程中获取 RDD,从具有列 "doc_label" 和 "terms"

的数据帧 document_instances 开始
hashingTF = HashingTF(inputCol="terms", outputCol="term_frequencies", numFeatures=10)
tf_matrix = hashingTF.transform(document_instances)
myRdd = tf_matrix.select("doc_label", "term_frequencies").rdd

直接用这个报同样的错误。现在,这是在 pyspark.ml.feature 中使用 HashingTF,所以我怀疑 mllib 中的 Vector 与 ml 中的 Vector 之间的差异可能会导致冲突,但直接使用 Vector.fromML() 函数给出了与使用

相同的错误
myRdd = tf_matrix.select(...).rdd.map(lambda old_row: \
                                    (old_row.term, old_row.term_frequencies.toArray().tolist()))
myRdd = tf_matrix.select(...).rdd.map(lambda old_row: \
                                    (old_row.term, old_row.term_frequencies.toArray()))
myRdd = tf_matrix.select(...).rdd.map(lambda old_row: \
                                    (old_row.term, Vectors.fromML(old_row.term_frequencies)))
myRdd = tf_matrix.select(...).rdd.map(lambda old_row: \
                                    (old_row.term, old_row.term_frequencies))

所以,事实证明,spark文档在说"RDD of documents, which are tuples of document IDs and term (word) count vectors."时有点误导也许我误解了,但是当将元组更改为列表时,这个错误似乎消失了(虽然它似乎有已被其他错误取代)

改变

myRdd = tf_matrix.select(...).rdd.map(lambda old_row: \
                                    (old_row.term, old_row.term_frequencies))

myRdd = tf_matrix.select(...).rdd.map(lambda old_row: \
                                    [old_row.term, Vectors.fromML(old_row.term_frequencies)])

与他们的示例代码比较后,似乎可以缓解所提出的问题

http://spark.apache.org/docs/latest/api/python/pyspark.mllib.html#pyspark.mllib.clustering.LDA