NotSerializableException 与 scala hadoop 中的对象

NotSerializableException with an object in scala hadoop

object NearestNeighbors {

    def runNearestNeighbors(data: RDD[Array[(LabeledPoint,Int,Int)]], 
        kNN: Int, 
        sampleData: Array[(LabeledPoint,Int,Int)]): Array[(String,Array[((Int,Int),Double)])] = {

        val globalNearestNeighborsByIndex = data.mapPartitionsWithIndex(localNearestNeighbors(_,_,kNN,sampleData)).groupByKey().map(x => (x._1,x._2.toArray.sortBy(r => r._2).take(kNN))).collect()     

        globalNearestNeighborsByIndex 
    }


    private def localNearestNeighbors(partitionIndex: Long,
        iter: Iterator[Array[(LabeledPoint,Int,Int)]],
        kNN: Int,
        sampleData: Array[(LabeledPoint,Int,Int)]): Iterator[(String,((Int,Int),Double))] = { 

            var result = List[(String,((Int,Int),Double))]()
            val dataArr = iter.next
            val nLocal = dataArr.size - 1           
            val sampleDataSize = sampleData.size - 1


            val kLocalNeighbors = Array.fill[distanceIndex](sampleDataSize+1)(null)
            for {
                i1 <- 0 to sampleDataSize
            } 
            kLocalNeighbors(i1) = distanceIndex(sampleData(i1)._3.toInt, sampleData(i1)._2.toInt, DenseVector.zeros[Double](kNN) + Int.MaxValue.toDouble, DenseVector.zeros[Int](kNN))

            for (i <- 0 to nLocal) {
                val currentPoint = dataArr(i)
                val features = currentPoint._1.features
                val rowId = currentPoint._3.toInt   
                for (j <- 0 to sampleDataSize) {
                    val samplePartitionId = sampleData(j)._2
                    val sampleRowId = sampleData(j)._3
                    val sampleFeatures = sampleData(j)._1.features
                    if (!((rowId == sampleRowId) & (samplePartitionId == partitionIndex))) {
                        val distance = Math.sqrt(sum((sampleFeatures - features) :* (sampleFeatures - features)))
                        if (distance < max(kLocalNeighbors(j).distanceVector)) {
                            val indexToReplace = argmax(kLocalNeighbors(j).distanceVector)
                            kLocalNeighbors(j).distanceVector(indexToReplace) = distance
                            kLocalNeighbors(j).neighborRowId(indexToReplace) = rowId
                        }
                    }
                }
            }
            for (m <- 0 to sampleDataSize){
                for (l <-0 to kNN-1) {

                    val key = kLocalNeighbors(m).partitionId.toString+","+kLocalNeighbors(m).sampleRowId.toString
                    val tup = (partitionIndex.toInt,kLocalNeighbors(m).neighborRowId(l))
                    result.::=(key,(tup,kLocalNeighbors(m).distanceVector(l)))
                }
            }           
        result.iterator 
    }   
}

我正在使用 https://github.com/anathan90/SparkSMOTE(scala 库)来调整数据中少数 class 的过采样

我有序列化问题,我不知道为什么。

我读到了有关此错误的一些信息,但我不明白

另一件事是我是 运行 scala hadoop 中的 smote 脚本,它在其他名为 smote 的脚本中调用此对象。

这是错误:

Caused by: java.io.NotSerializableException: NearestNeighbors$
Serialization stack:
        - object not serializable (class: NearestNeighbors$, value: NearestNeighbors$@77542834)
        - field (class: NearestNeighbors$$anonfun, name: $outer, type: class NearestNeighbors$)
        - object (class NearestNeighbors$$anonfun, <function2>)
  at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
  at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
  at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)

如果你看Serializable 您会看到它是一个 接口 ,因此我们可以检查 NearestNeighbors 是否满足它。

NearestNeighbors不满足