Spark task not serializable
I have tried every solution to this problem that I could find on Stack Overflow, but I still cannot solve it.
I have a "MainObj" object that instantiates a "Recommendation" object. Whenever I call the "recommendationProducts" method, I get the error.
Here is the code for the method:
def recommendationProducts(item: Int): Unit = {
  val aMatrix = new DoubleMatrix(Array(1.0, 2.0, 3.0))

  def cosineSimilarity(vec1: DoubleMatrix, vec2: DoubleMatrix): Double = {
    vec1.dot(vec2) / (vec1.norm2() * vec2.norm2())
  }

  val itemFactor = model.productFeatures.lookup(item).head
  val itemVector = new DoubleMatrix(itemFactor)

  //Here is where I get the error:
  val sims = model.productFeatures.map { case (id, factor) =>
    val factorVector = new DoubleMatrix(factor)
    val sim = cosineSimilarity(factorVector, itemVector)
    (id, sim)
  }

  val sortedSims = sims.top(10)(Ordering.by[(Int, Double), Double] {
    case (id, similarity) => similarity
  })

  println("\nTop 10 products:")
  sortedSims.map(x => (x._1, x._2)).foreach(println)
}
Here is the error:
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2094)
at org.apache.spark.rdd.RDD$$anonfun$map.apply(RDD.scala:370)
at org.apache.spark.rdd.RDD$$anonfun$map.apply(RDD.scala:369)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.map(RDD.scala:369)
at RecommendationObj.recommendationProducts(RecommendationObj.scala:269)
at MainObj$.analisiIUNGO(MainObj.scala:257)
at MainObj$.menu(MainObj.scala:54)
at MainObj$.main(MainObj.scala:37)
at MainObj.main(MainObj.scala)
Caused by: java.io.NotSerializableException: org.apache.spark.SparkContext
Serialization stack:
- object not serializable (class: org.apache.spark.SparkContext, value: org.apache.spark.SparkContext@7c2312fa)
- field (class: RecommendationObj, name: sc, type: class org.apache.spark.SparkContext)
- object (class MainObj$$anon, MainObj$$anon@615bad16)
- field (class: RecommendationObj$$anonfun, name: $outer, type: class RecommendationObj)
- object (class RecommendationObj$$anonfun, <function1>)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
... 14 more
I have tried adding:
1) "extends Serializable" (Scala) to my class
2) "extends java.io.Serializable" to my class
3) "@transient" on some parts
4) getting the model (and the other features) inside this class (right now I get them from another object and pass them to my class as parameters)
How can I fix it? I'm going crazy!
Thanks in advance!
The key is here:
field (class: RecommendationObj, name: sc, type: class org.apache.spark.SparkContext)
So you have a field named sc of type SparkContext. Spark wants to serialize the class, so it tries to serialize all of its fields as well.
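To see why that field is a problem, here is a minimal, self-contained sketch (the class and names are hypothetical, not taken from the question): the lambda reads an instance member, so Spark has to ship the whole object to the executors, and serialization fails on the SparkContext field.

import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical reproduction: `threshold` is an instance member, so the
// closure passed to filter captures `this`. Spark then has to serialize
// the whole Holder instance and fails on its SparkContext field `sc`.
class Holder(val sc: SparkContext, threshold: Int) extends Serializable {
  def countAboveThreshold(): Long =
    sc.parallelize(1 to 100)
      .filter(x => x > threshold) // `threshold` is really `this.threshold`
      .count()                    // -> Task not serializable
}

object Demo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("not-serializable-demo")
    val sc = new SparkContext(conf)
    new Holder(sc, 50).countAboveThreshold() // typically fails like the stack trace above
    sc.stop()
  }
}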
You should either:
- use the @transient annotation on the field, check whether it is null, and recreate it if needed, or
- not keep the SparkContext in a field at all, and instead pass it into the methods as a parameter. Just remember to never use the SparkContext inside closures passed to map, flatMap, etc. (a sketch of both options follows below).
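Here is a minimal sketch of how that could look for a class shaped like RecommendationObj (the constructor, field names and the simplified body are assumptions, adapt them to your code): sc is marked @transient so it is never serialized with the instance, and the closure passed to map only touches local vals, so `this` is not captured at all.

import org.apache.spark.SparkContext
import org.apache.spark.mllib.recommendation.MatrixFactorizationModel
import org.jblas.DoubleMatrix

// Sketch only: constructor and field names are assumptions about your class.
class RecommendationObj(@transient val sc: SparkContext,   // excluded from serialization
                        val model: MatrixFactorizationModel) extends Serializable {

  def recommendationProducts(item: Int): Unit = {
    val itemFactor = model.productFeatures.lookup(item).head
    val itemVector = new DoubleMatrix(itemFactor)

    // The cosine similarity is inlined here: a nested `def` like
    // cosineSimilarity is compiled as a method on the enclosing class, so
    // calling it from inside the closure can be what drags `this` (and
    // with it `sc`) into serialization. With the computation inlined, the
    // closure only references local vals, so nothing from the enclosing
    // object has to be serialized.
    val sims = model.productFeatures.map { case (id, factor) =>
      val factorVector = new DoubleMatrix(factor)
      val sim = factorVector.dot(itemVector) /
        (factorVector.norm2() * itemVector.norm2())
      (id, sim)
    }

    val sortedSims = sims.top(10)(Ordering.by[(Int, Double), Double](_._2))
    println("\nTop 10 products:")
    sortedSims.foreach(println)
  }
}

Following the second bullet instead, you could drop the sc field entirely and have the methods that need it take a SparkContext parameter; either way, the closures sent to the executors must never reference the SparkContext or the enclosing object.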