SparkException: local class incompatible
I am trying to submit a Spark job from a client machine to a Cloudera cluster. The cluster runs CDH-5.3.2, which ships Spark 1.2.0 and Hadoop 2.5.0. To test the cluster, we submitted the word-count sample taken from the Spark website. We can submit the Spark job (written in Java) successfully, but we cannot write the result to a file on HDFS.
We get the following error:
20/06/25 09:38:16 INFO DAGScheduler: Job 0 failed: saveAsTextFile at SimpleWordCount.java:36, took 5.450531 s
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 1.0 failed 4 times, most recent failure: Lost task 1.3 in stage 1.0 (TID 8, obelix2): java.io.InvalidClassException: org.apache.spark.rdd.PairRDDFunctions; local class incompatible: stream classdesc serialVersionUID = 8789839749593513237, local class serialVersionUID = -4145741279224749316
at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:617)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1622)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:57)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage.apply(DAGScheduler.scala:1203)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage.apply(DAGScheduler.scala:1202)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed.apply(DAGScheduler.scala:696)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed.apply(DAGScheduler.scala:696)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696)
at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive.applyOrElse(DAGScheduler.scala:1420)
at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundReceive(DAGScheduler.scala:1375)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
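For context, the job is submitted from the client with spark-submit, roughly along these lines (a sketch only: the jar file name and the yarn-cluster master are assumptions, and the class name is the one from the sample below):

spark-submit --class SimpleWordCount --master yarn-cluster simple-word-count.jar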
Here is our code sample:
import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;

public class SimpleWordCount {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("Simple Application");
        JavaSparkContext spark = new JavaSparkContext(conf);

        JavaRDD<String> textFile = spark.textFile("hdfs://obelix1:8022/user/U079681/deneme/example.txt");

        JavaRDD<String> words = textFile.flatMap(new FlatMapFunction<String, String>() {
            public Iterable<String> call(String s) {
                return Arrays.asList(s.split(" "));
            }
        });

        JavaPairRDD<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
            public Tuple2<String, Integer> call(String s) {
                return new Tuple2<String, Integer>(s, 1);
            }
        });

        JavaPairRDD<String, Integer> counts = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
            public Integer call(Integer a, Integer b) {
                return a + b;
            }
        });

        // System.out.println(counts.collect());
        counts.saveAsTextFile("hdfs://obelix1:8022/user/U079681/deneme/result");
    }
}
The Maven dependencies are:
<dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>2.10.5</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>1.2.0-cdh5.3.2</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.5.0-mr1-cdh5.3.2</version>
</dependency>
I have no idea where the error comes from, because as far as I understand, the application's Spark version and Cloudera's Spark version are the same. Any ideas would be very welcome.
Note: the result can be seen when it is written to the console.
As stated in your note, the application works fine when the results are printed to the console, but fails whenever you try to save them to the underlying HDFS.
If I am not mistaken, this means:
When the results are written to the console, Spark may not be using the underlying Hadoop infrastructure.
When the results are saved to HDFS, Spark definitely uses the underlying Hadoop infrastructure.
These observations make me think a Hadoop version mismatch is happening somewhere. Even though the Spark versions may match between the application and the cluster nodes, the Hadoop versions in use may differ.
You should look at the libraries used in CDH-5.3.2 and check whether they all match the ones used in your application.
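One quick way to compare them is to print, from the driver, the Spark and Hadoop versions that your application's classpath actually resolves and check them against what the cluster ships. A minimal sketch (the class name is made up; it only uses the standard version-reporting calls):

import org.apache.hadoop.util.VersionInfo;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class VersionCheck {
    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext(
                new SparkConf().setAppName("Version Check"));
        // Versions resolved on the application side; compare with the CDH-5.3.2 cluster.
        System.out.println("Spark on classpath:  " + sc.version());
        System.out.println("Hadoop on classpath: " + VersionInfo.getVersion());
        sc.stop();
    }
}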
Also, take a look at this question:
Standalone spark cluster. Can't submit job programmatically -> java.io.InvalidClassException
After hours of struggling, we have solved the problem. The root cause was that we downloaded Apache Spark from the official website and built it ourselves, so some of the jars were not compatible with the Cloudera distribution. We finally learned today that Cloudera's Spark distribution is available on GitHub (https://github.com/cloudera/spark/tree/cdh5-1.2.0_5.3.2); after building that, we were able to save the job output to HDFS.
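For anyone hitting the same issue: the important thing is that every Spark jar on the classpath comes from the same build as the cluster. Our pom already referenced the 1.2.0-cdh5.3.2 artifacts; the extra piece Maven typically needs for those -cdh versions to resolve is Cloudera's repository, sketched below (the repository id is arbitrary):

<repositories>
    <repository>
        <id>cloudera</id>
        <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
    </repository>
</repositories>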