Spark - Prediction.io - scala.MatchError: null
I'm building a template for prediction.io, but I've run into a problem with Spark.
I keep getting a scala.MatchError.
The error (full gist here):
scala.MatchError: null
at org.apache.spark.rdd.PairRDDFunctions.lookup(PairRDDFunctions.scala:831)
at org.apache.spark.mllib.recommendation.MatrixFactorizationModel.predict(MatrixFactorizationModel.scala:66)
at org.template.prediction.ALSAlgorithm$$anonfun$predict$$anonfun$apply.apply(ALSAlgorithm.scala:86)
at org.template.prediction.ALSAlgorithm$$anonfun$predict$$anonfun$apply.apply(ALSAlgorithm.scala:79)
at scala.Option.map(Option.scala:145)
at org.template.prediction.ALSAlgorithm$$anonfun$predict.apply(ALSAlgorithm.scala:79)
at org.template.prediction.ALSAlgorithm$$anonfun$predict.apply(ALSAlgorithm.scala:78)
val usersWithCounts =
  ratingsRDD
    .map(r => (r.user, (1, Seq[Rating](Rating(r.user, r.item, r.rating)))))
    .reduceByKey((v1, v2) => (v1._1 + v2._1, v1._2.union(v2._2)))
    .filter(_._2._1 >= evalK)

// create evalK folds of ratings
(0 until evalK).map { idx =>
  // start by getting this fold's ratings for each user
  val fold = usersWithCounts
    .map { userKV =>
      val userRatings = userKV._2._2.zipWithIndex
      val trainingRatings = userRatings.filter(_._2 % evalK != idx).map(_._1)
      val testingRatings = userRatings.filter(_._2 % evalK == idx).map(_._1)
      (trainingRatings, testingRatings) // split the user's ratings into a training set and a testing set
    }
    .reduce((l, r) => (l._1.union(r._1), l._2.union(r._2))) // merge all the testing and training sets into a single testing and training set
  val testingSet = fold._2.map {
    r => (new Query(r.user, r.item), new ActualResult(r.rating))
  }
  (
    new TrainingData(sc.parallelize(fold._1)),
    new EmptyEvaluationInfo(),
    sc.parallelize(testingSet)
  )
}
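For evaluation I need to split the ratings into a training set and a testing set. To make sure every user is included in training, I group all of each user's ratings together, split them per user, and then join the splits back together.
Maybe there's a better way?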
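To make the split described above easier to reason about, here is a minimal plain-Scala sketch of the same per-user modulo split on an in-memory Seq, with no Spark involved. The `Rating` case class, `FoldSplitSketch`, `split`, and `unseenItems` are hypothetical names used only for illustration. It shows one property worth checking: with evalK >= 2 every kept user ends up in training, but nothing guarantees the same for items. Whether that matters for the error in this question is not confirmed.

```scala
// Illustration only: a stand-in for the Rating type used in the question.
final case class Rating(user: Int, item: Int, rating: Double)

object FoldSplitSketch {
  /** Returns (training, testing) for fold `idx` out of `evalK` folds. */
  def split(ratings: Seq[Rating], evalK: Int, idx: Int): (Seq[Rating], Seq[Rating]) = {
    val perUser = ratings
      .groupBy(_.user)
      .values
      .filter(_.size >= evalK) // drop users with fewer than evalK ratings, as in the question
      .toSeq

    val parts = perUser.map { userRatings =>
      // every evalK-th rating of this user (offset idx) goes to the testing set
      val (test, train) = userRatings.zipWithIndex.partition(_._2 % evalK == idx)
      (train.map(_._1), test.map(_._1))
    }
    (parts.flatMap(_._1), parts.flatMap(_._2))
  }

  /** Items that occur in the testing set but never in the training set. */
  def unseenItems(training: Seq[Rating], testing: Seq[Rating]): Set[Int] =
    testing.map(_.item).toSet -- training.map(_.item).toSet
}
```

Calling `FoldSplitSketch.unseenItems(train, test)` on each fold is a cheap way to see whether any testing query refers to an item the model was never trained on.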
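The error means that the userFeatures of MLlib's MatrixFactorizationModel does not contain the user ID (for example, because the user is not in the training data). MLlib does not check for this after the lookup (it uses .head):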
https://github.com/apache/spark/blob/v1.2.0/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala#L66
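If that is the case, to debug it you can implement a modified version of model.predict() that checks whether the userId/itemId exists in the model, instead of calling the default version: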
val itemScore = model.predict(userInt, itemInt)
Use .headOption instead:
import org.jblas.DoubleMatrix // DoubleMatrix is the jblas matrix class

val itemScore = model.userFeatures.lookup(userInt).headOption.map { userFeature =>
  model.productFeatures.lookup(itemInt).headOption.map { productFeature =>
    // both feature vectors exist: the score is their dot product
    val userVector = new DoubleMatrix(userFeature)
    val productVector = new DoubleMatrix(productFeature)
    userVector.dot(productVector)
  }.getOrElse {
    logger.info(s"No itemFeature for item ${query.item}.")
    0.0 // return default score
  }
}.getOrElse {
  logger.info(s"No userFeature for user ${query.user}.")
  0.0 // return default score
}
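The same guard can be tried out without a Spark cluster. The sketch below is an assumption-laden stand-in, not MLlib's API: plain `Map`s replace the `userFeatures` / `productFeatures` RDD lookups, a hand-written `dot` replaces jblas, and `SafePredictSketch`, `Features`, and `predictOption` are hypothetical names. It returns an `Option` so the caller decides what the default score should be.

```scala
object SafePredictSketch {
  // Hypothetical in-memory stand-in for the model's feature RDDs (id -> feature vector).
  type Features = Map[Int, Array[Double]]

  /** Dot product of two feature vectors of equal length. */
  def dot(a: Array[Double], b: Array[Double]): Double =
    a.zip(b).map { case (x, y) => x * y }.sum

  /** Some(score) when both IDs are known to the model, None otherwise. */
  def predictOption(users: Features, items: Features, user: Int, item: Int): Option[Double] =
    for {
      u <- users.get(user) // None if the user was never seen in training
      i <- items.get(item) // None if the item was never seen in training
    } yield dot(u, i)
}
```

A caller that wants the behaviour of the snippet above would write `SafePredictSketch.predictOption(users, items, user, item).getOrElse(0.0)`, logging the missing ID first if needed.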