SPARK 1.6.1:在 DataFrame 上评估分类器时任务不可序列化
SPARK 1.6.1: Task not serializable when evaluating a classifier on a DataFrame
我有一个DataFrame,我把它映射成一个()的RDD来测试一个SVMModel。
我正在使用 Zeppelin 和 Spark 1.6.1
这是我的代码:
val loadedSVMModel = SVMModel.load(sc, pathToSvmModel)
// Clear the default threshold.
loadedSVMModel.clearThreshold()
// Compute raw scores on the test set.
val scoreAndLabels = df.select($"features", $"label")
.map { case Row(features:Vector, label: Double) =>
val score = loadedSVMModel.predict(features)
(score,label)
}
// Get evaluation metrics.
val metrics = new BinaryClassificationMetrics(scoreAndLabels)
val auROC = metrics.areaUnderROC()
println("Area under ROC = " + auROC)
执行代码时,我有一个 org.apache.spark.SparkException: Task not serializable;
,我很难理解为什么会发生这种情况以及如何解决它。
- 是不是我用的Zeppelin导致的?
- 是不是因为原来的DataFrame?
我已经执行了Spark编程指南中的SVM example,并且完美运行。所以原因应该与以上几点之一有关……我猜。
这里是异常堆栈的一些相关元素:
Caused by: java.io.NotSerializableException: org.apache.spark.sql.Column
Serialization stack:
- object not serializable (class: org.apache.spark.sql.Column, value: (sum(CASE WHEN (domainIndex = 0) THEN sumOfScores ELSE 0),mode=Complete,isDistinct=false) AS 0#100278)
- element of array (index: 0)
- array (class [Lorg.apache.spark.sql.Column;, size 372)
我没有 post 完整的异常堆栈,因为 Zeppelin 往往会显示很长的不相关文本。如果您希望我通过完整的异常,请告诉我。
附加信息
特征向量是使用 VectorAssembler() 生成的,如下所示
// Prepare vector assemble
val vecAssembler = new VectorAssembler()
.setInputCols(arrayOfIndices)
.setOutputCol("features")
// Aggregation expressions
val exprs = arrayOfIndices
.map(c => sum(when($"domainIndex" === c, $"sumOfScores")
.otherwise(lit(0))).alias(c))
val df = vecAssembler
.transform(anotherDF.groupBy($"userID", $"val")
.agg(exprs.head, exprs.tail: _*))
.select($"userID", $"features", $"val")
.withColumn("label", sqlCreateLabelValue($"val"))
.drop($"val").drop($"userID")
问题的根源实际上与您使用的 DataFrame
无关,甚至直接与 Zeppelin 无关。这更多的是代码组织问题以及同一范围内不可序列化对象的存在。
由于您使用交互式会话,所有对象都在同一范围内定义并成为闭包的一部分。它包括 exprs
,看起来像 Seq[Column]
,其中 Column
不可序列化。
在 SQL 表达式上操作时没有问题,因为 exprs
仅在本地使用,但当您下拉到 RDD
操作时就会出现问题。 exprs
作为闭包的一部分包含在内并导致表达式。重现此行为的最简单方法(ColumnName
是 Column
的子类之一)是这样的:
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 2.0.0-SNAPSHOT
/_/
Using Scala version 2.11.8 (OpenJDK 64-Bit Server VM, Java 1.8.0_91)
Type in expressions to have them evaluated.
Type :help for more information.
scala> val df = Seq(1, 2, 3).toDF("x")
df: org.apache.spark.sql.DataFrame = [x: int]
scala> val x = $"x"
x: org.apache.spark.sql.ColumnName = x
scala> def f(x: Any) = 0
f: (x: Any)Int
scala> df.select(x).rdd.map(f _)
org.apache.spark.SparkException: Task not serializable
...
Caused by: java.io.NotSerializableException: org.apache.spark.sql.ColumnName
Serialization stack:
- object not serializable (class: org.apache.spark.sql.ColumnName, value: x)
...
您可以尝试解决此问题的一种方法是将 exprs
标记为瞬态:
@transient val exprs: Seq[Column] = ???
在我们的最小示例中也能正常工作:
scala> @transient val x = $"x"
x: org.apache.spark.sql.ColumnName = x
scala> df.select(x).rdd.map(f _)
res1: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[8] at map at <console>:30
我有一个DataFrame,我把它映射成一个()的RDD来测试一个SVMModel。
我正在使用 Zeppelin 和 Spark 1.6.1
这是我的代码:
val loadedSVMModel = SVMModel.load(sc, pathToSvmModel)
// Clear the default threshold.
loadedSVMModel.clearThreshold()
// Compute raw scores on the test set.
val scoreAndLabels = df.select($"features", $"label")
.map { case Row(features:Vector, label: Double) =>
val score = loadedSVMModel.predict(features)
(score,label)
}
// Get evaluation metrics.
val metrics = new BinaryClassificationMetrics(scoreAndLabels)
val auROC = metrics.areaUnderROC()
println("Area under ROC = " + auROC)
执行代码时,我有一个 org.apache.spark.SparkException: Task not serializable;
,我很难理解为什么会发生这种情况以及如何解决它。
- 是不是我用的Zeppelin导致的?
- 是不是因为原来的DataFrame?
我已经执行了Spark编程指南中的SVM example,并且完美运行。所以原因应该与以上几点之一有关……我猜。
这里是异常堆栈的一些相关元素:
Caused by: java.io.NotSerializableException: org.apache.spark.sql.Column
Serialization stack:
- object not serializable (class: org.apache.spark.sql.Column, value: (sum(CASE WHEN (domainIndex = 0) THEN sumOfScores ELSE 0),mode=Complete,isDistinct=false) AS 0#100278)
- element of array (index: 0)
- array (class [Lorg.apache.spark.sql.Column;, size 372)
我没有 post 完整的异常堆栈,因为 Zeppelin 往往会显示很长的不相关文本。如果您希望我通过完整的异常,请告诉我。
附加信息
特征向量是使用 VectorAssembler() 生成的,如下所示
// Prepare vector assemble
val vecAssembler = new VectorAssembler()
.setInputCols(arrayOfIndices)
.setOutputCol("features")
// Aggregation expressions
val exprs = arrayOfIndices
.map(c => sum(when($"domainIndex" === c, $"sumOfScores")
.otherwise(lit(0))).alias(c))
val df = vecAssembler
.transform(anotherDF.groupBy($"userID", $"val")
.agg(exprs.head, exprs.tail: _*))
.select($"userID", $"features", $"val")
.withColumn("label", sqlCreateLabelValue($"val"))
.drop($"val").drop($"userID")
问题的根源实际上与您使用的 DataFrame
无关,甚至直接与 Zeppelin 无关。这更多的是代码组织问题以及同一范围内不可序列化对象的存在。
由于您使用交互式会话,所有对象都在同一范围内定义并成为闭包的一部分。它包括 exprs
,看起来像 Seq[Column]
,其中 Column
不可序列化。
在 SQL 表达式上操作时没有问题,因为 exprs
仅在本地使用,但当您下拉到 RDD
操作时就会出现问题。 exprs
作为闭包的一部分包含在内并导致表达式。重现此行为的最简单方法(ColumnName
是 Column
的子类之一)是这样的:
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 2.0.0-SNAPSHOT
/_/
Using Scala version 2.11.8 (OpenJDK 64-Bit Server VM, Java 1.8.0_91)
Type in expressions to have them evaluated.
Type :help for more information.
scala> val df = Seq(1, 2, 3).toDF("x")
df: org.apache.spark.sql.DataFrame = [x: int]
scala> val x = $"x"
x: org.apache.spark.sql.ColumnName = x
scala> def f(x: Any) = 0
f: (x: Any)Int
scala> df.select(x).rdd.map(f _)
org.apache.spark.SparkException: Task not serializable
...
Caused by: java.io.NotSerializableException: org.apache.spark.sql.ColumnName
Serialization stack:
- object not serializable (class: org.apache.spark.sql.ColumnName, value: x)
...
您可以尝试解决此问题的一种方法是将 exprs
标记为瞬态:
@transient val exprs: Seq[Column] = ???
在我们的最小示例中也能正常工作:
scala> @transient val x = $"x"
x: org.apache.spark.sql.ColumnName = x
scala> df.select(x).rdd.map(f _)
res1: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[8] at map at <console>:30