如何使用 spark-cassandra-connector 连接 spark 和 cassandra?

How to connect spark with cassandra using spark-cassandra-connector?

你必须原谅我的笨拙,但我正在尝试设置一个连接到 cassandra 的 spark 集群 运行宁一个 python 脚本,目前我正在使用 datastax enterprise 运行 cassandra on solr 搜索模式。我知道,为了使用 datastax 提供的 spark-cassandra 连接器,您必须 运行 cassandra 处于分析模式(使用 -k 选项)。目前我只能使用 dse spark 版本让它工作,为此,为了让它工作我遵循了以下步骤:

  1. 以分析模式启动 dse cassandra
  2. 将 $PYTHONPATH 环境变量更改为 /path/to/spark/dse/python:/path/to/spark/dse/python/lib/py4j-*.zip:$PYTHONPATH
  3. 运行 作为 root 具有 python test-script.py
  4. 的独立脚本

此外,我单独使用 spark(不是 dse 版本)进行了另一次测试,试图包含使驱动程序 类 可访问的 java 包,我做到了:

  1. 添加spark.driver.extraClassPath = /path/to/spark-cassandra-connector-SNAPSHOT.jar到文件spark-defaults.conf 2.execute $SPARK_HOME/bin/spark-submit —packages com.datastax.spark:spark-cassandra...

我还尝试了 运行ning pyspark shell 并测试 sc 是否有方法 cassandraTable 以查看驱动程序是否已加载但没有成功,在这两种情况下我都收到以下错误留言:

AttributeError: 'SparkContext' object has no attribute 'cassandraTable'

我的目标是了解我必须做什么才能使非 dse spark 版本与 cassandra 连接并使驱动程序中的方法可用。

我还想知道是否可以将 dse spark-cassandra 连接器与不是 运行 dse 的 cassandra 节点一起使用。

感谢您的帮助

以下是如何在非 dse 版本中将 spark-shell 连接到 cassandra。

spark-cassandra-connector jar 复制到 spark/spark-hadoop-directory/jars/

spark-shell --jars ~/spark/spark-hadoop-directory/jars/spark-cassandra-connector-*.jar

在 spark shell 中执行这些命令

sc.stop
import com.datastax.spark.connector._, org.apache.spark.SparkContext, org.apache.spark.SparkContext._, org.apache.spark.SparkConf
import  org.apache.spark.sql.cassandra._
val conf = new SparkConf(true).set("spark.cassandra.connection.host", "localhost")
val sc = new SparkContext(conf)
val csc = new CassandraSQLContext(sc)

如果您的 cassandra 有密码设置等,您将需要提供更多参数。:)

我在独立 python 脚本中使用了 pyspark。我不使用 DSE,我从 datastax 的 github 存储库克隆了 cassandra-spark-connector 并使用 datastax instrucctions.

进行了编译

为了访问 spark 中的 spark 连接器,我复制到 spark 安装中的 jars 文件夹。

我认为这对你也有好处:

 cp ~/spark-cassandra-connector/spark-cassandra-connector/target/full/scala-2.11/spark-cassandra-connector-assembly-2.0.5-86-ge36c048.jar $SPARK_HOME/jars/

您可以访问 this,我在其中解释了我自己设置环境的经验。

一旦 spark 可以访问 Cassandra 连接器,您就可以使用 pyspark 库作为包装器:

from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext, SparkSession

spark = SparkSession.builder \
  .appName('SparkCassandraApp') \
  .config('spark.cassandra.connection.host', 'localhost') \
  .config('spark.cassandra.connection.port', '9042') \
  .config('spark.cassandra.output.consistency.level','ONE') \
  .master('local[2]') \
  .getOrCreate()

ds = sqlContext \
  .read \
  .format('org.apache.spark.sql.cassandra') \
  .options(table='tablename', keyspace='keyspace_name') \
  .load()

ds.show(10)

在此example您可以看到整个脚本。