Could not initialize class com.datastax.spark.connector.types.TypeConverter$ while running job on apache spark 2.0.2 using cassandra connector

I'm trying to run a simple count against a dataset from the Apache Spark shell that I previously loaded into my Cassandra cluster. For this I created a simple Maven project that builds a fat jar with my dependencies:

    <!-- https://mvnrepository.com/artifact/com.cloudera.sparkts/sparkts -->
    <dependency>
        <groupId>com.cloudera.sparkts</groupId>
        <artifactId>sparkts</artifactId>
        <version>0.4.1</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.datastax.spark/spark-cassandra-connector_2.10 -->
    <dependency>
        <groupId>com.datastax.spark</groupId>
        <artifactId>spark-cassandra-connector_2.10</artifactId>
        <version>2.0.0-M3</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core_2.10 -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>2.0.2</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/com.datastax.cassandra/cassandra-driver-core -->
    <dependency>
        <groupId>com.datastax.cassandra</groupId>
        <artifactId>cassandra-driver-core</artifactId>
        <version>3.0.0</version>
    </dependency>

I run this jar with spark-shell using the following command:

spark-shell --jars Sensors-1.0-SNAPSHOT-jar-with-dependencies.jar --executor-memory 512M

After the required dependencies are loaded, I try to run the following operations on my Spark instance:

import pl.agh.edu.kis.sensors._
import com.datastax.spark.connector._
val test = new TestConnector(sc)
test.count()

This is the error I'm getting:

    17/01/21 04:42:37 ERROR Executor: Exception in task 1.0 in stage 0.0 (TID 1)
java.io.IOException: Exception during preparation of SELECT "coil_id", "event_time", "car_count", "insert_time" FROM "public"."traffic" WHERE token("coil_id") > ? AND token("coil_id") <= ?   ALLOW FILTERING: scala.reflect.api.JavaUniverse.runtimeMirror(Ljava/lang/ClassLoader;)Lscala/reflect/api/JavaMirrors$JavaMirror;
    at com.datastax.spark.connector.rdd.CassandraTableScanRDD.createStatement(CassandraTableScanRDD.scala:293)
    at com.datastax.spark.connector.rdd.CassandraTableScanRDD.com$datastax$spark$connector$rdd$CassandraTableScanRDD$$fetchTokenRange(CassandraTableScanRDD.scala:307)
    at com.datastax.spark.connector.rdd.CassandraTableScanRDD$$anonfun.apply(CassandraTableScanRDD.scala:335)
    at com.datastax.spark.connector.rdd.CassandraTableScanRDD$$anonfun.apply(CassandraTableScanRDD.scala:335)
    at scala.collection.Iterator$$anon.nextCur(Iterator.scala:434)
    at scala.collection.Iterator$$anon.hasNext(Iterator.scala:440)
    at com.datastax.spark.connector.util.CountingIterator.hasNext(CountingIterator.scala:12)
    at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1763)
    at org.apache.spark.rdd.RDD$$anonfun$count.apply(RDD.scala:1134)
    at org.apache.spark.rdd.RDD$$anonfun$count.apply(RDD.scala:1134)
    at org.apache.spark.SparkContext$$anonfun$runJob.apply(SparkContext.scala:1899)
    at org.apache.spark.SparkContext$$anonfun$runJob.apply(SparkContext.scala:1899)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
    at org.apache.spark.scheduler.Task.run(Task.scala:86)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NoSuchMethodError: scala.reflect.api.JavaUniverse.runtimeMirror(Ljava/lang/ClassLoader;)Lscala/reflect/api/JavaMirrors$JavaMirror;
    at com.datastax.spark.connector.types.TypeConverter$.<init>(TypeConverter.scala:116)
    at com.datastax.spark.connector.types.TypeConverter$.<clinit>(TypeConverter.scala)
    at com.datastax.spark.connector.types.BigIntType$.converterToCassandra(PrimitiveColumnType.scala:50)
    at com.datastax.spark.connector.types.BigIntType$.converterToCassandra(PrimitiveColumnType.scala:46)
    at com.datastax.spark.connector.types.ColumnType$.converterToCassandra(ColumnType.scala:229)
    at com.datastax.spark.connector.rdd.CassandraTableScanRDD$$anonfun.apply(CassandraTableScanRDD.scala:282)
    at com.datastax.spark.connector.rdd.CassandraTableScanRDD$$anonfun.apply(CassandraTableScanRDD.scala:282)
    at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:234)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at com.datastax.spark.connector.rdd.CassandraTableScanRDD.createStatement(CassandraTableScanRDD.scala:282)
    ... 17 more
17/01/21 04:42:41 INFO CoarseGrainedExecutorBackend: Got assigned task 2
17/01/21 04:42:41 INFO Executor: Running task 2.0 in stage 0.0 (TID 2)
17/01/21 04:42:41 ERROR Executor: Exception in task 2.0 in stage 0.0 (TID 2)
java.io.IOException: Exception during preparation of SELECT "coil_id", "event_time", "car_count", "insert_time" FROM "public"."traffic" WHERE token("coil_id") > ? AND token("coil_id") <= ?   ALLOW FILTERING: Could not initialize class com.datastax.spark.connector.types.TypeConverter$
    at com.datastax.spark.connector.rdd.CassandraTableScanRDD.createStatement(CassandraTableScanRDD.scala:293)
    at com.datastax.spark.connector.rdd.CassandraTableScanRDD.com$datastax$spark$connector$rdd$CassandraTableScanRDD$$fetchTokenRange(CassandraTableScanRDD.scala:307)
    at com.datastax.spark.connector.rdd.CassandraTableScanRDD$$anonfun.apply(CassandraTableScanRDD.scala:335)
    at com.datastax.spark.connector.rdd.CassandraTableScanRDD$$anonfun.apply(CassandraTableScanRDD.scala:335)
    at scala.collection.Iterator$$anon.nextCur(Iterator.scala:434)
    at scala.collection.Iterator$$anon.hasNext(Iterator.scala:440)
    at com.datastax.spark.connector.util.CountingIterator.hasNext(CountingIterator.scala:12)
    at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1763)
    at org.apache.spark.rdd.RDD$$anonfun$count.apply(RDD.scala:1134)
    at org.apache.spark.rdd.RDD$$anonfun$count.apply(RDD.scala:1134)
    at org.apache.spark.SparkContext$$anonfun$runJob.apply(SparkContext.scala:1899)
    at org.apache.spark.SparkContext$$anonfun$runJob.apply(SparkContext.scala:1899)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
    at org.apache.spark.scheduler.Task.run(Task.scala:86)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NoClassDefFoundError: Could not initialize class com.datastax.spark.connector.types.TypeConverter$
    at com.datastax.spark.connector.types.BigIntType$.converterToCassandra(PrimitiveColumnType.scala:50)
    at com.datastax.spark.connector.types.BigIntType$.converterToCassandra(PrimitiveColumnType.scala:46)
    at com.datastax.spark.connector.types.ColumnType$.converterToCassandra(ColumnType.scala:229)
    at com.datastax.spark.connector.rdd.CassandraTableScanRDD$$anonfun.apply(CassandraTableScanRDD.scala:282)
    at com.datastax.spark.connector.rdd.CassandraTableScanRDD$$anonfun.apply(CassandraTableScanRDD.scala:282)
    at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:234)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at com.datastax.spark.connector.rdd.CassandraTableScanRDD.createStatement(CassandraTableScanRDD.scala:282)
    ... 17 more
(The same exception then repeats for task 3.0 and every task after it.)

Here is my code:

import com.datastax.spark.connector.japi.CassandraRow;
import com.datastax.spark.connector.japi.rdd.CassandraTableScanJavaRDD;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;

import static com.datastax.spark.connector.japi.CassandraJavaUtil.*;

/**
 * Created by Daniel on 20.01.2017.
 */
public class TestConnector {

    SparkContext sc;

    /** Reuse an existing context (e.g. the one provided by spark-shell). */
    public TestConnector(SparkContext context) {
        sc = context;
        sc.conf().set("spark.cassandra.connection.host", "10.156.207.84")
                .set("spark.cores.max", "1");
    }

    /** Create a standalone local context. */
    public TestConnector() {
        SparkConf conf = new SparkConf()
                .setMaster("local[*]")
                .set("spark.cassandra.connection.host", "10.156.207.84")
                .set("spark.cores.max", "1");
        sc = new SparkContext(conf);
    }

    /** Count all rows of the public.traffic table. */
    public void count() {
        CassandraTableScanJavaRDD<CassandraRow> rdd =
                javaFunctions(sc).cassandraTable("public", "traffic");
        System.out.println("Total readings: " + rdd.count());
    }

}

Scala version: 2.11.8, Spark version: 2.0.2, Cassandra version: 3.9, Java version: 1.8.0_111

The first problem you're running into looks like a Scala version mismatch. The default builds of Spark 2.0 use Scala 2.11, but you have specified 2.10 for all of your dependencies. Change every _2.10 artifact to _2.11.
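
A quick way to confirm which Scala version your Spark build was compiled against (a diagnostic suggestion of mine, not part of the fix itself) is to ask spark-shell directly:

    scala> util.Properties.versionString   // prints e.g. "version 2.11.8" on a Scala 2.11 build

If this reports 2.11.x while your fat jar bundles _2.10 artifacts, you get exactly the scala.reflect NoSuchMethodError shown in the trace above.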

Next you'll run into Guava mismatch problems, because you are including the Cassandra Java driver when you shouldn't. So remove the dependency on the Java driver.

https://github.com/datastax/spark-cassandra-connector/blob/master/doc/FAQ.md#how-do-i-fix-guava-classpath-errors
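
Putting both fixes together, the changed parts of the dependency section would look roughly like this (a sketch: same versions as above, only the Scala suffix changed and the standalone driver dropped, since the connector pulls in a compatible driver on its own):

    <dependency>
        <groupId>com.datastax.spark</groupId>
        <artifactId>spark-cassandra-connector_2.11</artifactId>
        <version>2.0.0-M3</version>
    </dependency>

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.0.2</version>
    </dependency>

    <!-- the cassandra-driver-core dependency is removed entirely -->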

Just make sure your build.sbt file looks like this:

name := "sink-to-aggs"

version := "0.1"

scalaVersion := "2.11.10"

libraryDependencies += "com.typesafe" % "config" % "1.3.3"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.0.0"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.0.0"
libraryDependencies += "org.apache.spark" %% "spark-hive" % "2.0.0"
libraryDependencies += "org.apache.hadoop" % "hadoop-client" % "2.7.3"
libraryDependencies ++= Seq("org.slf4j" % "slf4j-api" % "1.7.5",
  "org.slf4j" % "slf4j-simple" % "1.7.5")
libraryDependencies += "mysql" % "mysql-connector-java" % "5.1.5"
libraryDependencies += "com.datastax.spark" %% "spark-cassandra-connector" % "2.0.0"

assemblyMergeStrategy in assembly := {
  case PathList("META-INF", xs@_*) => MergeStrategy.discard
  case x => MergeStrategy.first
}
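
The assemblyMergeStrategy setting above comes from the sbt-assembly plugin, so project/plugins.sbt has to declare it; a minimal sketch (the plugin version here is illustrative):

    // project/plugins.sbt -- provides the assembly task and merge-strategy keys
    addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.5")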

The DataStax spark-cassandra connector 2.0.0 only works with Spark 2.0.0.