Spark Cassandra Connector Error: java.lang.NoClassDefFoundError: com/datastax/spark/connector/TableRef

Spark Cassandra Connector Error: java.lang.NoClassDefFoundError: com/datastax/spark/connector/TableRef

Spark version:3.00
scala:2.12
Cassandra::3.11.4
spark-cassandra-connector_2.12-3.0.0-alpha2.jar

我没有使用 DSE。下面是我将数据帧写入我的 Cassandra 数据库的测试代码。

        spark = SparkSession \
        .builder \
        .config("spark.jars","spark-streaming-kafka-0-10_2.12-3.0.0.jar,spark-sql-kafka-0-10_2.12-3.0.0.jar,kafka-clients-2.5.0.jar,commons-pool2-2.8.0.jar,spark-token-provider-kafka-0-10_2.12-3.0.0.jar,**spark-cassandra-connector_2.12-3.0.0-alpha2.jar**") \
        .config("spark.cassandra.connection.host", "127.0.0.1")\
        .config('spark.cassandra.output.consistency.level', 'ONE')\
        .appName("StructuredNetworkWordCount") \
        .getOrCreate()

    streamingInputDF = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "192.168.56.1:9092") \
        .option("subscribe", "def") \
        .load()
##Dataset operations

    def write_to_cassandra(streaming_df,E):
    streaming_df\
        .write \
        .format("org.apache.spark.sql.cassandra") \
        .options(table="a", keyspace="abc") \
         .save()
       
    q1 =sites_flat.writeStream \
    .outputMode('update') \
    .foreachBatch(write_to_cassandra) \
    .start()
q1.awaitTermination()

我可以对数据框执行一些操作并将其打印到控制台,但我无法保存甚至无法从我的 Cassandra 数据库中读取它。我得到的错误是:

      File "C:\opt\spark-3.0.0-bin-hadoop2.7\python\lib\py4j-0.10.9-src.zip\py4j\protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o70.load.
: java.lang.NoClassDefFoundError: com/datastax/spark/connector/TableRef
    at org.apache.spark.sql.cassandra.DefaultSource$.TableRefAndOptions(DefaultSource.scala:142)
    at org.apache.spark.sql.cassandra.DefaultSource.createRelation(DefaultSource.scala:56)
    at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:339)
    at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:279)
    at org.apache.spark.sql.DataFrameReader.$anonfun$load(DataFrameReader.scala:268)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:268)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:203)

我尝试使用其他 cassandra 连接器版本 (2.5),但出现相同的错误 请帮忙!!!

问题是您使用的 spark.jars 选项仅将提供的 jar 包含到 class 路径中。但是 TableRef 案例 class 在 spark-cassandra-connector-driver 包中,它是 spark-cassandra-connector 的依赖项。要解决此问题,最好使用 --packages com.datastax.spark:spark-cassandra-connector_2.12:3.0.0-alpha2 启动 pysparkspark-submit(对于 kafka 支持也是如此)——在这种情况下,Spark 将获取所有必要的依赖项并将它们放入 class路径。

P.S。使用 alpha2 版本,您可能会在获取某些依赖项时遇到问题,例如 ffigroovy 等 - 这是一个已知错误(主要在 Spark 中):SPARKC-599,这已经是固定的,我们希望很快就能获得测试版。

更新 (14.03.2021):最好使用包含所有必要依赖项的 assembly version of SCC

P.P.S。从 Spark Structured Streaming 写入 Cassandra,不要使用 foreachbatch,只需用作普通数据接收器:

     val query = streamingCountsDF.writeStream
      .outputMode(OutputMode.Update)
      .format("org.apache.spark.sql.cassandra")
      .option("checkpointLocation", "webhdfs://192.168.0.10:5598/checkpoint")
      .option("keyspace", "test")
      .option("table", "sttest_tweets")
      .start()

我运行遇到了同样的问题,试试看:

<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.12</artifactId>
    <version>2.4.3</version>
</dependency>

推测是版本兼容性问题