Why does custom DefaultSource give java.io.NotSerializableException?
This is my first post on SO, so I apologize if the formatting is incorrect.
I am working with Apache Spark on creating a new source (via DefaultSource), BaseRelations, etc., and I have run into a serialization problem that I would like to understand better. Consider the class below, which extends BaseRelation and implements the scan builder.
class RootTableScan(path: String, treeName: String)(@transient val sqlContext: SQLContext) extends BaseRelation with PrunedFilteredScan{

  private val att: core.SRType =
  {
    val reader = new RootFileReader(new java.io.File(Seq(path) head))
    val tmp =
      if (treeName==null)
        buildATT(findTree(reader.getTopDir), arrangeStreamers(reader), null)
      else
        buildATT(reader.getKey(treeName).getObject.asInstanceOf[TTree],
          arrangeStreamers(reader), null)
    tmp
  }

  // define the schema from the AST
  def schema: StructType = {
    val s = buildSparkSchema(att)
    s
  }

  // builds a scan
  def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
    // parallelize over all the files
    val r = sqlContext.sparkContext.parallelize(Seq(path), 1).
      flatMap({fileName =>
        val reader = new RootFileReader(new java.io.File(fileName))
        // get the TTree
        /* PROBLEM !!! */
        val rootTree =
          // findTree(reader)
          if (treeName == null) findTree(reader)
          else reader.getKey(treeName).getObject.asInstanceOf[TTree]
        new RootTreeIterator(rootTree, arrangeStreamers(reader),
          requiredColumns, filters)
      })

    println("Done building Scan")
    r
  }
}
}
PROBLEM marks where the issue occurs. treeName is a val that is injected into the class through the constructor. The lambda that uses it is supposed to be executed on the workers, and I do need to ship treeName, i.e. serialize it. I would like to understand why the snippet below causes this NotSerializableException. I am sure that without treeName it works just fine:
val rootTree =
  // findTree(reader)
  if (treeName == null) findTree(reader)
  else reader.getKey(treeName).getObject.asInstanceOf[TTree]
Below is the stack trace:
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2056)
at org.apache.spark.rdd.RDD$$anonfun$flatMap.apply(RDD.scala:375)
at org.apache.spark.rdd.RDD$$anonfun$flatMap.apply(RDD.scala:374)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
at org.apache.spark.rdd.RDD.flatMap(RDD.scala:374)
at org.dianahep.sparkroot.package$RootTableScan.buildScan(sparkroot.scala:95)
at org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun.apply(DataSourceStrategy.scala:260)
at org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun.apply(DataSourceStrategy.scala:260)
at org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun$pruneFilterProject.apply(DataSourceStrategy.scala:303)
at org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun$pruneFilterProject.apply(DataSourceStrategy.scala:302)
at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.pruneFilterProjectRaw(DataSourceStrategy.scala:379)
at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.pruneFilterProject(DataSourceStrategy.scala:298)
at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.apply(DataSourceStrategy.scala:256)
at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun.apply(QueryPlanner.scala:60)
at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun.apply(QueryPlanner.scala:60)
at scala.collection.Iterator$$anon.nextCur(Iterator.scala:434)
at scala.collection.Iterator$$anon.hasNext(Iterator.scala:440)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:61)
at org.apache.spark.sql.execution.SparkPlanner.plan(SparkPlanner.scala:47)
at org.apache.spark.sql.execution.SparkPlanner$$anonfun$plan$$anonfun$apply.applyOrElse(SparkPlanner.scala:51)
at org.apache.spark.sql.execution.SparkPlanner$$anonfun$plan$$anonfun$apply.applyOrElse(SparkPlanner.scala:48)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp.apply(TreeNode.scala:301)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp.apply(TreeNode.scala:301)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:69)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:300)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun.apply(TreeNode.scala:298)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun.apply(TreeNode.scala:298)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun.apply(TreeNode.scala:321)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:179)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:319)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:298)
at org.apache.spark.sql.execution.SparkPlanner$$anonfun$plan.apply(SparkPlanner.scala:48)
at org.apache.spark.sql.execution.SparkPlanner$$anonfun$plan.apply(SparkPlanner.scala:48)
at scala.collection.Iterator$$anon.next(Iterator.scala:409)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:78)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:76)
at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:83)
at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:83)
at org.apache.spark.sql.Dataset.withTypedCallback(Dataset.scala:2572)
at org.apache.spark.sql.Dataset.head(Dataset.scala:1934)
at org.apache.spark.sql.Dataset.take(Dataset.scala:2149)
at org.apache.spark.sql.Dataset.showString(Dataset.scala:239)
at org.apache.spark.sql.Dataset.show(Dataset.scala:526)
at org.apache.spark.sql.Dataset.show(Dataset.scala:486)
at org.apache.spark.sql.Dataset.show(Dataset.scala:495)
... 50 elided
Caused by: java.io.NotSerializableException: org.dianahep.sparkroot.package$RootTableScan
Serialization stack:
- object not serializable (class: org.dianahep.sparkroot.package$RootTableScan, value: org.dianahep.sparkroot.package$RootTableScan@6421e9e7)
- field (class: org.dianahep.sparkroot.package$RootTableScan$$anonfun, name: $outer, type: class org.dianahep.sparkroot.package$RootTableScan)
- object (class org.dianahep.sparkroot.package$RootTableScan$$anonfun, <function1>)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
From the stack trace, I gather that it tries to serialize my lambda and cannot. This lambda should be a closure, since it references a val defined outside the lambda's scope. But I do not understand why that closure cannot be serialized.
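To make the capture concrete, here is a minimal sketch outside Spark (Outer and ClosureCaptureDemo are made-up names, not part of my code): a function literal that reads a class field is compiled to read this.field, so the generated function object holds a reference to the entire enclosing instance, and serializing the function therefore requires serializing that instance.

import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for RootTableScan: like a BaseRelation subclass, it is not Serializable.
class Outer(val treeName: String) {
  // The body reads treeName, which the compiler rewrites to this.treeName,
  // so the generated function keeps a reference to the whole Outer instance ($outer).
  def makeClosure: String => String = fileName => fileName + ":" + treeName
}

object ClosureCaptureDemo {
  def main(args: Array[String]): Unit = {
    val f = new Outer("events").makeClosure
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try out.writeObject(f)  // fails: the captured Outer instance is not Serializable
    catch { case e: NotSerializableException => println("caught: " + e) }
  }
}

Spark's ClosureCleaner performs this same check on the driver before shipping the flatMap body, which is why the failure surfaces at ensureSerializable during planning rather than on the executors.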
Any help would be greatly appreciated!
Thank you very much!
Any time a Scala closure references a class field such as treeName, the enclosing class instance gets serialized along with the closure. However, your class RootTableScan is not serializable! The solution is to copy the field into a local string variable:
// builds a scan
def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
  val localTreeName = treeName // this is safe to serialize
  // parallelize over all the files
  val r = sqlContext.sparkContext.parallelize(Seq(path), 1).
    flatMap({fileName =>
      val reader = new RootFileReader(new java.io.File(fileName))
      // get the TTree
      /* PROBLEM !!! */
      val rootTree =
        // findTree(reader)
        if (localTreeName == null) findTree(reader)
        else reader.getKey(localTreeName).getObject.asInstanceOf[TTree]
      new RootTreeIterator(rootTree, arrangeStreamers(reader),
        requiredColumns, filters)
    })

  println("Done building Scan")
  r
}
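Continuing the made-up Outer sketch from the question, the effect of the local copy is that the lambda captures only a String, so nothing drags the enclosing instance in and the closure serializes fine:

class OuterFixed(val treeName: String) {
  def makeClosure: String => String = {
    val localTreeName = treeName // a plain String; only this value is captured
    fileName => fileName + ":" + localTreeName
  }
}

Note that path never has this problem in buildScan: it is only used on the driver, inside parallelize, and fileName inside the flatMap is a parameter rather than a captured field. Making RootTableScan serializable (with its non-serializable members marked @transient) would presumably also work, but copying the field to a local val keeps the shipped closure as small as possible.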