如何使用 scala 或 python 在 apache spark 中 运行 多线程作业?
How to run Multi threaded jobs in apache spark using scala or python?
我遇到了与 spark 并发相关的问题,这使我无法在生产中使用它,但我知道有办法解决它。我正在尝试 运行 为 700 万用户使用订单历史记录 10 亿个产品来激发 ALS。首先,我列出了不同用户的列表,然后 运行 在这些用户上循环以获得推荐,这是一个非常缓慢的过程,需要几天时间才能为所有用户获得推荐。我尝试使用笛卡尔用户和产品来一次获得所有推荐,但再次将其提供给 elasticsearch 我必须为每个用户过滤和排序记录,然后我才能将其提供给 elasticsearch 以供其他 API 使用。
所以请给我一个解决方案,它在这种用例中具有很好的可扩展性,并且可以在带有实时建议的生产中使用。
这是我在 scala 中的代码片段,可以让您了解我目前是如何解决问题的:
// buy_values -> RDD with Rating(<int user_id>, <int product_id>, <double rating>)
def recommend_for_user(user: Int): Unit = {
println("Recommendations for User ID: " + user);
// Product IDs which are not bought by user
val candidates = buys_values
.filter(x => x("customer_id").toString.toInt != user)
.map(x => x("product_id").toString.toInt)
.distinct().map((user, _))
// find 30 products with top rating
val recommendations = bestModel.get
.predict(candidates)
.takeOrdered(30)(Ordering[Double].reverse.on(x => x.rating))
var i = 1
var ESMap = Map[String, String]()
recommendations.foreach { r =>
ESMap += r.product.toString -> bitem_ids.value(r.product)
}
// push to elasticsearch with user as id
client.execute {
index into "recommendation" / "items" id user fields ESMap
}.await
// remove candidate RDD from memory
candidates.unpersist()
}
// iterate on each user to get recommendations for the user [slow process]
user_ids.foreach(recommend_for_user)
很明显,您程序中的瓶颈是搜索 candidates
。考虑到 Spark 架构,它会严重限制您的并行化能力,并通过为每个用户启动 Spark 作业来增加大量开销。
假设典型场景,有 700 万用户 和 10 亿产品 大多数时候你会预测整个产品范围减去用户已经购买的少数。至少在我看来,重要的问题是为什么还要费心过滤。即使你推荐以前买过的产品真的有害吗?
除非你有非常严格的要求,否则我会简单地忽略这个问题并使用 MatrixFactorizationModel.recommendProductsForUsers
,它几乎可以为你完成所有工作,不包括数据导出。之后您可以执行批量导出,一切顺利。
现在假设您有明确的禁止重复政策。假设典型用户只购买了相对少量的产品,您可以从为每个用户获取一组产品开始:
val userProdSet = buy_values
.map{case (user, product, _) => (user, product)}
.aggregateByKey(Set.empty[Int])((s, e) => s + e, (s1, s2) => s1 ++ s2)
接下来您可以简单地映射 userProdSet
以获得预测:
// Number of predictions for each user
val nPred = 30;
userProdSet.map{case (user, prodSet) => {
val recommended = model
// Find recommendations for user
.recommendProducts(_, nPred + prodSet.size))
// Filter to remove already purchased
.filter(rating => !prodSet.contains(rating.product))
// Sort and limit
.sortBy(_.rating)
.reverse
.take(nPred)
(user, recommended)
}}
您可以通过使用可变集进行聚合和广播模型来进一步改进,但这是一般的想法。
如果 user_ids
中的用户数量低于整个集合 (buy_values
) 中的用户数量,您可以简单地过滤 userProdSet
以仅保留一部分用户。
1.4 具有用于生成所有推荐的 recommendAll,以便可以通过 kv 存储提供服务。
我遇到了与 spark 并发相关的问题,这使我无法在生产中使用它,但我知道有办法解决它。我正在尝试 运行 为 700 万用户使用订单历史记录 10 亿个产品来激发 ALS。首先,我列出了不同用户的列表,然后 运行 在这些用户上循环以获得推荐,这是一个非常缓慢的过程,需要几天时间才能为所有用户获得推荐。我尝试使用笛卡尔用户和产品来一次获得所有推荐,但再次将其提供给 elasticsearch 我必须为每个用户过滤和排序记录,然后我才能将其提供给 elasticsearch 以供其他 API 使用。
所以请给我一个解决方案,它在这种用例中具有很好的可扩展性,并且可以在带有实时建议的生产中使用。
这是我在 scala 中的代码片段,可以让您了解我目前是如何解决问题的:
// buy_values -> RDD with Rating(<int user_id>, <int product_id>, <double rating>)
def recommend_for_user(user: Int): Unit = {
println("Recommendations for User ID: " + user);
// Product IDs which are not bought by user
val candidates = buys_values
.filter(x => x("customer_id").toString.toInt != user)
.map(x => x("product_id").toString.toInt)
.distinct().map((user, _))
// find 30 products with top rating
val recommendations = bestModel.get
.predict(candidates)
.takeOrdered(30)(Ordering[Double].reverse.on(x => x.rating))
var i = 1
var ESMap = Map[String, String]()
recommendations.foreach { r =>
ESMap += r.product.toString -> bitem_ids.value(r.product)
}
// push to elasticsearch with user as id
client.execute {
index into "recommendation" / "items" id user fields ESMap
}.await
// remove candidate RDD from memory
candidates.unpersist()
}
// iterate on each user to get recommendations for the user [slow process]
user_ids.foreach(recommend_for_user)
很明显,您程序中的瓶颈是搜索 candidates
。考虑到 Spark 架构,它会严重限制您的并行化能力,并通过为每个用户启动 Spark 作业来增加大量开销。
假设典型场景,有 700 万用户 和 10 亿产品 大多数时候你会预测整个产品范围减去用户已经购买的少数。至少在我看来,重要的问题是为什么还要费心过滤。即使你推荐以前买过的产品真的有害吗?
除非你有非常严格的要求,否则我会简单地忽略这个问题并使用 MatrixFactorizationModel.recommendProductsForUsers
,它几乎可以为你完成所有工作,不包括数据导出。之后您可以执行批量导出,一切顺利。
现在假设您有明确的禁止重复政策。假设典型用户只购买了相对少量的产品,您可以从为每个用户获取一组产品开始:
val userProdSet = buy_values
.map{case (user, product, _) => (user, product)}
.aggregateByKey(Set.empty[Int])((s, e) => s + e, (s1, s2) => s1 ++ s2)
接下来您可以简单地映射 userProdSet
以获得预测:
// Number of predictions for each user
val nPred = 30;
userProdSet.map{case (user, prodSet) => {
val recommended = model
// Find recommendations for user
.recommendProducts(_, nPred + prodSet.size))
// Filter to remove already purchased
.filter(rating => !prodSet.contains(rating.product))
// Sort and limit
.sortBy(_.rating)
.reverse
.take(nPred)
(user, recommended)
}}
您可以通过使用可变集进行聚合和广播模型来进一步改进,但这是一般的想法。
如果 user_ids
中的用户数量低于整个集合 (buy_values
) 中的用户数量,您可以简单地过滤 userProdSet
以仅保留一部分用户。
1.4 具有用于生成所有推荐的 recommendAll,以便可以通过 kv 存储提供服务。