Spark 2.0 ALS Recommendation 如何向用户推荐
Spark 2.0 ALS Recommendation how to recommend to a user
我已遵循 link 中给出的指南
http://ampcamp.berkeley.edu/big-data-mini-course/movie-recommendation-with-mllib.html
但这已经过时了,因为它使用了 spark Mlib RDD 方法。新的 Spark 2.0 具有 DataFrame 方法。
现在我的问题是我有更新的代码
val ratings = spark.read.textFile("data/mllib/als/sample_movielens_ratings.txt")
.map(parseRating)
.toDF()
val Array(training, test) = ratings.randomSplit(Array(0.8, 0.2))
// Build the recommendation model using ALS on the training data
val als = new ALS()
.setMaxIter(5)
.setRegParam(0.01)
.setUserCol("userId")
.setItemCol("movieId")
.setRatingCol("rating")
val model = als.fit(training)
// Evaluate the model by computing the RMSE on the test data
val predictions = model.transform(test)
现在问题来了,在旧代码中得到的模型是一个MatrixFactorizationModel,现在它有自己的模型(ALSModel)
在 MatrixFactorizationModel 中你可以直接做
val recommendations = bestModel.get
.predict(userID)
这将给出用户喜欢它们的概率最高的产品列表。
但是现在没有.predict方法。任何想法如何在给定用户 ID
的情况下推荐产品列表
在模型上使用 transform
方法:
import spark.implicits._
val dataFrameToPredict = sparkContext.parallelize(Seq((111, 222)))
.toDF("userId", "productId")
val predictionsOfProducts = model.transform (dataFrameToPredict)
有一个 jira ticket 可以实现 recommend(User|Product) 方法,但它还没有在默认分支上
现在你有了带有用户得分的 DataFrame
您可以简单地使用 orderBy 和 limit 来显示 N 个推荐产品:
// where is for case when we have big DataFrame with many users
model.transform (dataFrameToPredict.where('userId === givenUserId))
.select ('productId, 'prediction)
.orderBy('prediction.desc)
.limit(N)
.map { case Row (productId: Int, prediction: Double) => (productId, prediction) }
.collect()
DataFrame dataFrameToPredict 可以是一些大型用户-产品 DataFrame,例如所有用户 x 所有产品
这是我通过 spark.ml
:
为特定用户获取推荐的方法
import com.github.fommil.netlib.BLAS.{getInstance => blas}
userFactors.lookup(userId).headOption.fold(Map.empty[String, Float]) { user =>
val ratings = itemFactors.map { case (id, features) =>
val rating = blas.sdot(features.length, user, 1, features, 1)
(id, rating)
}
ratings.sortBy(_._2).take(numResults).toMap
}
在我的例子中 userFactors
和 itemFactors
都是 RDD[(String, Array[Float])]
但你应该可以用 DataFrames 做类似的事情。
Spark 中的 ALS Model 包含以下有用的方法:
recommendForAllItems(int numUsers)
Returns 为所有项目的每个项目推荐的前 numUsers 用户。
recommendForAllUsers(int numItems)
Returns 为每位用户推荐的前 numItems 个项目,适用于所有用户。
recommendForItemSubset(Dataset<?> dataset, int numUsers)
Returns top numUsers 用户为输入数据集中的每个项目 id 推荐。
recommendForUserSubset(Dataset<?> dataset, int numItems)
Returns 为输入数据集中每个用户 ID 推荐的前 numItems 个项目。
例如Python
from pyspark.ml.recommendation import ALS
from pyspark.sql.functions import explode
alsEstimator = ALS()
(alsEstimator.setRank(1)
.setUserCol("user_id")
.setItemCol("product_id")
.setRatingCol("rating")
.setMaxIter(20)
.setColdStartStrategy("drop"))
alsModel = alsEstimator.fit(productRatings)
recommendForSubsetDF = alsModel.recommendForUserSubset(TargetUsers, 40)
recommendationsDF = (recommendForSubsetDF
.select("user_id", explode("recommendations")
.alias("recommendation"))
.select("user_id", "recommendation.*")
)
display(recommendationsDF)
例如斯卡拉:
import org.apache.spark.ml.recommendation.ALS
import org.apache.spark.sql.functions.explode
val alsEstimator = new ALS().setRank(1)
.setUserCol("user_id")
.setItemCol("product_id")
.setRatingCol("rating")
.setMaxIter(20)
.setColdStartStrategy("drop")
val alsModel = alsEstimator.fit(productRatings)
val recommendForSubsetDF = alsModel.recommendForUserSubset(sampleTargetUsers, 40)
val recommendationsDF = recommendForSubsetDF
.select($"user_id", explode($"recommendations").alias("recommendation"))
.select($"user_id", $"recommendation.*")
display(recommendationsDF)
我已遵循 link 中给出的指南 http://ampcamp.berkeley.edu/big-data-mini-course/movie-recommendation-with-mllib.html
但这已经过时了,因为它使用了 spark Mlib RDD 方法。新的 Spark 2.0 具有 DataFrame 方法。 现在我的问题是我有更新的代码
val ratings = spark.read.textFile("data/mllib/als/sample_movielens_ratings.txt")
.map(parseRating)
.toDF()
val Array(training, test) = ratings.randomSplit(Array(0.8, 0.2))
// Build the recommendation model using ALS on the training data
val als = new ALS()
.setMaxIter(5)
.setRegParam(0.01)
.setUserCol("userId")
.setItemCol("movieId")
.setRatingCol("rating")
val model = als.fit(training)
// Evaluate the model by computing the RMSE on the test data
val predictions = model.transform(test)
现在问题来了,在旧代码中得到的模型是一个MatrixFactorizationModel,现在它有自己的模型(ALSModel)
在 MatrixFactorizationModel 中你可以直接做
val recommendations = bestModel.get
.predict(userID)
这将给出用户喜欢它们的概率最高的产品列表。
但是现在没有.predict方法。任何想法如何在给定用户 ID
的情况下推荐产品列表在模型上使用 transform
方法:
import spark.implicits._
val dataFrameToPredict = sparkContext.parallelize(Seq((111, 222)))
.toDF("userId", "productId")
val predictionsOfProducts = model.transform (dataFrameToPredict)
有一个 jira ticket 可以实现 recommend(User|Product) 方法,但它还没有在默认分支上
现在你有了带有用户得分的 DataFrame
您可以简单地使用 orderBy 和 limit 来显示 N 个推荐产品:
// where is for case when we have big DataFrame with many users
model.transform (dataFrameToPredict.where('userId === givenUserId))
.select ('productId, 'prediction)
.orderBy('prediction.desc)
.limit(N)
.map { case Row (productId: Int, prediction: Double) => (productId, prediction) }
.collect()
DataFrame dataFrameToPredict 可以是一些大型用户-产品 DataFrame,例如所有用户 x 所有产品
这是我通过 spark.ml
:
import com.github.fommil.netlib.BLAS.{getInstance => blas}
userFactors.lookup(userId).headOption.fold(Map.empty[String, Float]) { user =>
val ratings = itemFactors.map { case (id, features) =>
val rating = blas.sdot(features.length, user, 1, features, 1)
(id, rating)
}
ratings.sortBy(_._2).take(numResults).toMap
}
在我的例子中 userFactors
和 itemFactors
都是 RDD[(String, Array[Float])]
但你应该可以用 DataFrames 做类似的事情。
Spark 中的 ALS Model 包含以下有用的方法:
recommendForAllItems(int numUsers)
Returns 为所有项目的每个项目推荐的前 numUsers 用户。
recommendForAllUsers(int numItems)
Returns 为每位用户推荐的前 numItems 个项目,适用于所有用户。
recommendForItemSubset(Dataset<?> dataset, int numUsers)
Returns top numUsers 用户为输入数据集中的每个项目 id 推荐。
recommendForUserSubset(Dataset<?> dataset, int numItems)
Returns 为输入数据集中每个用户 ID 推荐的前 numItems 个项目。
例如Python
from pyspark.ml.recommendation import ALS
from pyspark.sql.functions import explode
alsEstimator = ALS()
(alsEstimator.setRank(1)
.setUserCol("user_id")
.setItemCol("product_id")
.setRatingCol("rating")
.setMaxIter(20)
.setColdStartStrategy("drop"))
alsModel = alsEstimator.fit(productRatings)
recommendForSubsetDF = alsModel.recommendForUserSubset(TargetUsers, 40)
recommendationsDF = (recommendForSubsetDF
.select("user_id", explode("recommendations")
.alias("recommendation"))
.select("user_id", "recommendation.*")
)
display(recommendationsDF)
例如斯卡拉:
import org.apache.spark.ml.recommendation.ALS
import org.apache.spark.sql.functions.explode
val alsEstimator = new ALS().setRank(1)
.setUserCol("user_id")
.setItemCol("product_id")
.setRatingCol("rating")
.setMaxIter(20)
.setColdStartStrategy("drop")
val alsModel = alsEstimator.fit(productRatings)
val recommendForSubsetDF = alsModel.recommendForUserSubset(sampleTargetUsers, 40)
val recommendationsDF = recommendForSubsetDF
.select($"user_id", explode($"recommendations").alias("recommendation"))
.select($"user_id", $"recommendation.*")
display(recommendationsDF)