MatchError while accessing vector column in Spark 2.0
I am trying to create an LDA model on a JSON file.
Creating a Spark session with the JSON file:
import org.apache.spark.sql.SparkSession
val sparkSession = SparkSession.builder
.master("local")
.appName("my-spark-app")
.config("spark.some.config.option", "config-value")
.getOrCreate()
val df = sparkSession.read.json("dbfs:/mnt/JSON6/JSON/sampleDoc.txt")
Display the df, which should show the DataFrame:
display(df)
Tokenize the text:
import org.apache.spark.ml.feature.RegexTokenizer
// Set params for RegexTokenizer
val tokenizer = new RegexTokenizer()
.setPattern("[\\W_]+")
.setMinTokenLength(4) // Filter away tokens with length < 4
.setInputCol("text")
.setOutputCol("tokens")
// Tokenize document
val tokenized_df = tokenizer.transform(df)
This should display the tokenized_df:
display(tokenized_df)
Get the stopwords:
%sh wget http://ir.dcs.gla.ac.uk/resources/linguistic_utils/stop_words -O /tmp/stopwords
Optional: copy the stopwords to the tmp folder:
%fs cp file:/tmp/stopwords dbfs:/tmp/stopwords
Collect all the stopwords:
val stopwords = sc.textFile("/tmp/stopwords").collect()
Filter out the stopwords:
import org.apache.spark.ml.feature.StopWordsRemover
// Set params for StopWordsRemover
val remover = new StopWordsRemover()
.setStopWords(stopwords) // This parameter is optional
.setInputCol("tokens")
.setOutputCol("filtered")
// Create new DF with Stopwords removed
val filtered_df = remover.transform(tokenized_df)
Display the filtered df to verify that the stopwords were removed:
display(filtered_df)
Vectorize the frequency of occurrence of words:
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.sql.Row
import org.apache.spark.ml.feature.CountVectorizer
// Set params for CountVectorizer
val vectorizer = new CountVectorizer()
.setInputCol("filtered")
.setOutputCol("features")
.fit(filtered_df)
Verify the vectorizer:
vectorizer.transform(filtered_df)
  .select("id", "text", "features", "filtered").show()
// countVectors (referenced below) is presumably the id plus count-vector output:
val countVectors = vectorizer.transform(filtered_df).select("id", "features")
After this, I am seeing issues when fitting this vectorizer into LDA. I believe the issue is that CountVectorizer gives a sparse vector, but LDA requires a dense vector. I am still trying to figure out the issue.
Here is the exception where the map is not able to convert:
import org.apache.spark.mllib.linalg.Vector
val ldaDF = countVectors.map {
case Row(id: String, countVector: Vector) => (id, countVector)
}
display(ldaDF)
Exception:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 4083.0 failed 4 times, most recent failure: Lost task 0.3 in stage 4083.0 (TID 15331, 10.209.240.17): scala.MatchError: [0,(1252,[13,17,18,20,30,37,45,50,51,53,63,64,96,101,108,125,174,189,214,221,224,227,238,268,291,309,328,357,362,437,441,455,492,493,511,528,561,613,619,674,764,823,839,980,1098,1143],[1.0,1.0,2.0,1.0,1.0,1.0,2.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,3.0,1.0,2.0,1.0,5.0,1.0,2.0,2.0,1.0,4.0,1.0,2.0,3.0,1.0,1.0,1.0,1.0,1.0,2.0,1.0,1.0,1.0,1.0,1.0,2.0,1.0,2.0,1.0,1.0,1.0])] (of class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema)
Here is a working example for LDA which does not throw any issue:
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.sql.Row
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.clustering.{DistributedLDAModel, LDA}
val a = Vectors.dense(Array(1.0,2.0,3.0))
val b = Vectors.dense(Array(3.0,4.0,5.0))
val df = Seq((1L,a),(2L,b),(2L,a)).toDF
val ldaDF = df.map { case Row(id: Long, countVector: Vector) => (id, countVector) }
val model = new LDA().setK(3).run(ldaDF.javaRDD)
display(df)
The only difference is that in the second snippet we have a dense matrix.
This has nothing to do with sparsity. Since Spark 2.0.0, ML transformers no longer generate o.a.s.mllib.linalg.VectorUDT but o.a.s.ml.linalg.VectorUDT, and the values are mapped locally to subclasses of o.a.s.ml.linalg.Vector. These are not compatible with the old MLlib API, which is moving towards deprecation in Spark 2.0.0.
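The mismatch is easy to see by inspecting what CountVectorizer actually emits (a minimal check, assuming the countVectors DataFrame referenced in the question):
// The column carries the new ML user-defined type, and collected values are
// ml.linalg vectors, so a pattern match against org.apache.spark.mllib.linalg.Vector fails.
countVectors.schema("features").dataType.getClass.getName
// org.apache.spark.ml.linalg.VectorUDT
countVectors.first().getAs[Any]("features").getClass.getName
// org.apache.spark.ml.linalg.SparseVector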
You can convert between the "old" and "new" vectors using Vectors.fromML:
import org.apache.spark.mllib.linalg.{Vectors => OldVectors}
import org.apache.spark.ml.linalg.{Vectors => NewVectors}
OldVectors.fromML(NewVectors.dense(1.0, 2.0, 3.0))
OldVectors.fromML(NewVectors.sparse(5, Seq(0 -> 1.0, 2 -> 2.0, 4 -> 3.0)))
But if you already use ML transformers, it makes more sense to use the ML LDA implementation, which consumes the new vector type directly.
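A rough sketch of that route (the column name and parameter values are illustrative, assuming the countVectors DataFrame referenced in the question):
import org.apache.spark.ml.clustering.LDA
// The ML LDA consumes a DataFrame column of ml.linalg vectors directly,
// so the CountVectorizer output needs no conversion.
val mlLda = new LDA()
  .setK(10)            // illustrative number of topics
  .setMaxIter(10)
  .setFeaturesCol("features")
val mlLdaModel = mlLda.fit(countVectors)
mlLdaModel.describeTopics(5).show()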
For convenience, you can also use implicit conversions:
import scala.language.implicitConversions
object VectorConversions {
import org.apache.spark.mllib.{linalg => mllib}
import org.apache.spark.ml.{linalg => ml}
implicit def toNewVector(v: mllib.Vector) = v.asML
implicit def toOldVector(v: ml.Vector) = mllib.Vectors.fromML(v)
}
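A hypothetical usage of the object above: with the implicits in scope, a new ml.linalg vector is accepted where the old MLlib API expects one.
import VectorConversions._
import org.apache.spark.ml.linalg.{Vectors => NewVectors}
import org.apache.spark.mllib.linalg.{Vector => OldVector}
// Compiles because toOldVector implicitly converts the new vector to the old type.
val oldVec: OldVector = NewVectors.dense(1.0, 2.0, 3.0)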
The solution is very simple. Find it below:
//import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.ml.linalg.Vector
I changed:
val ldaDF = countVectors.map {
case Row(id: String, countVector: Vector) => (id, countVector)
}
to:
val ldaDF = countVectors.map { case Row(docId: String, features: MLVector) =>
(docId.toLong, Vectors.fromML(features)) }
And it works perfectly! It is consistent with what @zero323 wrote.
List of imports:
import org.apache.spark.ml.feature.{CountVectorizer, RegexTokenizer, StopWordsRemover}
import org.apache.spark.ml.linalg.{Vector => MLVector}
import org.apache.spark.mllib.clustering.{LDA, OnlineLDAOptimizer}
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.sql.{Row, SparkSession}
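For completeness, a sketch of how the converted ldaDF could then be fed to the MLlib LDA (the OnlineLDAOptimizer import above suggests this step; the parameter values are illustrative, not from the original post):
// ldaDF is a Dataset[(Long, mllib Vector)], so .rdd yields the RDD[(Long, Vector)]
// that the old MLlib LDA expects.
val numTopics = 10   // illustrative
val lda = new LDA()
  .setK(numTopics)
  .setMaxIterations(10)
  .setOptimizer(new OnlineLDAOptimizer().setMiniBatchFraction(0.8))
val ldaModel = lda.run(ldaDF.rdd)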