Apache Spark StackOverflowError in Scala with tail recursion

I get the following error:

org.apache.spark.SparkException: Job aborted due to stage failure: Task serialization failed: java.lang.StackOverflowError

when I execute a tail-recursive function in Scala. I was under the impression that tail recursion in Scala does not overflow the stack, and that this is one of Scala's strengths.
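For reference, this is what Scala's tail-call elimination actually guarantees. A minimal standalone sketch (not from the post): a self-recursive call in tail position is compiled to a loop, so it consumes no call-stack frames, and @tailrec makes the compiler verify this.

    import scala.annotation.tailrec

    // The recursive call is in tail position, so scalac rewrites it as a jump;
    // this runs on a million-element list without growing the call stack.
    @tailrec
    def sum(xs: List[Int], acc: Int = 0): Int = xs match {
      case Nil    => acc
      case h :: t => sum(t, acc + h)
    }

gdAll below is also a valid tail call, which is what makes the StackOverflowError surprising; as the answer explains, the overflow comes from somewhere else.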

Here is the method:

def gdAll(inputRDD: RDD[(Int, Vector, Int, Vector, Double)]): RDD[(Int, Vector, Int, Vector, Double)] = {

  val step = 0.0000055
  val h4 = 0.05

  val errors = inputRDD.map { case (itemid, itemVector, userid, userVector, rating) =>
    (itemid, itemVector, userid, userVector, rating,
      ((rating - userVector.dot(itemVector)) * itemVector) - h4 * userVector)
  }.cache

  val currentRMSE = sqrt(errors.aggregate(0.0)((accum, rating) =>
    accum + pow(rating._5 - rating._4.dot(rating._2), 2), _ + _) / errors.count)

  val totalUserError = errors.aggregate(Vector(0.0, 0.0))((accum, error) => accum + error._6, _ + _)

  val usersByKey = errors.map { case (itemid, itemVector, userid, userVector, rating, error) =>
    (userid, (userVector, itemid, itemVector, rating, error))
  }

  val updatedUserFactors = usersByKey.map { case (userid, (userVector, itemid, itemVector, rating, error)) =>
    (itemid, itemVector, userid, userVector + (step * totalUserError), rating)
  }

  val fullyUpdatedUserFactors = updatedUserFactors.map { case (itemid, itemVector, userid, userVector, rating) =>
    (itemid, itemVector, userid, userVector, rating,
      ((rating - userVector.dot(itemVector)) * userVector) - h4 * itemVector)
  }

  val itemsByKey = fullyUpdatedUserFactors.map { case (itemid, itemVector, userid, userVector, rating, error) =>
    (itemid, (itemVector, userid, userVector, rating, error))
  }

  val totalItemError = fullyUpdatedUserFactors.aggregate(Vector(0.0, 0.0))((accum, error) => accum + error._6, _ + _)

  val updatedItemFactors = itemsByKey.map { case (itemid, (itemVector, userid, userVector, rating, error)) =>
    (itemid, itemVector + (step * totalItemError), userid, userVector, rating) // totalItemError = itemError
  }

  val newRMSE = sqrt(updatedItemFactors.aggregate(0.0)((accum, rating) =>
    accum + pow(rating._5 - rating._4.dot(rating._2), 2), _ + _) / errors.count)

  println("Original RMSE: " + currentRMSE + " New RMSE: " + newRMSE)

  val changeInRMSE = (newRMSE - currentRMSE).abs

  if (changeInRMSE < 0.0000005) {
    return updatedItemFactors
  }

  errors.unpersist()

  gdAll(updatedItemFactors) // repeat if change is still large
}

Any ideas? Thanks.

This was the topic of a Spark Summit East 2015 talk: Experience and Lessons Learned for Large-Scale Graph Analysis using GraphX.

What happens is that the RDD lineage grows with every iteration. The lineage is serialized recursively, so at some point this causes a StackOverflowError. This is also why tail recursion does not save you here: the overflow happens on the driver while serializing the task, not in your recursive calls.
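To make the mechanism concrete, here is a minimal sketch (a hypothetical standalone example, not the code from the question): each transformation adds one step to the lineage, and rdd.toDebugString shows the chain getting longer every iteration.

    import org.apache.spark.rdd.RDD

    // The recursion is a proper tail call and consumes no call-stack frames,
    // but each map adds a step to the RDD's lineage. When a job finally runs,
    // the driver serializes that whole chain recursively, and a deep enough
    // chain overflows the stack during task serialization.
    def iterate(rdd: RDD[Double], n: Int): RDD[Double] =
      if (n == 0) rdd
      else iterate(rdd.map(_ * 0.99), n - 1)

    // println(iterate(data, 5).toDebugString)  // prints a 5-deep lineage
    // iterate(data, 10000).count()             // fails like gdAll does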

Possible workarounds:

  • Stop iterating before it gets this deep.
  • Allocate a bigger stack (-Xss).
  • Checkpoint the RDD with RDD.checkpoint. (The talk goes into detail about why this is not a trivial fix; see the sketch after this list.)
  • Just write the RDD out to disk and read it back.
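A hedged sketch of the last two options (the checkpoint directory, the every-10-iterations interval, and the depth parameter are illustrative assumptions, not part of the original code):

    import org.apache.spark.rdd.RDD

    // Call once at startup; checkpoint files need a directory (HDFS in production):
    // sc.setCheckpointDir("/tmp/spark-checkpoints")  // path is an assumption

    // Truncate the lineage every few iterations so task serialization stays shallow.
    def truncateLineage[T](rdd: RDD[T], depth: Int): RDD[T] = {
      if (depth % 10 == 0) { // interval is a guess; tune it
        rdd.cache()          // keep the data around so it isn't recomputed twice
        rdd.checkpoint()     // mark the RDD; the next action writes it out
        rdd.count()          // force that action so the lineage is actually cut
      }
      rdd
    }

    // The disk round-trip alternative breaks the lineage unconditionally:
    // rdd.saveAsObjectFile(s"/tmp/iter-$depth")  // hypothetical path
    // val reloaded = sc.objectFile[(Int, Vector, Int, Vector, Double)](s"/tmp/iter-$depth")

gdAll would then thread a depth parameter through the recursion and call truncateLineage(updatedItemFactors, depth + 1) before recursing.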