PySpark Cassandra Database Connection Problem

I am trying to use Cassandra with PySpark. I can connect to the remote Spark server without problems, but I run into trouble at the stage of reading the Cassandra table. I have tried all of the DataStax connectors and changed the Spark configuration (cores, memory, etc.), but I cannot get it to work. (The commented-out lines in the code below are my attempts.)

Here is my Python code:

import os
os.environ['JAVA_HOME'] = r"C:\Program Files\Java\jdk1.8.0_271"
os.environ['HADOOP_HOME'] = r"E:\etc\spark-3.0.1-bin-hadoop2.7"
os.environ['PYSPARK_DRIVER_PYTHON']="/usr/local/bin/python3.7"
os.environ['PYSPARK_PYTHON']="/usr/local/bin/python3.7"

# os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.datastax.spark:spark-cassandra-connector_2.12:3.0.0 --conf spark.cassandra.connection.host=XX.XX.XX.XX spark.cassandra.auth.username=username spark.cassandra.auth.password=passwd pyspark-shell'
# os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars .ivy2\jars\spark-cassandra-connector-driver_2.12-3.0.0-alpha2.jar pyspark-shell'
# os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.datastax.spark:spark-cassandra-connector_2.12:3.0.0-alpha2 pyspark-shell'

from pyspark.conf import SparkConf
from pyspark.context import SparkContext
from pyspark.sql import Row
from pyspark.sql import SQLContext
conf = SparkConf()
conf.setMaster("spark://YY.YY.YY:7077").setAppName("My app")
conf.set("spark.shuffle.service.enabled", "false")
conf.set("spark.dynamicAllocation.enabled","false")
conf.set("spark.executor.cores", "2")
conf.set("spark.executor.memory", "5g")
conf.set("spark.executor.instances", "1")
conf.set("spark.jars", "C:\Users\verianalizi\.ivy2\jars\spark-cassandra-connector_2.12-3.0.0-beta.jar")

conf.set("spark.cassandra.connection.host","XX.XX.XX.XX")
conf.set("spark.cassandra.auth.username","username")
conf.set("spark.cassandra.auth.password","passwd")
conf.set("spark.cassandra.connection.port", "9042")
# conf.set("spark.sql.catalog.myCatalog", "com.datastax.spark.connector.datasource.CassandraCatalog")

sc = SparkContext(conf=conf)
# sc.setLogLevel("ERROR")
sqlContext = SQLContext(sc)
list_p = [('John',19),('Smith',29),('Adam',35),('Henry',50)]
rdd = sc.parallelize(list_p)
ppl = rdd.map(lambda x: Row(name=x[0], age=int(x[1])))
DF_ppl = sqlContext.createDataFrame(ppl)

# It works well until now

def load_and_get_table_df(keys_space_name, table_name):
    table_df = sqlContext.read\
        .format("org.apache.spark.sql.cassandra")\
        .option("keyspace",keys_space_name)\
        .option("table",table_name)\
        .load()
    return table_df

movies = load_and_get_table_df("weather", "currentweatherconditions")

The error I get is:

Does anyone have an idea?

This happens because you specify only the spark.jars property and point it to a single jar. But the Spark Cassandra Connector depends on a number of other jars that are not included in that list. I would recommend instead either using spark.jars.packages with the coordinates com.datastax.spark:spark-cassandra-connector_2.12:3.0.0, or specifying in spark.jars the path to an assembly jar that contains all of the necessary dependencies.
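For the first option, the configuration part could look like this. This is a minimal sketch, reusing the masked host and credentials from your code, and assuming the driver can reach Maven Central so Spark can resolve the package and its transitive dependencies:

from pyspark.conf import SparkConf
from pyspark.sql import SparkSession

conf = SparkConf()
conf.setMaster("spark://YY.YY.YY:7077").setAppName("My app")
# Let Spark resolve the connector plus all of its transitive dependencies
# from Maven Central, instead of pointing spark.jars at a single jar.
conf.set("spark.jars.packages",
         "com.datastax.spark:spark-cassandra-connector_2.12:3.0.0")
conf.set("spark.cassandra.connection.host", "XX.XX.XX.XX")
conf.set("spark.cassandra.connection.port", "9042")
conf.set("spark.cassandra.auth.username", "username")
conf.set("spark.cassandra.auth.password", "passwd")

spark = SparkSession.builder.config(conf=conf).getOrCreate()

# The read itself stays the same as in your load_and_get_table_df function.
df = (spark.read
      .format("org.apache.spark.sql.cassandra")
      .option("keyspace", "weather")
      .option("table", "currentweatherconditions")
      .load())
df.show(5)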

By the way, 3.0.0 was released a few months ago - why are you still using the beta version?
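If the cluster machines cannot download packages, the second option is to point spark.jars at the connector's assembly artifact (spark-cassandra-connector-assembly_2.12), which bundles the dependencies into one jar. A sketch, assuming you have downloaded that jar yourself; the local path below is purely illustrative:

from pyspark.conf import SparkConf
from pyspark.context import SparkContext

conf = SparkConf()
conf.setMaster("spark://YY.YY.YY:7077").setAppName("My app")
# A single fat jar that already contains the connector's dependencies,
# so spark.jars alone is enough (hypothetical local path).
conf.set("spark.jars",
         r"C:\spark\extra-jars\spark-cassandra-connector-assembly_2.12-3.0.0.jar")
conf.set("spark.cassandra.connection.host", "XX.XX.XX.XX")
conf.set("spark.cassandra.connection.port", "9042")
conf.set("spark.cassandra.auth.username", "username")
conf.set("spark.cassandra.auth.password", "passwd")

sc = SparkContext(conf=conf)

Either way, once the full set of connector jars is on the classpath and you are on the 3.0.0 release rather than the alpha/beta artifacts, your existing load_and_get_table_df function should work unchanged.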