Spark: object not serializable
I have a batch job that I am trying to convert to Structured Streaming. I am getting the following error:
20/03/31 15:09:23 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.io.NotSerializableException: com.apple.ireporter.analytics.compute.AggregateKey
Serialization stack:
- object not serializable (class: com.apple.ireporter.analytics.compute.AggregateKey, value: d_)
... where "d_" is the last row in the dataset.
Here is the relevant code snippet:
df.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
  import spark.implicits._
  val javaRdd = batchDF.toJavaRDD
  val dataframeToRowColFunction = new RowToColumn(table)
  println("Back to Main class")
  val combinedRdd = javaRdd.flatMapToPair(dataframeToRowColFunction.FlatMapData2)
    .combineByKey(aggrCreateComb.createCombiner, aggrMerge.aggrMerge, aggrMergeCombiner.aggrMergeCombiner)
  // spark.createDataFrame(combinedRdd).show(1) // I commented this
  // combinedRdd.collect() // I added this as a test
}
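As a side note, the error only surfaces once an action such as `show` or `collect` actually runs the job, because that is when the shuffle introduced by `combineByKey` forces Spark to serialize the key objects (by default with plain Java serialization). The same failure can be reproduced outside Spark; the class name below is a hypothetical stand-in for a key type that does not implement `Serializable`:

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for a key class lacking Serializable.
class NonSerializableKey(val id: String)

object Repro extends App {
  val out = new ObjectOutputStream(new ByteArrayOutputStream())
  try out.writeObject(new NonSerializableKey("d_"))
  catch {
    // Same exception type as in the Spark stack trace above.
    case e: NotSerializableException => println(s"failed to serialize: ${e.getMessage}")
  }
}
```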
Here is the FlatMapData2 class:
val FlatMapData2: PairFlatMapFunction[Row, AggregateKey, AggregateValue] = new PairFlatMapFunction[Row, AggregateKey, AggregateValue]() {
  // val FlatMapData: PairFlatMapFunction[Row, String, AggregateValue] = new PairFlatMapFunction[Row, String, AggregateValue]() {
  override def call(x: Row) = {
    val tuples = new util.ArrayList[Tuple2[AggregateKey, AggregateValue]]
    val decomposedEvents = decomposer.decomposeDistributed(x)
    decomposedEvents.foreach { y =>
      tuples.add(Tuple2(y._1, y._2))
    }
    tuples.iterator()
  }
}
And here is the AggregateKey class:
class AggregateKey(var partitionkeys: Map[Int, Any], var clusteringkeys: Map[Int, Any]) extends Comparable[AggregateKey] {
...
}
I am new to this, so any help would be appreciated. Let me know if I need to add anything else.
I was able to solve this problem by making AggregateKey extend java.io.Serializable:
class AggregateKey(var partitionkeys: Map[Int, Any], var clusteringkeys: Map[Int, Any]) extends java.io.Serializable {
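For completeness, the class can keep its Comparable contract and mix in Serializable at the same time. A minimal sketch, assuming a placeholder ordering (the real compareTo implementation is elided in the question):

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

class AggregateKey(var partitionkeys: Map[Int, Any], var clusteringkeys: Map[Int, Any])
    extends Comparable[AggregateKey] with java.io.Serializable {
  // Placeholder ordering for illustration only; substitute the real logic.
  override def compareTo(other: AggregateKey): Int =
    partitionkeys.size.compareTo(other.partitionkeys.size)
}

object SerializationCheck extends App {
  val key = new AggregateKey(Map(0 -> "p"), Map(1 -> "c"))
  // Would throw NotSerializableException without the Serializable mixin.
  new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(key)
  println("serialized OK")
}
```

Note that the values inside `partitionkeys` and `clusteringkeys` (typed `Any` here) must themselves be serializable, or the same exception will reappear for the contained values.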