Access Cassandra from Spark "com.esotericsoftware.kryo.KryoException: Unable to find class: org.apache.spark.sql.cassandra.CassandraSQLRow"

I am trying to run a Scala program on Spark that accesses Cassandra through the Cassandra connector from DataStax.

I am getting the following error:

    15/04/30 17:43:44 ERROR Executor: Exception in task 0.0 in stage 2.0 (TID 2)
com.esotericsoftware.kryo.KryoException: Unable to find class: org.apache.spark.sql.cassandra.CassandraSQLRow
    at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
    at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
    at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:610)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:721)
    at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:41)
    at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:33)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732)
    at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:144)
    at org.apache.spark.serializer.DeserializationStream$$anon.getNext(Serializer.scala:133)
    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
    at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
    at scala.collection.Iterator$$anon.hasNext(Iterator.scala:371)
    at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
    at scala.collection.Iterator$$anon.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at scala.collection.AbstractIterator.to(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
    at org.apache.spark.sql.execution.Sort$$anonfun$execute$$anonfun$apply.apply(basicOperators.scala:209)
    at org.apache.spark.sql.execution.Sort$$anonfun$execute$$anonfun$apply.apply(basicOperators.scala:207)
    at org.apache.spark.rdd.RDD$$anonfun.apply(RDD.scala:618)
    at org.apache.spark.rdd.RDD$$anonfun.apply(RDD.scala:618)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.sql.SchemaRDD.compute(SchemaRDD.scala:120)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
    at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
    at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:56)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:198)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.sql.cassandra.CassandraSQLRow
    at java.net.URLClassLoader.run(URLClassLoader.java:366)
    at java.net.URLClassLoader.run(URLClassLoader.java:355)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:278)
    at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:136)
    ... 48 more

I am running the following configuration:

I think the problem may be that Spark is not loading the connector JAR correctly, so I have tried the following:

1) Adding the connector JAR to spark-env.sh:

SPARK_CLASSPATH=/home/spark/jars/spark-cassandra-connector_2.10-1.2.0-rc3.jar

Spark complains that this setting is deprecated.

2) Adding the connector JAR to spark-defaults.conf:

spark.executor.extraClassPath /home/spark/jars/spark-cassandra-connector_2.10-1.2.0-rc3.jar

Same problem.

3) Adding the connector JAR with --driver-class-path

I get the following exception:

    Exception in thread "main" java.lang.NoClassDefFoundError: com/google/common/cache/CacheLoader

4) Adding the connector JAR with the --jars option when running spark-submit

Same problem.

The program works fine when I run it from IntelliJ, but when I assemble a fat JAR and run it with spark-submit, I get the error shown above.

I think this might be related to the following issue:

https://datastax-oss.atlassian.net/browse/SPARKC-23

which was supposed to be fixed in connector version 1.1.2, but the problem reproduces on version 1.2.0-rc3, which I am using.

My build.sbt looks like this:

scalaVersion := "2.10.4"

val sparkVersion = "1.2.2"  

val cassandraConnectorVersion = "1.2.0-rc3" 

libraryDependencies ++= {
  Seq(
    ("org.apache.spark" %% "spark-core" % sparkVersion).
       exclude("org.mortbay.jetty", "servlet-api").
       exclude("commons-beanutils", "commons-beanutils-core").
       exclude("commons-collections", "commons-collections").
       exclude("commons-logging", "commons-logging").
       exclude("com.esotericsoftware.minlog" , "minlog").
       exclude("org.apache.hadoop" , "hadoop-yarn-api").
       exclude("org.apache.hadoop" , "hadoop-yarn-common").
       exclude("org.slf4j" , "jcl-over-slf4j").
       exclude("javax.servlet" , "javax.servlet-api").
       exclude("org.eclipse.jetty.orbit" , "javax.servlet").
       exclude("org.eclipse.jetty.orbit" , "javax.activation").
       exclude("org.eclipse.jetty.orbit" , "javax.mail.glassfish").
       exclude("org.eclipse.jetty.orbit" , "javax.transaction"), // % "provided",
    "org.apache.spark" %% "spark-sql" % sparkVersion, // % "provided",
    "org.apache.spark" %% "spark-mllib" % sparkVersion, // % "provided",
    "com.datastax.spark" %% "spark-cassandra-connector" % cassandraConnectorVersion,
    "javax.servlet" % "javax.servlet-api" % "3.0.1",
    "org.mongodb" % "mongo-java-driver" % "2.12.4",
    "org.mongodb" % "casbah_2.10" % "2.8.0",
    "com.typesafe" % "config" % "1.2.1",
    "org.scalanlp" %% "breeze" % "0.10",
    "joda-time" % "joda-time" % "2.7",
    "org.rogach" %% "scallop" % "0.9.5",
    "org.apache.commons" % "commons-io" % "1.3.2",
    "com.google.code.gson" % "gson" % "2.3.1",
    "com.novus" %% "salat-core" % "1.9.9"
  )}

resolvers += "Akka Repository" at "http://repo.akka.io/releases/"

resolvers += "Sonatype OSS Snapshots" at "http://oss.sonatype.org/content/repositories/releases/"

UPDATE:

I tried the same thing with Spark 1.1.1 and Spark-Connector 1.1.1 and ran into the same problem.

It looks like this Jira:

https://datastax-oss.atlassian.net/browse/SPARKC-23

was fixed in connector 1.1.2.

The Jira mentioned above only added more documentation about where to include the jars. Your error seems to come from an incompatibility between the 1.2.0-rc3 connector, which expects Spark 1.2, and DSE 4.6, which includes Spark 1.1.0.

Try using a 1.1.x version of the connector.
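
A minimal sketch of what that change could look like in the build.sbt above, assuming you also keep Spark itself on a matching 1.1.x release (1.1.2 is the connector release the Jira names as containing the fix):

    val sparkVersion = "1.1.1"
    val cassandraConnectorVersion = "1.1.2"

    libraryDependencies ++= Seq(
      // Versions chosen so the connector and the Spark it was built against line up.
      "org.apache.spark"   %% "spark-core"                % sparkVersion,
      "org.apache.spark"   %% "spark-sql"                 % sparkVersion,
      "com.datastax.spark" %% "spark-cassandra-connector" % cassandraConnectorVersion
    )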


EDIT

This still seems to hit an unfortunate class-loader issue in Spark.

I was hoping the newer Spark would fix this, but the problem appears to persist. The error happens because the Spark executor and the KryoSerializer code use different class loaders. The workaround is to make sure the connector jar is NOT part of your fat jar, so the system class loader can load the whole library; instead, you manually copy the jar to all the executors and specify the classpath with the executorExtraClassPath variable.

So the key here is to make sure the Spark Cassandra Connector classes are not on the ExecutorURLClassloader but on the system class-loader.
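
As a hedged sketch of how to keep the connector out of the fat jar in the build.sbt from the question, assuming sbt-assembly is used to build it: marking the dependency "provided" excludes it from the assembly, and you then ship the same jar separately via the class-path options shown in the spark-shell example below.

    // Compile against the connector but leave it out of the fat jar built by sbt-assembly;
    // the jar is distributed by hand and picked up via spark.executor.extraClassPath.
    libraryDependencies +=
      "com.datastax.spark" %% "spark-cassandra-connector" % cassandraConnectorVersion % "provided"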

Here is an example of me running the REPL on Spark 1.2.1:

automaton@ubuntu:~/spark-1.2.1-bin-hadoop1$ ./bin/spark-shell \
    --master spark://ubuntu:7077 \
    --driver-class-path /home/automaton/spark-cassandra-connector/spark-cassandra-connector/target/scala-2.10/spark-cassandra-connector-assembly-1.2.0-SNAPSHOT.jar \
    --conf spark.executor.extraClassPath=/home/automaton/spark-cassandra-connector/spark-cassandra-connector/target/scala-2.10/spark-cassandra-connector-assembly-1.2.0-SNAPSHOT.jar \
    --conf spark.cassandra.connection.host=127.0.0.1

scala> import org.apache.spark.sql.cassandra.CassandraSQLContext 
import org.apache.spark.sql.cassandra.CassandraSQLContext 

scala> val cc = new CassandraSQLContext(sc) 
cc: org.apache.spark.sql.cassandra.CassandraSQLContext = org.apache.spark.sql.cassandra.CassandraSQLContext@3f8aef3e 

scala> cc.sql("SELECT * FROM test.fun as a JOIN test.fun as b ON (a.k = b.v)").collect 
res0: Array[org.apache.spark.sql.Row] = Array([31,31,31,31],  ..... 

Note how I use --driver-class-path to get the jar onto the driver's class-loader, and then --conf spark.executor.extraClassPath to get the jar onto the system class loader of the executor JVMs.
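
For a standalone application rather than the REPL, a sketch of the same idea is below; the jar path, master URL, and table are placeholders taken from this thread, and the driver side still needs the connector on its own classpath (for example via --driver-class-path at launch).

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.cassandra.CassandraSQLContext

    object CassandraSqlExample {
      def main(args: Array[String]): Unit = {
        // Hypothetical location of the connector assembly on every worker node.
        val connectorJar =
          "/home/automaton/spark-cassandra-connector/spark-cassandra-connector/target/scala-2.10/spark-cassandra-connector-assembly-1.2.0-SNAPSHOT.jar"

        val conf = new SparkConf()
          .setAppName("cassandra-sql-example")
          .setMaster("spark://ubuntu:7077")
          .set("spark.cassandra.connection.host", "127.0.0.1")
          // Same effect as --conf spark.executor.extraClassPath=...: the executor JVMs
          // load the connector from their system class loader, not from the fat jar.
          .set("spark.executor.extraClassPath", connectorJar)

        val sc = new SparkContext(conf)
        val cc = new CassandraSQLContext(sc)
        cc.sql("SELECT * FROM test.fun as a JOIN test.fun as b ON (a.k = b.v)")
          .collect()
          .foreach(println)
        sc.stop()
      }
    }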