Apache Spark - org.apache.spark.SparkException:任务不可序列化
Apache Spark - org.apache.spark.SparkException: Task not serializable
尝试运行我的方法时:
def doGD() = {
allRatings.foreach(rating => gradientDescent(rating));
}
我收到错误:org.apache.spark.SparkException: Task not serialisable
我知道我的梯度下降方法不会并行化,因为每一步都依赖于前一步 - 所以并行工作不是一种选择。但是,如果我这样做,从控制台:
val gd = new GradientDescent()
gd.doGD();
我收到上述错误。
但是,如果我在控制台中这样做:
val gd = new GradientDescent()
gd.allRatings.foreach(rating => gradientDescent(rating))
它工作得很好。您可能已经注意到,第二个示例中的工作代码与第一个示例中的代码相同,只是我只是将代码从方法中取出并直接调用它,而不是方法。
为什么一个有效而另一个无效?我很困惑。
(附加说明:Class GradientDescent extends Serializable
)。
gradientDescent
方法:
def gradientDescent(rating : Rating) = {
var userVector = userFactors.get(rating.user).get
var itemVector = itemFactors.get(rating.product).get
userFactors.map(x => if(x._1 == rating.user)(x._1, x._2 += 0.02 * (calculatePredictionError(rating.rating, userVector, itemVector) * itemVector)))
userVector = userFactors.get(rating.user).get // updated user vector
itemFactors.map(x => if(x._1 == rating.product)(x._1, x._2 += 0.02 * (calculatePredictionError(rating.rating, userVector, itemVector) * itemVector)))
}
我知道我正在使用存储在主服务器上的 2 个变量 - userFactors
和 itemFactors
- 由于该过程是顺序并行化是不可能的。但这并不能解释为什么从控制台调用方法不起作用,但在控制台中重写方法的内部却可以。
如果没有 GradientDescent
class 的完整来源很难判断,但您可能捕获了一个不可序列化的值。 运行方法时,需要序列化完整的对象并发送给worker,而内联版本则不需要。
尝试运行我的方法时:
def doGD() = {
allRatings.foreach(rating => gradientDescent(rating));
}
我收到错误:org.apache.spark.SparkException: Task not serialisable
我知道我的梯度下降方法不会并行化,因为每一步都依赖于前一步 - 所以并行工作不是一种选择。但是,如果我这样做,从控制台:
val gd = new GradientDescent()
gd.doGD();
我收到上述错误。
但是,如果我在控制台中这样做:
val gd = new GradientDescent()
gd.allRatings.foreach(rating => gradientDescent(rating))
它工作得很好。您可能已经注意到,第二个示例中的工作代码与第一个示例中的代码相同,只是我只是将代码从方法中取出并直接调用它,而不是方法。
为什么一个有效而另一个无效?我很困惑。
(附加说明:Class GradientDescent extends Serializable
)。
gradientDescent
方法:
def gradientDescent(rating : Rating) = {
var userVector = userFactors.get(rating.user).get
var itemVector = itemFactors.get(rating.product).get
userFactors.map(x => if(x._1 == rating.user)(x._1, x._2 += 0.02 * (calculatePredictionError(rating.rating, userVector, itemVector) * itemVector)))
userVector = userFactors.get(rating.user).get // updated user vector
itemFactors.map(x => if(x._1 == rating.product)(x._1, x._2 += 0.02 * (calculatePredictionError(rating.rating, userVector, itemVector) * itemVector)))
}
我知道我正在使用存储在主服务器上的 2 个变量 - userFactors
和 itemFactors
- 由于该过程是顺序并行化是不可能的。但这并不能解释为什么从控制台调用方法不起作用,但在控制台中重写方法的内部却可以。
如果没有 GradientDescent
class 的完整来源很难判断,但您可能捕获了一个不可序列化的值。 运行方法时,需要序列化完整的对象并发送给worker,而内联版本则不需要。