Error while fetching data from cassandra using pyspark

I am new to Apache Spark, and I just need to fetch one table from a Cassandra database. I have attached the details below to help debug the situation. Please help, and thanks in advance. Cassandra node: 192.168.56.10; Spark node: 192.168.56.10

Cassandra table to fetch: dev.device {keyspace.table_name}

Starting pyspark with a connection to Cassandra:

[root@spark ~]# pyspark --packages  com.datastax.spark:spark-cassandra-connector_2.11:3.0-alpha --conf spark.cassandra.connection.host=192.168.56.10
Python 3.6.8 (default, Nov  9 2021, 14:44:26) 
[GCC 8.5.0 20210514 (Red Hat 8.5.0-3)] on linux
Type "help", "copyright", "credits" or "license" for more information.
:: loading settings :: url = jar:file:/opt/spark-3.2.0-bin-hadoop3.2/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
com.datastax.spark#spark-cassandra-connector_2.11 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-4c386c5c-716c-4c28-bc76-ae5870833da2;1.0
        confs: [default]
        found com.datastax.spark#spark-cassandra-connector_2.11;3.0-alpha in central
        found com.datastax.spark#spark-cassandra-connector-driver_2.11;3.0-alpha in central
        found com.datastax.oss#java-driver-core-shaded;4.5.0 in central
        found com.datastax.oss#native-protocol;1.4.9 in central
        found com.datastax.oss#java-driver-shaded-guava;25.1-jre in central
        found com.typesafe#config;1.3.4 in central
        found com.github.jnr#jnr-ffi;2.1.10 in central
        found com.github.jnr#jffi;1.2.19 in central
        found org.ow2.asm#asm;7.1 in central
        found org.ow2.asm#asm-commons;7.1 in central
        found org.ow2.asm#asm-tree;7.1 in central
        found org.ow2.asm#asm-analysis;7.1 in central
        found org.ow2.asm#asm-util;7.1 in central
        found com.github.jnr#jnr-a64asm;1.0.0 in central
        found com.github.jnr#jnr-x86asm;1.0.2 in central
        found com.github.jnr#jnr-posix;3.0.50 in central
        found com.github.jnr#jnr-constants;0.9.12 in central
        found org.slf4j#slf4j-api;1.7.26 in central
        found io.dropwizard.metrics#metrics-core;4.0.5 in central
        found org.hdrhistogram#HdrHistogram;2.1.11 in central
        found org.apache.tinkerpop#gremlin-core;3.4.5 in central
        found org.apache.tinkerpop#gremlin-shaded;3.4.5 in central
        found commons-configuration#commons-configuration;1.10 in central
        found commons-lang#commons-lang;2.6 in central
        found commons-collections#commons-collections;3.2.2 in central
        found org.yaml#snakeyaml;1.15 in central
        found org.javatuples#javatuples;1.2 in central
        found com.carrotsearch#hppc;0.7.1 in central
        found com.jcabi#jcabi-manifests;1.1 in central
        found com.jcabi#jcabi-log;0.14 in central
        found com.squareup#javapoet;1.11.1 in central
        found net.objecthunter#exp4j;0.4.8 in central
        found org.slf4j#jcl-over-slf4j;1.7.25 in central
        found org.apache.tinkerpop#gremlin-driver;3.4.5 in central
        found org.codehaus.groovy#groovy;2.5.7 in central
        found org.codehaus.groovy#groovy-json;2.5.7 in central
        found org.apache.tinkerpop#tinkergraph-gremlin;3.4.5 in central
        found org.reactivestreams#reactive-streams;1.0.2 in central
        found com.github.stephenc.jcip#jcip-annotations;1.0-1 in central
        found com.github.spotbugs#spotbugs-annotations;3.1.12 in central
        found com.google.code.findbugs#jsr305;3.0.2 in central
        found com.datastax.oss#java-driver-mapper-runtime;4.5.0 in central
        found com.datastax.oss#java-driver-query-builder;4.5.0 in central
        found org.apache.commons#commons-lang3;3.5 in central
        found com.thoughtworks.paranamer#paranamer;2.8 in central
        found com.typesafe.scala-logging#scala-logging_2.11;3.5.0 in central
        found org.scala-lang#scala-reflect;2.11.12 in central
:: resolution report :: resolve 1099ms :: artifacts dl 30ms
        :: modules in use:
        com.carrotsearch#hppc;0.7.1 from central in [default]
        com.datastax.oss#java-driver-core-shaded;4.5.0 from central in [default]
        com.datastax.oss#java-driver-mapper-runtime;4.5.0 from central in [default]
        com.datastax.oss#java-driver-query-builder;4.5.0 from central in [default]
        com.datastax.oss#java-driver-shaded-guava;25.1-jre from central in [default]
        com.datastax.oss#native-protocol;1.4.9 from central in [default]
        com.datastax.spark#spark-cassandra-connector-driver_2.11;3.0-alpha from central in [default]
        com.datastax.spark#spark-cassandra-connector_2.11;3.0-alpha from central in [default]
        com.github.jnr#jffi;1.2.19 from central in [default]
        com.github.jnr#jnr-a64asm;1.0.0 from central in [default]
        com.github.jnr#jnr-constants;0.9.12 from central in [default]
        com.github.jnr#jnr-ffi;2.1.10 from central in [default]
        com.github.jnr#jnr-posix;3.0.50 from central in [default]
        com.github.jnr#jnr-x86asm;1.0.2 from central in [default]
        com.github.spotbugs#spotbugs-annotations;3.1.12 from central in [default]
        com.github.stephenc.jcip#jcip-annotations;1.0-1 from central in [default]
        com.google.code.findbugs#jsr305;3.0.2 from central in [default]
        com.jcabi#jcabi-log;0.14 from central in [default]
        com.jcabi#jcabi-manifests;1.1 from central in [default]
        com.squareup#javapoet;1.11.1 from central in [default]
        com.thoughtworks.paranamer#paranamer;2.8 from central in [default]
        com.typesafe#config;1.3.4 from central in [default]
        com.typesafe.scala-logging#scala-logging_2.11;3.5.0 from central in [default]
        commons-collections#commons-collections;3.2.2 from central in [default]
        commons-configuration#commons-configuration;1.10 from central in [default]
        commons-lang#commons-lang;2.6 from central in [default]
        io.dropwizard.metrics#metrics-core;4.0.5 from central in [default]
        net.objecthunter#exp4j;0.4.8 from central in [default]
        org.apache.commons#commons-lang3;3.5 from central in [default]
        org.apache.tinkerpop#gremlin-core;3.4.5 from central in [default]
        org.apache.tinkerpop#gremlin-driver;3.4.5 from central in [default]
        org.apache.tinkerpop#gremlin-shaded;3.4.5 from central in [default]
        org.apache.tinkerpop#tinkergraph-gremlin;3.4.5 from central in [default]
        org.codehaus.groovy#groovy;2.5.7 from central in [default]
        org.codehaus.groovy#groovy-json;2.5.7 from central in [default]
        org.hdrhistogram#HdrHistogram;2.1.11 from central in [default]
        org.javatuples#javatuples;1.2 from central in [default]
        org.ow2.asm#asm;7.1 from central in [default]
        org.ow2.asm#asm-analysis;7.1 from central in [default]
        org.ow2.asm#asm-commons;7.1 from central in [default]
        org.ow2.asm#asm-tree;7.1 from central in [default]
        org.ow2.asm#asm-util;7.1 from central in [default]
        org.reactivestreams#reactive-streams;1.0.2 from central in [default]
        org.scala-lang#scala-reflect;2.11.12 from central in [default]
        org.slf4j#jcl-over-slf4j;1.7.25 from central in [default]
        org.slf4j#slf4j-api;1.7.26 from central in [default]
        org.yaml#snakeyaml;1.15 from central in [default]
        :: evicted modules:
        org.apache.commons#commons-lang3;3.8.1 by [org.apache.commons#commons-lang3;3.5] in [default]
        org.scala-lang#scala-reflect;2.11.8 by [org.scala-lang#scala-reflect;2.11.12] in [default]
        org.slf4j#slf4j-api;1.7.21 by [org.slf4j#slf4j-api;1.7.26] in [default]
        ---------------------------------------------------------------------
        |                  |            modules            ||   artifacts   |
        |       conf       | number| search|dwnlded|evicted|| number|dwnlded|
        ---------------------------------------------------------------------
        |      default     |   50  |   0   |   0   |   3   ||   47  |   0   |
        ---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-4c386c5c-716c-4c28-bc76-ae5870833da2
        confs: [default]
        0 artifacts copied, 47 already retrieved (0kB/32ms)
21/12/27 14:11:11 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
21/12/27 14:11:13 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
/opt/spark-3.2.0-bin-hadoop3.2/python/pyspark/context.py:238: FutureWarning: Python 3.6 support is deprecated in Spark 3.2.
  FutureWarning
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.2.0
      /_/

Using Python version 3.6.8 (default, Nov  9 2021 14:44:26)
Spark context Web UI available at http://spark.localdomain:4041
Spark context available as 'sc' (master = spark://spark:7077, app id = app-20211227141113-0011).
SparkSession available as 'spark'.
>>> 

Code to access the Cassandra table, and the resulting error:

>>> from pyspark.sql import SQLContext
>>> load_options = { "table": "device", "keyspace": "dev"}
>>> df=spark.read.format("org.apache.spark.sql.cassandra").options(**load_options).load()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/opt/spark-3.2.0-bin-hadoop3.2/python/pyspark/sql/readwriter.py", line 164, in load
    return self._df(self._jreader.load())
  File "/opt/spark-3.2.0-bin-hadoop3.2/python/lib/py4j-0.10.9.2-src.zip/py4j/java_gateway.py", line 1310, in __call__
  File "/opt/spark-3.2.0-bin-hadoop3.2/python/pyspark/sql/utils.py", line 111, in deco
    return f(*a, **kw)
  File "/opt/spark-3.2.0-bin-hadoop3.2/python/lib/py4j-0.10.9.2-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o44.load.
: java.lang.NoClassDefFoundError: scala/Product$class
        at com.datastax.spark.connector.TableRef.<init>(TableRef.scala:4)
        at org.apache.spark.sql.cassandra.DefaultSource$.TableRefAndOptions(DefaultSource.scala:142)
        at org.apache.spark.sql.cassandra.DefaultSource.createRelation(DefaultSource.scala:56)
        at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:350)
        at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:274)
        at org.apache.spark.sql.DataFrameReader.$anonfun$load(DataFrameReader.scala:245)
        at scala.Option.getOrElse(Option.scala:189)
        at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:245)
        at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:174)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
        at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: scala.Product$class
        at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
        ... 21 more

>>> df.show()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
NameError: name 'df' is not defined
>>> 

Can anyone point out what is wrong here?

You can't use a connector compiled for Scala 2.11 with Spark 3.2.0, which is compiled with Scala 2.12. You need to use the appropriate version - right now it's 3.1.0, with the coordinates com.datastax.spark:spark-cassandra-connector_2.12:3.1.0.
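For example (a minimal sketch that reuses the host, keyspace, and table names from the question, and assumes 3.1.0 is still the current Scala 2.12 build), the corrected launch and read would look roughly like this:

pyspark --packages com.datastax.spark:spark-cassandra-connector_2.12:3.1.0 \
        --conf spark.cassandra.connection.host=192.168.56.10

>>> # same read as in the question, now backed by a Scala 2.12 connector
>>> load_options = {"table": "device", "keyspace": "dev"}
>>> df = spark.read.format("org.apache.spark.sql.cassandra").options(**load_options).load()
>>> df.show()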

P.S. Please note that while basic functionality will work, more advanced functionality won't work until SPARKC-670 is fixed (see this PR).