如何使用 spark-cassandra-connector 连接 spark 和 cassandra?
How to connect spark with cassandra using spark-cassandra-connector?
你必须原谅我的笨拙,但我正在尝试设置一个连接到 cassandra 的 spark 集群 运行宁一个 python 脚本,目前我正在使用 datastax enterprise 运行 cassandra on solr 搜索模式。我知道,为了使用 datastax 提供的 spark-cassandra 连接器,您必须 运行 cassandra 处于分析模式(使用 -k 选项)。目前我只能使用 dse spark 版本让它工作,为此,为了让它工作我遵循了以下步骤:
- 以分析模式启动 dse cassandra
- 将 $PYTHONPATH 环境变量更改为 /path/to/spark/dse/python:/path/to/spark/dse/python/lib/py4j-*.zip:$PYTHONPATH
- 运行 作为 root 具有
python test-script.py
的独立脚本
此外,我单独使用 spark(不是 dse 版本)进行了另一次测试,试图包含使驱动程序 类 可访问的 java 包,我做到了:
- 添加spark.driver.extraClassPath = /path/to/spark-cassandra-connector-SNAPSHOT.jar到文件spark-defaults.conf
2.execute
$SPARK_HOME/bin/spark-submit —packages com.datastax.spark:spark-cassandra...
我还尝试了 运行ning pyspark shell 并测试 sc 是否有方法 cassandraTable 以查看驱动程序是否已加载但没有成功,在这两种情况下我都收到以下错误留言:
AttributeError: 'SparkContext' object has no attribute 'cassandraTable'
我的目标是了解我必须做什么才能使非 dse spark 版本与 cassandra 连接并使驱动程序中的方法可用。
我还想知道是否可以将 dse spark-cassandra 连接器与不是 运行 dse 的 cassandra 节点一起使用。
感谢您的帮助
以下是如何在非 dse 版本中将 spark-shell 连接到 cassandra。
将 spark-cassandra-connector
jar 复制到 spark/spark-hadoop-directory/jars/
spark-shell --jars ~/spark/spark-hadoop-directory/jars/spark-cassandra-connector-*.jar
在 spark shell 中执行这些命令
sc.stop
import com.datastax.spark.connector._, org.apache.spark.SparkContext, org.apache.spark.SparkContext._, org.apache.spark.SparkConf
import org.apache.spark.sql.cassandra._
val conf = new SparkConf(true).set("spark.cassandra.connection.host", "localhost")
val sc = new SparkContext(conf)
val csc = new CassandraSQLContext(sc)
如果您的 cassandra 有密码设置等,您将需要提供更多参数。:)
我在独立 python 脚本中使用了 pyspark。我不使用 DSE,我从 datastax 的 github 存储库克隆了 cassandra-spark-connector 并使用 datastax instrucctions.
进行了编译
为了访问 spark 中的 spark 连接器,我复制到 spark 安装中的 jars 文件夹。
我认为这对你也有好处:
cp ~/spark-cassandra-connector/spark-cassandra-connector/target/full/scala-2.11/spark-cassandra-connector-assembly-2.0.5-86-ge36c048.jar $SPARK_HOME/jars/
您可以访问 this,我在其中解释了我自己设置环境的经验。
一旦 spark 可以访问 Cassandra 连接器,您就可以使用 pyspark 库作为包装器:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext, SparkSession
spark = SparkSession.builder \
.appName('SparkCassandraApp') \
.config('spark.cassandra.connection.host', 'localhost') \
.config('spark.cassandra.connection.port', '9042') \
.config('spark.cassandra.output.consistency.level','ONE') \
.master('local[2]') \
.getOrCreate()
ds = sqlContext \
.read \
.format('org.apache.spark.sql.cassandra') \
.options(table='tablename', keyspace='keyspace_name') \
.load()
ds.show(10)
在此example您可以看到整个脚本。
你必须原谅我的笨拙,但我正在尝试设置一个连接到 cassandra 的 spark 集群 运行宁一个 python 脚本,目前我正在使用 datastax enterprise 运行 cassandra on solr 搜索模式。我知道,为了使用 datastax 提供的 spark-cassandra 连接器,您必须 运行 cassandra 处于分析模式(使用 -k 选项)。目前我只能使用 dse spark 版本让它工作,为此,为了让它工作我遵循了以下步骤:
- 以分析模式启动 dse cassandra
- 将 $PYTHONPATH 环境变量更改为 /path/to/spark/dse/python:/path/to/spark/dse/python/lib/py4j-*.zip:$PYTHONPATH
- 运行 作为 root 具有
python test-script.py
的独立脚本
此外,我单独使用 spark(不是 dse 版本)进行了另一次测试,试图包含使驱动程序 类 可访问的 java 包,我做到了:
- 添加spark.driver.extraClassPath = /path/to/spark-cassandra-connector-SNAPSHOT.jar到文件spark-defaults.conf
2.execute
$SPARK_HOME/bin/spark-submit —packages com.datastax.spark:spark-cassandra...
我还尝试了 运行ning pyspark shell 并测试 sc 是否有方法 cassandraTable 以查看驱动程序是否已加载但没有成功,在这两种情况下我都收到以下错误留言:
AttributeError: 'SparkContext' object has no attribute 'cassandraTable'
我的目标是了解我必须做什么才能使非 dse spark 版本与 cassandra 连接并使驱动程序中的方法可用。
我还想知道是否可以将 dse spark-cassandra 连接器与不是 运行 dse 的 cassandra 节点一起使用。
感谢您的帮助
以下是如何在非 dse 版本中将 spark-shell 连接到 cassandra。
将 spark-cassandra-connector
jar 复制到 spark/spark-hadoop-directory/jars/
spark-shell --jars ~/spark/spark-hadoop-directory/jars/spark-cassandra-connector-*.jar
在 spark shell 中执行这些命令
sc.stop
import com.datastax.spark.connector._, org.apache.spark.SparkContext, org.apache.spark.SparkContext._, org.apache.spark.SparkConf
import org.apache.spark.sql.cassandra._
val conf = new SparkConf(true).set("spark.cassandra.connection.host", "localhost")
val sc = new SparkContext(conf)
val csc = new CassandraSQLContext(sc)
如果您的 cassandra 有密码设置等,您将需要提供更多参数。:)
我在独立 python 脚本中使用了 pyspark。我不使用 DSE,我从 datastax 的 github 存储库克隆了 cassandra-spark-connector 并使用 datastax instrucctions.
进行了编译为了访问 spark 中的 spark 连接器,我复制到 spark 安装中的 jars 文件夹。
我认为这对你也有好处:
cp ~/spark-cassandra-connector/spark-cassandra-connector/target/full/scala-2.11/spark-cassandra-connector-assembly-2.0.5-86-ge36c048.jar $SPARK_HOME/jars/
您可以访问 this,我在其中解释了我自己设置环境的经验。
一旦 spark 可以访问 Cassandra 连接器,您就可以使用 pyspark 库作为包装器:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext, SparkSession
spark = SparkSession.builder \
.appName('SparkCassandraApp') \
.config('spark.cassandra.connection.host', 'localhost') \
.config('spark.cassandra.connection.port', '9042') \
.config('spark.cassandra.output.consistency.level','ONE') \
.master('local[2]') \
.getOrCreate()
ds = sqlContext \
.read \
.format('org.apache.spark.sql.cassandra') \
.options(table='tablename', keyspace='keyspace_name') \
.load()
ds.show(10)
在此example您可以看到整个脚本。