Scala, Play2.4, Spark: Task Not Serializable due to SparkConf object
I'm using Spark with Scala and Play 2.4. First of all, I came across this thread:
Task not serializable: java.io.NotSerializableException when calling function outside closure only on classes not objects
I agree with the fact that if we pass a method to Spark, it tries to serialize the whole enclosing class.
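To make that concrete, here is a minimal sketch of that behavior (the Owner class is hypothetical, purely for illustration): calling an instance method inside an RDD closure compiles to a call on `this`, so Spark has to serialize the whole enclosing instance, fields included.

import org.apache.spark.SparkContext

// Hypothetical minimal example: `helper` is an instance method, so the
// filter closure compiles to this.helper(line) and captures `this`.
// Spark must then serialize the entire Owner instance, every field included.
class Owner(sc: SparkContext) {
  def helper(line: String): Boolean = line.nonEmpty

  def run(): Long =
    sc.textFile("input.txt")
      .filter(line => helper(line)) // captures `this`, not just `helper`
      .count()
}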
My code, to make things clearer:
class GPSSparkServiceImpl @Inject() (val stepService: StepDbService, val coordinateService: CoordinateService) extends GPSSparkService with Serializable {

  /**
   * Spark config
   * Set Master node and AppName
   */
  val conf = new SparkConf().setAppName("Editus GPS").setMaster("local[2]")

  /**
   * Initialize Spark Context
   */
  val sc = new SparkContext(conf)

  override def execute() = {
    val logData = sc.textFile("file://C://work/csv/gps.csv").cache()
    val numAs = logData.filter(line => line.contains("a")).count()
    println("Lines with a: %s".format(numAs))
  }

  override def generateUserToStep(): Unit = {
    val futureSteps = stepService.findAll()
    futureSteps onSuccess {
      case steps =>
        val data = sc.textFile("file://C://work/csv/gps.csv").cache()
        val result = data.flatMap(line => steps.map(step => (line, step))).filter { tuple =>
          coordinateService.checkProximity(
            coordinateService.coordinateToDistanceInMeters(
              tuple._1.split(";")(1).toDouble, tuple._1.split(";")(2).toDouble,
              tuple._2.gpsCoordinate.latitude, tuple._2.gpsCoordinate.longitude
            ), tuple._2
          )
        }.count()
        println("result: " + result + " for " + steps.length + " steps")
    }
  }
}
As you can see, I use two methods from CoordinateService inside Spark: checkProximity and coordinateToDistanceInMeters. CoordinateService is injected as CoordinateServiceImpl, and that class is Serializable; every object it holds is serializable as well.
The CoordinateService implementation:
class CoordinateServiceImpl @Inject() (val config: Configuration) extends CoordinateService with Serializable {

  override def coordinateToDistanceInMeters(lat1: Double, lng1: Double, lat2: Double, lng2: Double): Double = {
    val earthRadius: Double = 6371000
    val dLat: Double = Math.toRadians(lat2 - lat1)
    val dLng: Double = Math.toRadians(lng2 - lng1)
    val a: Double = Math.sin(dLat / 2) * Math.sin(dLat / 2) +
      Math.cos(Math.toRadians(lat1)) * Math.cos(Math.toRadians(lat2)) *
      Math.sin(dLng / 2) * Math.sin(dLng / 2)
    val c: Double = 2 * Math.atan2(Math.sqrt(a), Math.sqrt(1 - a))
    earthRadius * c
  }

  override def checkProximity(distance: Double, step: Step): Boolean =
    distance < step.acceptableProximity
}
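(For context, coordinateToDistanceInMeters is the standard haversine great-circle distance: $a = \sin^2(\Delta\varphi/2) + \cos\varphi_1\,\cos\varphi_2\,\sin^2(\Delta\lambda/2)$, $c = 2\,\operatorname{atan2}(\sqrt{a},\,\sqrt{1-a})$, $d = R\,c$, with $R = 6371000$ m; it is pure math on Doubles, nothing stateful or non-serializable.)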
Why do I run into SparkConf not serializable when what actually gets serialized is the class? I don't even use any of SparkConf's methods inside the closure. Maybe I missed something...
Stack trace:
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
at org.apache.spark.SparkContext.clean(SparkContext.scala:1623)
at org.apache.spark.rdd.RDD.filter(RDD.scala:303)
at services.gps.spark.GPSSparkServiceImpl$$anonfun$generateUserToStep.applyOrElse(GPSSparkServiceImpl.scala:41)
at services.gps.spark.GPSSparkServiceImpl$$anonfun$generateUserToStep.applyOrElse(GPSSparkServiceImpl.scala:38)
at scala.concurrent.Future$$anonfun$onSuccess.apply(Future.scala:117)
at scala.concurrent.Future$$anonfun$onSuccess.apply(Future.scala:115)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
at scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:121)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.io.NotSerializableException: org.apache.spark.SparkConf
Serialization stack:
- object not serializable (class: org.apache.spark.SparkConf, value: org.apache.spark.SparkConf@1b0321ae)
- field (class: services.gps.spark.GPSSparkServiceImpl, name: conf, type: class org.apache.spark.SparkConf)
- object (class services.gps.spark.GPSSparkServiceImpl, services.gps.spark.GPSSparkServiceImpl@458c4049)
- field (class: services.gps.spark.GPSSparkServiceImpl$$anonfun$generateUserToStep, name: $outer, type: class services.gps.spark.GPSSparkServiceImpl)
- object (class services.gps.spark.GPSSparkServiceImpl$$anonfun$generateUserToStep, <function1>)
- field (class: services.gps.spark.GPSSparkServiceImpl$$anonfun$generateUserToStep$$anonfun, name: $outer, type: class services.gps.spark.GPSSparkServiceImpl$$anonfun$generateUserToStep)
- object (class services.gps.spark.GPSSparkServiceImpl$$anonfun$generateUserToStep$$anonfun, <function1>)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:38)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:80)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
... 14 more
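From the serialization stack it looks like the closure's $outer pointer to GPSSparkServiceImpl is what drags conf in. Here is a sketch of a workaround I would expect to help (an assumption on my part, not tested): mark the Spark plumbing @transient and copy the injected service into a local val, so the closure captures only that val instead of `this`.

// Untested sketch: keep SparkConf/SparkContext out of serialization and
// avoid capturing `this` in the closure.
@transient lazy val conf = new SparkConf().setAppName("Editus GPS").setMaster("local[2]")
@transient lazy val sc = new SparkContext(conf)

override def generateUserToStep(): Unit = {
  val futureSteps = stepService.findAll()
  futureSteps onSuccess {
    case steps =>
      val service = coordinateService // local copy: the closure captures this val, not `this`
      val data = sc.textFile("file://C://work/csv/gps.csv").cache()
      val result = data.flatMap(line => steps.map(step => (line, step))).filter { case (line, step) =>
        service.checkProximity(
          service.coordinateToDistanceInMeters(
            line.split(";")(1).toDouble, line.split(";")(2).toDouble,
            step.gpsCoordinate.latitude, step.gpsCoordinate.longitude
          ), step
        )
      }.count()
      println("result: " + result + " for " + steps.length + " steps")
  }
}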
If I use a plain object instead of the injected class, it works perfectly:
object GpsUtils {

  def coordinateToDistanceInMeters(lat1: Double, lng1: Double, lat2: Double, lng2: Double): Double = {
    val earthRadius: Double = 6371000
    val dLat: Double = Math.toRadians(lat2 - lat1)
    val dLng: Double = Math.toRadians(lng2 - lng1)
    val a: Double = Math.sin(dLat / 2) * Math.sin(dLat / 2) +
      Math.cos(Math.toRadians(lat1)) * Math.cos(Math.toRadians(lat2)) *
      Math.sin(dLng / 2) * Math.sin(dLng / 2)
    val c: Double = 2 * Math.atan2(Math.sqrt(a), Math.sqrt(1 - a))
    earthRadius * c
  }

  def checkProximity(distance: Double, step: Step): Boolean =
    distance < step.acceptableProximity
}
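My understanding of the difference (an illustration, not verified against the bytecode): a singleton object is referenced statically, so the closure carries no $outer pointer back to GPSSparkServiceImpl and its SparkConf field. Roughly, the working closure looks like this:

// With the object, the closure only references GpsUtils (module access),
// so no $outer pointer to GPSSparkServiceImpl is captured.
val result = data.flatMap(line => steps.map(step => (line, step))).filter { case (line, step) =>
  GpsUtils.checkProximity(
    GpsUtils.coordinateToDistanceInMeters(
      line.split(";")(1).toDouble, line.split(";")(2).toDouble,
      step.gpsCoordinate.latitude, step.gpsCoordinate.longitude
    ), step
  )
}.count()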