Spark Cassandra 连接器:实施 SCD 类型 1
Spark Cassandra Connector: Implement SCD Type 1
我是 Cassandra 的新手,我想在 Cassandra DB 中实现 SCD Type-1。
此 SCD Type1 作业将从 Spark 执行。
数据将存储为时间序列分区数据。即:Year/month/Day
示例:我有最近 300 天的记录,我的新记录可能有新记录,也可能有更新的记录。
我想比较最近 100 天的更新记录,如果记录是新的,那么它应该执行插入操作,否则更新。
我没有得到执行此操作的任何线索,因此不共享任何 CQL :(
示例 table 结构为:
CREATE TABLE crossfit_gyms_by_city_New (
country_code text,
state_province text,
city text,
gym_name text,
PRIMARY KEY ((country_code, state_province), gym_name)
) WITH CLUSTERING ORDER BY (gym_name ASC );
我的示例 Spark 代码:
object SparkUpdateCassandra {
System.setProperty("hadoop.home.dir", "C:\hadoop\")
def main(args: Array[String]): Unit = {
val spark = org.apache.spark.sql.SparkSession
.builder()
.master("local[*]")
.config("spark.cassandra.connection.host", "localhost")
.appName("Spark Cassandra Connector Example")
.getOrCreate()
import spark.implicits._
//Read Cassandra data using DataFrame
val FirstDF = Seq(("India", "WB", "Kolkata", "Cult Fit"),("India", "KA", "Bengaluru", "Cult Fit")).toDF("country_code", "state_province","city","gym_name")
FirstDF.show(10)
FirstDF.write
.format("org.apache.spark.sql.cassandra")
.mode("append")
.option("confirm.truncate", "true")
.option("spark.cassandra.connection.host", "localhost")
.option("spark.cassandra.connection.port", "9042")
.option("keyspace", "emc_test")
.option("table", "crossfit_gyms_by_city_new")
.save()
val loaddf1 = spark.read
.format("org.apache.spark.sql.cassandra")
.option("spark.cassandra.connection.host", "localhost")
.option("spark.cassandra.connection.port", "9042")
.options(Map( "table" -> "crossfit_gyms_by_city_new", "keyspace" -> "emc_test"))
.load()
loaddf1.show(10)
// spark.implicits.wait(5000)
val SecondDF = Seq(("India", "WB", "Siliguri", "CultFit"),("India", "KA", "Bengaluru", "CultFit")).toDF("country_code", "state_province","city","gym_name")
SecondDF.show(10)
SecondDF.write
.format("org.apache.spark.sql.cassandra")
.mode("append")
.option("confirm.truncate", "true")
.option("spark.cassandra.connection.host", "localhost")
.option("spark.cassandra.connection.port", "9042")
.option("keyspace", "emc_test")
.option("table", "crossfit_gyms_by_city_new")
.save()
val loaddf2 = spark.read
.format("org.apache.spark.sql.cassandra")
.option("spark.cassandra.connection.host", "localhost")
.option("spark.cassandra.connection.port", "9042")
.options(Map( "table" -> "crossfit_gyms_by_city_new", "keyspace" -> "emc_test"))
.load()
loaddf2.show(10)
}
}
注意:我在 Spark 框架中使用 Scala。
在Cassandra中,一切都是upsert - 如果行不存在,它会被插入,如果它存在,那么它会被更新,所以你只需要将你的数据放入RDD或DataFrame并使用Spark的相应功能Cassandra 连接器:
rdd.saveToCassandra("keyspace", "table")
或者只是 write
inDataFrame API:
df.write
.format("org.apache.spark.sql.cassandra")
.options(Map("table" -> "table_name", "keyspace" -> "keyspace_name"))
.mode(SaveMode.Append)
.save()
为了实现这一点,有一些事实可以帮助您导航代码示例,您将 运行 进入
在以前的 Spark 1 代码中,我们将使用
1 个 SparkContext see docs
2 要连接到 Cassandra,请使用由 SparkContext
构建的 CassandraSQLContext
对于 Spark 2,这大部分发生了变化
创建一个 Spark 会话和一个 [1]
然后您将 运行 您的母语 SQL 与 [1]
中所示的会话
设置并运行后,您只需为 SCD 类型 1 操作执行适当的 sql,即可找到所涉及的 sql 中的 good examples。
我是 Cassandra 的新手,我想在 Cassandra DB 中实现 SCD Type-1。
此 SCD Type1 作业将从 Spark 执行。
数据将存储为时间序列分区数据。即:Year/month/Day
示例:我有最近 300 天的记录,我的新记录可能有新记录,也可能有更新的记录。 我想比较最近 100 天的更新记录,如果记录是新的,那么它应该执行插入操作,否则更新。
我没有得到执行此操作的任何线索,因此不共享任何 CQL :(
示例 table 结构为:
CREATE TABLE crossfit_gyms_by_city_New (
country_code text,
state_province text,
city text,
gym_name text,
PRIMARY KEY ((country_code, state_province), gym_name)
) WITH CLUSTERING ORDER BY (gym_name ASC );
我的示例 Spark 代码:
object SparkUpdateCassandra {
System.setProperty("hadoop.home.dir", "C:\hadoop\")
def main(args: Array[String]): Unit = {
val spark = org.apache.spark.sql.SparkSession
.builder()
.master("local[*]")
.config("spark.cassandra.connection.host", "localhost")
.appName("Spark Cassandra Connector Example")
.getOrCreate()
import spark.implicits._
//Read Cassandra data using DataFrame
val FirstDF = Seq(("India", "WB", "Kolkata", "Cult Fit"),("India", "KA", "Bengaluru", "Cult Fit")).toDF("country_code", "state_province","city","gym_name")
FirstDF.show(10)
FirstDF.write
.format("org.apache.spark.sql.cassandra")
.mode("append")
.option("confirm.truncate", "true")
.option("spark.cassandra.connection.host", "localhost")
.option("spark.cassandra.connection.port", "9042")
.option("keyspace", "emc_test")
.option("table", "crossfit_gyms_by_city_new")
.save()
val loaddf1 = spark.read
.format("org.apache.spark.sql.cassandra")
.option("spark.cassandra.connection.host", "localhost")
.option("spark.cassandra.connection.port", "9042")
.options(Map( "table" -> "crossfit_gyms_by_city_new", "keyspace" -> "emc_test"))
.load()
loaddf1.show(10)
// spark.implicits.wait(5000)
val SecondDF = Seq(("India", "WB", "Siliguri", "CultFit"),("India", "KA", "Bengaluru", "CultFit")).toDF("country_code", "state_province","city","gym_name")
SecondDF.show(10)
SecondDF.write
.format("org.apache.spark.sql.cassandra")
.mode("append")
.option("confirm.truncate", "true")
.option("spark.cassandra.connection.host", "localhost")
.option("spark.cassandra.connection.port", "9042")
.option("keyspace", "emc_test")
.option("table", "crossfit_gyms_by_city_new")
.save()
val loaddf2 = spark.read
.format("org.apache.spark.sql.cassandra")
.option("spark.cassandra.connection.host", "localhost")
.option("spark.cassandra.connection.port", "9042")
.options(Map( "table" -> "crossfit_gyms_by_city_new", "keyspace" -> "emc_test"))
.load()
loaddf2.show(10)
}
}
注意:我在 Spark 框架中使用 Scala。
在Cassandra中,一切都是upsert - 如果行不存在,它会被插入,如果它存在,那么它会被更新,所以你只需要将你的数据放入RDD或DataFrame并使用Spark的相应功能Cassandra 连接器:
rdd.saveToCassandra("keyspace", "table")
或者只是 write
inDataFrame API:
df.write
.format("org.apache.spark.sql.cassandra")
.options(Map("table" -> "table_name", "keyspace" -> "keyspace_name"))
.mode(SaveMode.Append)
.save()
为了实现这一点,有一些事实可以帮助您导航代码示例,您将 运行 进入
在以前的 Spark 1 代码中,我们将使用
1 个 SparkContext see docs
2 要连接到 Cassandra,请使用由 SparkContext
对于 Spark 2,这大部分发生了变化
创建一个 Spark 会话和一个
然后您将 运行 您的母语 SQL 与 [1]
中所示的会话设置并运行后,您只需为 SCD 类型 1 操作执行适当的 sql,即可找到所涉及的 sql 中的 good examples。