"NotSerializableException " 在 scala 映射函数中
"NotSerializableException " in scala map function
I am reading a file and trying to map its values with a function, but it throws a NotSerializableException. Below is the code I am running:
package com.sundogsoftware.spark

import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.log4j._
import scala.math.min

/** Find the minimum temperature by weather station */
object MinTemperatures {

  def parseLine(line: String) = {
    val fields = line.split(",")
    val stationID = fields(0)
    val entryType = fields(2)
    val temperature = fields(3).toFloat * 0.1f * (9.0f / 5.0f) + 32.0f
    (stationID, entryType, temperature)
  }

  /** Our main function where the action happens */
  def main(args: Array[String]) {
    // Set the log level to only print errors
    Logger.getLogger("org").setLevel(Level.ERROR)

    // Create a SparkContext using every core of the local machine
    val sc = new SparkContext("local[*]", "MinTemperatures")

    // Read each line of input data
    val lines = sc.textFile("../DataSet/1800.csv")

    // Convert to (stationID, entryType, temperature) tuples
    val parsedLines = lines.map(parseLine)
  }
}
When I run the code above, it gives me the following error:
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:403)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:393)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2326)
    at org.apache.spark.rdd.RDD.$anonfun$map(RDD.scala:371)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
    at org.apache.spark.rdd.RDD.map(RDD.scala:370)
    at com.sundogsoftware.spark.MinTemperatures$.main(MinTemperatures.scala:32)
    at com.sundogsoftware.spark.MinTemperatures.main(MinTemperatures.scala)
Caused by: java.io.NotSerializableException: com.sundogsoftware.spark.MinTemperatures$
Serialization stack:
    - object not serializable (class: com.sundogsoftware.spark.MinTemperatures$, value: com.sundogsoftware.spark.MinTemperatures$@41fed14f)
    - element of array (index: 0)
    - array (class [Ljava.lang.Object;, size 1)
    - field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
    - object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class com.sundogsoftware.spark.MinTemperatures$, functionalInterfaceMethod=scala/Function1.apply:(Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic com/sundogsoftware/spark/MinTemperatures$.$anonfun$main:(Lcom/sundogsoftware/spark/MinTemperatures$;Ljava/lang/String;)Lscala/Tuple3;, instantiatedMethodType=(Ljava/lang/String;)Lscala/Tuple3;, numCaptured=1])
    - writeReplace data (class: java.lang.invoke.SerializedLambda)
But when I run the same code with an anonymous function instead, it runs successfully:
package com.sundogsoftware.spark

import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.log4j._
import scala.math.min

/** Find the minimum temperature by weather station */
object MinTemperatures {

  /** Our main function where the action happens */
  def main(args: Array[String]) {
    // Set the log level to only print errors
    Logger.getLogger("org").setLevel(Level.ERROR)

    // Create a SparkContext using every core of the local machine
    val sc = new SparkContext("local[*]", "MinTemperatures")

    // Read each line of input data
    val lines = sc.textFile("../DataSet/1800.csv")

    // Convert to (stationID, entryType, temperature) tuples
    val parsedLines = lines.map(x => {
      val fields = x.split(",")
      val stationID = fields(0)
      val entryType = fields(2)
      val temperature = fields(3).toFloat * 0.1f * (9.0f / 5.0f) + 32.0f
      (stationID, entryType, temperature)
    })

    // Filter out all but TMIN entries
    val minTemps = parsedLines.filter(x => x._2 == "TMIN")

    // Convert to (stationID, temperature)
    val stationTemps = minTemps.map(x => (x._1, x._3.toFloat))

    // Reduce by stationID retaining the minimum temperature found
    val minTempsByStation = stationTemps.reduceByKey((x, y) => min(x, y))

    // Collect, format, and print the results
    val results = minTempsByStation.collect()
    for (result <- results.sorted) {
      val station = result._1
      val temp = result._2
      val formattedTemp = f"$temp%.2f F"
      println(s"$station minimum temperature: $formattedTemp")
    }
  }
}
Output:
EZE00100082 minimum temperature: 7.70 F
ITE00100554 minimum temperature: 5.36 F
As you can see above, when I use the named function (parseLine) inside map it throws the error, but the very same program runs successfully when I use an anonymous function inside map instead of the named one.
I looked through a few blog posts but could not find the cause of the error.
Could anyone help me understand this?
This issue does not seem to be related to sbt or the dependencies. As far as I checked, it happens when the script is not defined as an object (a Scala object is serializable by default), so the error means the script is not serializable.
I created a new object, pasted the same code into it, and it worked.
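For anyone who wants something concrete to try, here is a minimal sketch of one commonly suggested workaround, not something verified in the original post: marking the enclosing object as Serializable. The stack trace above shows the eta-expanded parseLine lambda capturing the MinTemperatures$ module instance (numCaptured=1), so making that instance serializable should let Spark ship the task. The object name MinTemperaturesFixed and the final count() are illustrative additions of mine.

package com.sundogsoftware.spark

import org.apache.log4j._
import org.apache.spark._

// Hypothetical sketch: the only substantive change from the failing version
// is `extends Serializable` on the enclosing object, so the object instance
// captured by the eta-expanded `parseLine` lambda can be serialized.
object MinTemperaturesFixed extends Serializable {

  def parseLine(line: String): (String, String, Float) = {
    val fields = line.split(",")
    // Convert tenths of degrees Celsius to degrees Fahrenheit
    val temperature = fields(3).toFloat * 0.1f * (9.0f / 5.0f) + 32.0f
    (fields(0), fields(2), temperature)
  }

  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.ERROR)
    val sc = new SparkContext("local[*]", "MinTemperatures")
    val lines = sc.textFile("../DataSet/1800.csv")

    // `lines.map(parseLine)` eta-expands to `x => parseLine(x)`; the closure
    // still captures the enclosing object, but that object is now serializable.
    val parsedLines = lines.map(parseLine)
    println(parsedLines.count())
  }
}

An alternative with the same effect, if I understand the mechanism correctly, is to declare the parser as a function value (val parseLine: String => (String, String, Float) = line => ...): a function value that references no outer state compiles to a lambda that captures nothing, which is exactly why the anonymous-function version above succeeds.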