Cassandra 的 Spark2 会话,sql 个查询
Spark2 session for Cassandra , sql queries
在 Spark-2.0 中,创建 Spark 会话的最佳方式是什么。因为在 Spark-2.0 和 Cassandra 中,API 已经过重新设计,基本上弃用了 SqlContext(以及 CassandraSqlContext)。因此,为了执行 SQL- 要么我创建一个 Cassandra 会话 (com.datastax.driver.core.Session) and use execute( " ")
。或者我必须创建一个 SparkSession (org.apache.spark.sql.SparkSession) and execute sql(String sqlText)
方法。
我不知道两者的 SQL 局限性 - 谁能解释一下。
此外,如果我必须创建 SparkSession - 我该怎么做 - 找不到任何合适的示例。随着 API 的重新设计,旧示例不再适用。
我正在浏览这个代码示例- DataFrames- 不清楚这里使用的是什么 sql 上下文(这是正确的方法吗?)
(出于某种原因,已弃用的 API 甚至无法编译 - 需要检查我的 eclipse 设置)
谢谢
您需要来自 Cassandra DB 的 create/drop 键空间和 table 的 Cassandra 会话。在 Spark 应用程序中,为了创建 Cassandra Session,您需要将 SparkConf 传递给 CassandraConnector。在 Spark 2.0 中你可以像下面那样做。
SparkSession spark = SparkSession
.builder()
.appName("SparkCassandraApp")
.config("spark.cassandra.connection.host", "localhost")
.config("spark.cassandra.connection.port", "9042")
.master("local[2]")
.getOrCreate();
CassandraConnector connector = CassandraConnector.apply(spark.sparkContext().conf());
Session session = connector.openSession();
session.execute("CREATE TABLE mykeyspace.mytable(id UUID PRIMARY KEY, username TEXT, email TEXT)");
如果您有现有的 Dataframe,那么您也可以使用 DataFrameFunctions.createCassandraTable(Df)
在 Cassandra 中创建 table。请参阅 api 详细信息 here。
您可以使用 spark-cassandra-connector 提供的 api 从 Cassandra DB 读取数据,如下所示。
Dataset<Row> dataset = spark.read().format("org.apache.spark.sql.cassandra")
.options(new HashMap<String, String>() {
{
put("keyspace", "mykeyspace");
put("table", "mytable");
}
}).load();
dataset.show();
您可以使用 SparkSession.sql() 方法 运行 查询在由 spark cassandra 连接器返回的 Dataframe 上创建的临时 table,如下所示。
dataset.createOrReplaceTempView("usertable");
Dataset<Row> dataset1 = spark.sql("select * from usertable where username = 'Mat'");
dataset1.show();
在 Spark-2.0 中,创建 Spark 会话的最佳方式是什么。因为在 Spark-2.0 和 Cassandra 中,API 已经过重新设计,基本上弃用了 SqlContext(以及 CassandraSqlContext)。因此,为了执行 SQL- 要么我创建一个 Cassandra 会话 (com.datastax.driver.core.Session) and use execute( " ")
。或者我必须创建一个 SparkSession (org.apache.spark.sql.SparkSession) and execute sql(String sqlText)
方法。
我不知道两者的 SQL 局限性 - 谁能解释一下。
此外,如果我必须创建 SparkSession - 我该怎么做 - 找不到任何合适的示例。随着 API 的重新设计,旧示例不再适用。 我正在浏览这个代码示例- DataFrames- 不清楚这里使用的是什么 sql 上下文(这是正确的方法吗?) (出于某种原因,已弃用的 API 甚至无法编译 - 需要检查我的 eclipse 设置)
谢谢
您需要来自 Cassandra DB 的 create/drop 键空间和 table 的 Cassandra 会话。在 Spark 应用程序中,为了创建 Cassandra Session,您需要将 SparkConf 传递给 CassandraConnector。在 Spark 2.0 中你可以像下面那样做。
SparkSession spark = SparkSession
.builder()
.appName("SparkCassandraApp")
.config("spark.cassandra.connection.host", "localhost")
.config("spark.cassandra.connection.port", "9042")
.master("local[2]")
.getOrCreate();
CassandraConnector connector = CassandraConnector.apply(spark.sparkContext().conf());
Session session = connector.openSession();
session.execute("CREATE TABLE mykeyspace.mytable(id UUID PRIMARY KEY, username TEXT, email TEXT)");
如果您有现有的 Dataframe,那么您也可以使用 DataFrameFunctions.createCassandraTable(Df)
在 Cassandra 中创建 table。请参阅 api 详细信息 here。
您可以使用 spark-cassandra-connector 提供的 api 从 Cassandra DB 读取数据,如下所示。
Dataset<Row> dataset = spark.read().format("org.apache.spark.sql.cassandra")
.options(new HashMap<String, String>() {
{
put("keyspace", "mykeyspace");
put("table", "mytable");
}
}).load();
dataset.show();
您可以使用 SparkSession.sql() 方法 运行 查询在由 spark cassandra 连接器返回的 Dataframe 上创建的临时 table,如下所示。
dataset.createOrReplaceTempView("usertable");
Dataset<Row> dataset1 = spark.sql("select * from usertable where username = 'Mat'");
dataset1.show();