ClassNotFound Exception while persisting data (RDD) to Cassandra using Spark

I am trying to save a few rows to Cassandra with the following code, using the DataStax Spark Cassandra connector:

// Static imports from the DataStax Spark Cassandra connector (Java API)
import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions;
import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapToRow;

SparkConf conf = new SparkConf()
        .setAppName("My application")
        .setMaster("spark://IP:PORT")
        .set("spark.cassandra.connection.host", "IP");
JavaSparkContext sc = new JavaSparkContext(conf);

ArrayList<MyClass> logs = new ArrayList<MyClass>();
for (int i = 0; i < 10; i++) {
    logs.add(getLogs(i));
}

JavaRDD<MyClass> rdd = sc.parallelize(logs);
javaFunctions(rdd).writerBuilder("keyspace", "columnspace", mapToRow(MyClass.class)).saveToCassandra();

But I keep getting a ClassNotFoundException: my class `MyClass` cannot be found on the executors.

The exception I get is:

15/01/27 13:50:31 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, ibm4-blade7-vm3, PROCESS_LOCAL, 2552 bytes)
15/01/27 13:50:31 INFO BlockManagerMasterActor: Registering block manager ibm4-blade7-vm3:60273 with 265.4 MB RAM, BlockManagerId(0, ibm4-blade7-vm3, 60273)
15/01/27 13:50:31 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, ibm4-blade7-vm3): java.io.IOException: java.lang.ClassNotFoundException: MyClass
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:988)
    at org.apache.spark.rdd.ParallelCollectionPartition.readObject(ParallelCollectionRDD.scala:70)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException: MyClass
    at java.net.URLClassLoader.run(URLClassLoader.java:366)
    at java.net.URLClassLoader.run(URLClassLoader.java:355)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:274)
    at org.apache.spark.serializer.JavaDeserializationStream$$anon.resolveClass(JavaSerializer.scala:59)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1706)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1344)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
    at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:500)
    at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$readObject.apply$mcV$sp(ParallelCollectionRDD.scala:74)
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:985)
    ... 20 more

15/01/27 13:50:31 INFO TaskSetManager: Lost task 1.0 in stage 0.0 (TID 1) on executor ibm4-blade7-vm3: java.io.IOException (java.lang.ClassNotFoundException: MyClass) [duplicate 1]
15/01/27 13:50:31 INFO TaskSetManager: Starting task 1.1 in stage 0.0 (TID 2, ibm4-blade7-vm3, PROCESS_LOCAL, 2552 bytes)
15/01/27 13:50:31 INFO TaskSetManager: Starting task 0.1 in stage 0.0 (TID 3, ibm4-blade7-vm3, PROCESS_LOCAL, 2543 bytes)
15/01/27 13:50:31 INFO TaskSetManager: Lost task 1.1 in stage 0.0 (TID 2) on executor ibm4-blade7-vm3: java.io.IOException (java.lang.ClassNotFoundException: MyClass) [duplicate 2]
15/01/27 13:50:31 INFO TaskSetManager: Starting task 1.2 in stage 0.0 (TID 4, ibm4-blade7-vm3, PROCESS_LOCAL, 2552 bytes)
15/01/27 13:50:31 INFO TaskSetManager: Lost task 0.1 in stage 0.0 (TID 3) on executor ibm4-blade7-vm3: java.io.IOException (java.lang.ClassNotFoundException: MyClass) [duplicate 3]
15/01/27 13:50:31 INFO TaskSetManager: Starting task 0.2 in stage 0.0 (TID 5, ibm4-blade7-vm3, PROCESS_LOCAL, 2543 bytes)
15/01/27 13:50:31 INFO TaskSetManager: Lost task 1.2 in stage 0.0 (TID 4) on executor ibm4-blade7-vm3: java.io.IOException (java.lang.ClassNotFoundException: MyClass) [duplicate 4]
15/01/27 13:50:31 INFO TaskSetManager: Starting task 1.3 in stage 0.0 (TID 6, ibm4-blade7-vm3, PROCESS_LOCAL, 2552 bytes)
15/01/27 13:50:31 INFO TaskSetManager: Lost task 0.2 in stage 0.0 (TID 5) on executor ibm4-blade7-vm3: java.io.IOException (java.lang.ClassNotFoundException: MyClass) [duplicate 5]
15/01/27 13:50:31 INFO TaskSetManager: Starting task 0.3 in stage 0.0 (TID 7, ibm4-blade7-vm3, PROCESS_LOCAL, 2543 bytes)
15/01/27 13:50:31 INFO TaskSetManager: Lost task 1.3 in stage 0.0 (TID 6) on executor ibm4-blade7-vm3: java.io.IOException (java.lang.ClassNotFoundException: MyClass) [duplicate 6]
15/01/27 13:50:31 ERROR TaskSetManager: Task 1 in stage 0.0 failed 4 times; aborting job
15/01/27 13:50:31 INFO TaskSetManager: Lost task 0.3 in stage 0.0 (TID 7) on executor ibm4-blade7-vm3: java.io.IOException (java.lang.ClassNotFoundException: MyClass) [duplicate 7]
15/01/27 13:50:31 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
15/01/27 13:50:31 INFO TaskSchedulerImpl: Cancelling stage 0
15/01/27 13:50:31 INFO DAGScheduler: Job 0 failed: runJob at RDDFunctions.scala:24, took 6.007777 s
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 0.0 failed 4 times, most recent failure: Lost task 1.3 in stage 0.0 (TID 6, ibm4-blade7-vm3): java.io.IOException: java.lang.ClassNotFoundException: MyClass
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:988)
    at org.apache.spark.rdd.ParallelCollectionPartition.readObject(ParallelCollectionRDD.scala:70)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException: MyClass
    at java.net.URLClassLoader.run(URLClassLoader.java:366)
    at java.net.URLClassLoader.run(URLClassLoader.java:355)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:274)
    at org.apache.spark.serializer.JavaDeserializationStream$$anon.resolveClass(JavaSerializer.scala:59)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1706)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1344)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
    at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:500)
    at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$readObject.apply$mcV$sp(ParallelCollectionRDD.scala:74)
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:985)
    ... 20 more

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage.apply(DAGScheduler.scala:1203)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage.apply(DAGScheduler.scala:1202)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed.apply(DAGScheduler.scala:696)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed.apply(DAGScheduler.scala:696)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive.applyOrElse(DAGScheduler.scala:1420)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundReceive(DAGScheduler.scala:1375)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
    at akka.dispatch.Mailbox.run(Mailbox.scala:220)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Should I add my application jar to the Spark classpath, and can I run it from Eclipse?

You need to add your job's jar to the list of jars in the Spark configuration, so that Spark ships it to the executors:

conf.setJars(Seq("/path/to/jobJar.jar", "/path/to/otherdep.jar"))

Alternatively, build an uber-jar (a single jar containing your classes and their dependencies) and provide that one jar instead.

See also "Submitting Applications" in the Spark documentation for using spark-submit.
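As a rough sketch of that spark-submit route (the class name, master URL, and jar paths below are placeholders, not values from the question):

```shell
# Submit the application jar; spark-submit distributes it (and any --jars)
# to the executors, which avoids the ClassNotFoundException for MyClass.
spark-submit \
  --class com.example.MyApp \
  --master spark://IP:PORT \
  --jars /path/to/otherdep.jar \
  /path/to/jobJar.jar
```

With an uber-jar, the `--jars` option can be dropped entirely, since all dependencies are already inside the single application jar.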