Task serialization issue when using SparkML prediction model

A task serialization error occurs when running this code, where myDstream is a DStream[String] and session is a String:

      import org.apache.spark.mllib.linalg.Vectors
      import org.apache.spark.mllib.tree.model.GradientBoostedTreesModel

      // Load the trained model on the driver.
      val model = GradientBoostedTreesModel.load(sc, mySet.value("modelAddress") + mySet.value("modelId"))
      val newDstream = myDstream.map(session => {
        val features: Array[String] = UtilsPredictor.getFeatures()
        val parsedSession = UtilsPredictor.parseJSON(session)
        // Comma-join the feature values into "[v1,v2,...,vn]" for Vectors.parse.
        var input: String = ""
        var count: Int = 1
        for (i <- 0 until features.length) {
          if (count < features.length) {
            input += parsedSession(features(i)) + ","
            count += 1
          } else {
            input += parsedSession(features(i))
          }
        }
        input = "[" + input + "]"
        val vecTest = Vectors.parse(input)
        // Referencing `model` here pulls it (and anything it transitively
        // references, e.g. the enclosing class) into the task closure.
        parsedSession + ("prediction_result" -> model.predict(vecTest).toString)
      })


      // The "Task not serializable" exception is thrown when this action is set up.
      newDstream.foreachRDD(session => {
        session.foreachPartition({ partitionOfRecords =>
            //...
        })
      })
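As an aside, the comma-joining loop in the map above can be collapsed with mkString; an equivalent sketch of just the vector-building step, assuming features and parsedSession as defined in the snippet:

    // Equivalent construction of the "[v1,v2,...,vn]" string in one expression.
    val input = features.map(f => parsedSession(f)).mkString("[", ",", "]")
    val vecTest = Vectors.parse(input)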

The object UtilsPredictor is serializable, and the problem clearly involves the use of the prediction model. The strangest part is that the serialization error is triggered by the line newDstream.foreachRDD(session => {. Any ideas how to avoid this error?
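One common way to keep the model (and whatever object graph it hangs off) out of the task closure is to broadcast it on the driver and dereference it inside the closure. A minimal sketch, where buildInput is a hypothetical helper standing in for the string-building loop above:

    // Broadcast the (serializable) model once from the driver.
    val modelBc = sc.broadcast(model)

    val newDstream = myDstream.map { session =>
      val parsedSession = UtilsPredictor.parseJSON(session)
      // buildInput is a hypothetical stand-in for the string-building loop above.
      val vecTest = Vectors.parse(buildInput(parsedSession))
      // Only the broadcast handle is captured by the closure, not the driver's object graph.
      parsedSession + ("prediction_result" -> modelBc.value.predict(vecTest).toString)
    }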

Update:

I tried @transient val vecTest = Vectors.parse(input), but the same task serialization error appears again. The error message is below. In particular, the error is triggered at Predictor.scala:234, i.e. the line session.foreachPartition({ partitionOfRecords =>:

org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2055)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition.apply(RDD.scala:919)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition.apply(RDD.scala:918)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
    at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:918)
    at org.test.classifier.Predictor$$anonfun$run.apply(Predictor.scala:234)
    at org.test.classifier.Predictor$$anonfun$run.apply(Predictor.scala:233)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$$anonfun$apply$mcV$sp.apply(DStream.scala:661)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$$anonfun$apply$mcV$sp.apply(DStream.scala:661)

Two things to try:

  • Make sure your class extends Serializable.
  • Add @transient to the member you suspect is causing the task serialization error; this annotation excludes that particular entity from serialization (see the sketch after this list).
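A minimal sketch of both suggestions combined, using a hypothetical Predictor class and Jackson's ObjectMapper as an example of a non-serializable member:

    import com.fasterxml.jackson.databind.ObjectMapper

    // Hypothetical driver class applying both suggestions above.
    class Predictor extends Serializable {
      // ObjectMapper is not serializable: @transient keeps it out of the
      // serialized closure, and `lazy` re-creates it on each executor on first use.
      @transient private lazy val mapper = new ObjectMapper()
    }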

This is usually what we do when adding logging to an application, as shown below:

    import org.slf4j.LoggerFactory

    @transient private lazy val log = LoggerFactory.getLogger(getClass)
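If the non-serializable object lives in the elided foreachPartition body, another standard option is to create it on the executor, once per partition, rather than capturing it from the driver. A sketch of that pattern, where createConnection and send are hypothetical placeholders:

    newDstream.foreachRDD { rdd =>
      rdd.foreachPartition { partitionOfRecords =>
        // Constructed on the executor, so it is never serialized or shipped.
        // createConnection() and send() are hypothetical placeholders.
        val connection = createConnection()
        partitionOfRecords.foreach(record => connection.send(record))
        connection.close()
      }
    }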