Task serialization issue when using SparkML prediction model
When I run the following code, I get a task serialization error, where myDstream is a DStream[String] and session is a String:
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.tree.model.GradientBoostedTreesModel

val model = GradientBoostedTreesModel.load(sc, mySet.value("modelAddress") + mySet.value("modelId"))

val newDstream = myDstream.map(session => {
  val features: Array[String] = UtilsPredictor.getFeatures()
  val parsedSession = UtilsPredictor.parseJSON(session)
  var input: String = ""
  var count: Integer = 1
  for (i <- 0 until features.length) {
    if (count < features.length) {
      input += parsedSession(features(i)) + ","
      count += 1
    }
    else {
      input += parsedSession(features(i))
    }
  }
  input = "[" + input + "]"
  val vecTest = Vectors.parse(input)
  parsedSession + ("prediction_result" -> model.predict(vecTest).toString)
})

newDstream.foreachRDD(session => {
  session.foreachPartition({ partitionOfRecords =>
    //...
  })
})
The object UtilsPredictor is serializable, and the problem seems to involve the use of the prediction model. The strangest part is that the serialization error is triggered by the line newDstream.foreachRDD(session => {. Any ideas how to avoid this error?
Update:
I tried @transient val vecTest = Vectors.parse(input), but the same task serialization error appears again. The error message is given below. In particular, the error is triggered by Predictor.scala:234, which is the line session.foreachPartition({ partitionOfRecords =>:
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2055)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition.apply(RDD.scala:919)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition.apply(RDD.scala:918)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:918)
at org.test.classifier.Predictor$$anonfun$run.apply(Predictor.scala:234)
at org.test.classifier.Predictor$$anonfun$run.apply(Predictor.scala:233)
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$$anonfun$apply$mcV$sp.apply(DStream.scala:661)
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$$anonfun$apply$mcV$sp.apply(DStream.scala:661)
- Make sure your class extends Serializable.
- Add @transient to the member you suspect is causing the task serialization error. This annotation excludes that particular field from being considered for serialization. This is typically what we do for logging in an application, as shown below (a sketch applying the same idea to the model in this question follows the logger example):
@transient private lazy val log = LoggerFactory.getLogger(getClass)
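For the model in this question, one common way to avoid the error is to broadcast the loaded model and reference only local values inside the closure, so that the enclosing (and possibly non-serializable) Predictor instance is never captured. The sketch below is not the original code; it reuses the identifiers from the question and assumes that UtilsPredictor.parseJSON returns a Map[String, String], as the original snippet suggests:

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.tree.model.GradientBoostedTreesModel

// Load the model once on the driver and broadcast it. Keep both as local vals
// inside the method (e.g. run()) rather than as class fields, so the closure
// below captures only these locals and the UtilsPredictor object, not `this`.
val model = GradientBoostedTreesModel.load(sc, mySet.value("modelAddress") + mySet.value("modelId"))
val modelBroadcast = sc.broadcast(model)

val newDstream = myDstream.map { session =>
  val parsedSession = UtilsPredictor.parseJSON(session)
  // Build the "[v1,v2,...]" string that Vectors.parse expects.
  val input = UtilsPredictor.getFeatures().map(parsedSession).mkString("[", ",", "]")
  val vecTest = Vectors.parse(input)
  parsedSession + ("prediction_result" -> modelBroadcast.value.predict(vecTest).toString)
}

If the model has to remain a member of Predictor, assigning it to a local val (val localModel = model) right before the foreachRDD / foreachPartition calls has a similar effect: the closure then references the local copy instead of this, and capturing this is the usual cause of a "Task not serializable" error at a line like Predictor.scala:234.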