Apache Spark in Scala: StackOverflowError with tail recursion
I'm getting the following error:
org.apache.spark.SparkException: Job aborted due to stage failure: Task serialization failed: java.lang.StackOverflowError
when I execute a tail recursive function in Scala. I was under the impression that tail recursion in Scala does not blow the stack, and that this is one of the advantages of Scala.
Here is the method:
def gdAll(inputRDD: RDD[(Int, Vector, Int, Vector, Double)]): RDD[(Int, Vector, Int, Vector, Double)] = {
  val step = 0.0000055
  val h4 = 0.05
  val errors = inputRDD.map { case (itemid, itemVector, userid, userVector, rating) =>
    (itemid, itemVector, userid, userVector, rating,
     ((rating - userVector.dot(itemVector)) * itemVector) - h4 * userVector)
  }.cache
  val currentRMSE = sqrt(errors.aggregate(0.0)((accum, rating) =>
    accum + pow(rating._5 - rating._4.dot(rating._2), 2), _ + _) / errors.count)
  val totalUserError = errors.aggregate(Vector(0.0, 0.0))((accum, error) => accum + error._6, _ + _)
  val usersByKey = errors.map { case (itemid, itemVector, userid, userVector, rating, error) =>
    (userid, (userVector, itemid, itemVector, rating, error))
  }
  val updatedUserFactors = usersByKey.map { case (userid, (userVector, itemid, itemVector, rating, error)) =>
    (itemid, itemVector, userid, userVector + (step * totalUserError), rating)
  }
  val fullyUpdatedUserFactors = updatedUserFactors.map { case (itemid, itemVector, userid, userVector, rating) =>
    (itemid, itemVector, userid, userVector, rating,
     ((rating - userVector.dot(itemVector)) * userVector) - h4 * itemVector)
  }
  val itemsByKey = fullyUpdatedUserFactors.map { case (itemid, itemVector, userid, userVector, rating, error) =>
    (itemid, (itemVector, userid, userVector, rating, error))
  }
  val totalItemError = fullyUpdatedUserFactors.aggregate(Vector(0.0, 0.0))((accum, error) => accum + error._6, _ + _)
  val updatedItemFactors = itemsByKey.map { case (itemid, (itemVector, userid, userVector, rating, error)) =>
    (itemid, itemVector + (step * totalItemError), userid, userVector, rating) // totalItemError = itemError
  }
  val newRMSE = sqrt(updatedItemFactors.aggregate(0.0)((accum, rating) =>
    accum + pow(rating._5 - rating._4.dot(rating._2), 2), _ + _) / errors.count)
  println("Original RMSE: " + currentRMSE + " New RMSE: " + newRMSE)
  val changeInRMSE = (newRMSE - currentRMSE).abs
  if (changeInRMSE < 0.0000005) {
    return updatedItemFactors
  }
  errors.unpersist()
  gdAll(updatedItemFactors) // repeat if change is still large
}
Any ideas? Thank you.
This is the subject of the Spark Summit East 2015 talk, Experience and Lessons Learned for Large-Scale Graph Analysis using GraphX.
What happens is that the RDD lineage grows with every iteration. The lineage is serialized recursively, so at some point this causes a StackOverflowError.
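You can watch the lineage grow with toDebugString. A minimal sketch, assuming an existing SparkContext named sc (the RDD contents are placeholders, not the asker's data):

var rdd = sc.parallelize(1 to 10)
for (i <- 1 to 100) {
  rdd = rdd.map(_ + 1) // each iteration appends another step to the lineage
}
// Prints one line per lineage step; serializing a task walks this chain
// recursively, which is what eventually overflows the stack.
println(rdd.toDebugString)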
Possible workarounds are:
- Stop the iteration before the lineage gets that long.
- Allocate a larger stack (-Xss).
- Checkpoint the RDDs with RDD.checkpoint. (The talk goes into detail about why this is not a trivial fix.)
- Just write the RDDs out to disk and read them back. (A sketch of the last two options follows below.)
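As a minimal sketch of the checkpoint approach, not the asker's actual code: the checkpoint directory, the placeholder map step, and the interval of 10 iterations are all assumptions for illustration.

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

def iterate(sc: SparkContext, initial: RDD[Double], maxIters: Int): RDD[Double] = {
  sc.setCheckpointDir("/tmp/spark-checkpoints") // hypothetical path; use HDFS on a real cluster
  var current = initial
  for (i <- 1 to maxIters) {
    current = current.map(_ * 0.99).cache() // stand-in for one gradient-descent step
    if (i % 10 == 0) {
      current.checkpoint() // truncates the lineage at this point
      current.count()      // force an action so the checkpoint is actually materialized
    }
  }
  current
}

The write-to-disk variant replaces the checkpoint call with current.saveAsObjectFile(path) followed by current = sc.objectFile[Double](path), which truncates the lineage the same way without the checkpoint machinery. For the larger-stack option, the relevant setting on spark-submit would be something like --conf spark.driver.extraJavaOptions=-Xss8m (the size here is a guess; the failing serialization happens on the driver).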