Spark Jobserver: Unit Testing of Jobs
I would like to write unit tests for Spark jobs that are executed in spark-jobserver.
This works fine, except when I need to access the config, for example to check for a specific input value, like this:
Try(config.getString("myKey"))
.map(x => SparkJobValid)
.getOrElse(SparkJobInvalid("No value for myKey config param"))
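For context, in spark-jobserver's classic job API this kind of check typically lives in the job's validate method, which the server invokes before runJob. A minimal sketch of how MyJob might be shaped, assuming a plain SparkContext-based job (the runJob body and its return value are illustrative, not from the question):

import com.typesafe.config.Config
import org.apache.spark.SparkContext
import scala.util.Try
import spark.jobserver.{SparkJob, SparkJobInvalid, SparkJobValid, SparkJobValidation}

object MyJob extends SparkJob {
  // Called by the jobserver before runJob; rejects the job when myKey is missing.
  override def validate(sc: SparkContext, config: Config): SparkJobValidation =
    Try(config.getString("myKey"))
      .map(_ => SparkJobValid)
      .getOrElse(SparkJobInvalid("No value for myKey config param"))

  // The actual job body; here it simply echoes the configured value back.
  override def runJob(sc: SparkContext, config: Config): Any =
    config.getString("myKey")
}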
The config is created like this:
import com.typesafe.config.{Config, ConfigFactory}
val myConfig = ConfigFactory.parseString("myKey=value")
The job is then run like this:
MyJob.run(sqlCtx, myConfig)
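A test harness around this could look as follows; a minimal ScalaTest sketch, assuming a local SparkContext and the MyJob shape sketched above (the suite name and assertions are illustrative; the question's MyJob.run(sqlCtx, …) suggests a SQLContext-based variant, but the setup is the same):

import com.typesafe.config.ConfigFactory
import org.apache.spark.{SparkConf, SparkContext}
import org.scalatest.{BeforeAndAfterAll, FunSuite}
import spark.jobserver.SparkJobValid

class MyJobSpec extends FunSuite with BeforeAndAfterAll {
  private var sc: SparkContext = _

  override def beforeAll(): Unit = {
    // A local[2] master keeps the test self-contained; no cluster needed.
    sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("MyJobSpec"))
  }

  override def afterAll(): Unit = sc.stop()

  test("validates and runs when myKey is present") {
    val myConfig = ConfigFactory.parseString("myKey=value")
    assert(MyJob.validate(sc, myConfig) == SparkJobValid)
    assert(MyJob.runJob(sc, myConfig) == "value")
  }

  test("is rejected when myKey is missing") {
    val emptyConfig = ConfigFactory.parseString("")
    assert(MyJob.validate(sc, emptyConfig) != SparkJobValid)
  }
}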
Here is the exception:
ERROR Utils: Uncaught exception in thread driver-heartbeater
java.io.IOException: java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.HashMap$SerializationProxy to field org.apache.spark.executor.TaskMetrics._accumulatorUpdates of type scala.collection.immutable.Map in instance of org.apache.spark.executor.TaskMetrics
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1163)
at org.apache.spark.executor.TaskMetrics.readObject(TaskMetrics.scala:219)
at sun.reflect.GeneratedMethodAccessor38.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1900)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
at org.apache.spark.util.Utils$.deserialize(Utils.scala:91)
at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$reportHeartBeat$$anonfun$apply.apply(Executor.scala:440)
at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$reportHeartBeat$$anonfun$apply.apply(Executor.scala:430)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$reportHeartBeat.apply(Executor.scala:430)
at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$reportHeartBeat.apply(Executor.scala:428)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$reportHeartBeat(Executor.scala:428)
at org.apache.spark.executor.Executor$$anon$$anonfun$run.apply$mcV$sp(Executor.scala:472)
at org.apache.spark.executor.Executor$$anon$$anonfun$run.apply(Executor.scala:472)
at org.apache.spark.executor.Executor$$anon$$anonfun$run.apply(Executor.scala:472)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
at org.apache.spark.executor.Executor$$anon.run(Executor.scala:472)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access1(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.HashMap$SerializationProxy to field org.apache.spark.executor.TaskMetrics._accumulatorUpdates of type scala.collection.immutable.Map in instance of org.apache.spark.executor.TaskMetrics
at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2133)
at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1305)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2006)
at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:501)
at org.apache.spark.executor.TaskMetrics$$anonfun$readObject.apply$mcV$sp(TaskMetrics.scala:220)
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1160)
... 32 more
I can confirm that this problem goes away with the latest Spark, 1.6.1. The job itself, however, did not seem to be affected by the exception.
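If the resolution is simply moving to Spark 1.6.1, the sbt test dependencies might be pinned like this; a sketch, assuming an sbt 0.13-era build (module list and scopes will differ per project). Forking the test JVM is also a commonly used mitigation for classloader-related serialization errors like the one above:

// build.sbt (sketch): pin the Spark modules the tests compile against
val sparkVersion = "1.6.1"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion,
  "org.apache.spark" %% "spark-sql"  % sparkVersion
)

// Run tests in a separate JVM so Spark does not share sbt's classloader.
fork in Test := true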