Datastax spark cassandra 连接器 - 将 DF 写入 cassandra table
Datastax spark cassandra connector - writing DF to cassandra table
我们最近开始使用 Scala、Spark 和 Cassandra 进行大数据项目,我对所有这些技术都是新手。我正在尝试对 cassandra table 进行简单的写入和读取任务。如果我将 属性 名称和列名称全部保留为小写或蛇形 (unserscores),我可以实现此目的,但我想在我的 scala 代码中使用驼峰式大小写。有没有更好的方法在 Scala 中使用驼峰大小写格式,在 cassandra 中使用蛇形大小写格式。
我们正在使用
scala - 2.10.5 spark - 1.6.2 datastax spark-cassandra-connector -
1.6.0 cassandra - 3.0.9.1346 datastax enterprise - 5.0.3
卡桑德拉table
CREATE TABLE dev.castable (
id int PRIMARY KEY,
long_name text,
name text,
short_name text)
Scala 代码
val conf = new SparkConf()
.setAppName("TestHelper")
.setMaster("local")
.set("spark.cassandra.connection.host","127.0.01")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
println("writing data to cassandra")
val df = sqlContext.createDataFrame(List(new MyRow(2,Option("long name"), "ss", Option("short name"))))
df.write //*** this is not working
.cassandraFormat("castable", "dev")
.mode(SaveMode.Append)
.save()
println("reading data from cassandra") //*** This is working fine
val rdd = sc.cassandraTable[MyRow]("dev", "castable")
rdd.foreach(println)
异常
Exception in thread "main" java.util.NoSuchElementException: Columns not found in table dev.castable: longName, shortName
at com.datastax.spark.connector.SomeColumns.selectFrom(ColumnSelector.scala:38)
at com.datastax.spark.connector.writer.TableWriter$.apply(TableWriter.scala:268)
at com.datastax.spark.connector.RDDFunctions.saveToCassandra(RDDFunctions.scala:36)
at org.apache.spark.sql.cassandra.CassandraSourceRelation.insert(CassandraSourceRelation.scala:67)
at org.apache.spark.sql.cassandra.DefaultSource.createRelation(DefaultSource.scala:85)
at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:222)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:148)
at com.aktana.spark.util.LocalTestDriver$.main(LocalTestDriver.scala:38)
我读到 spark-cassandra-connector 会自动自动执行此转换,但它对我不起作用。
datastax spark-cassandra-connector
您的 MyRow 定义似乎与 cassandra table 定义不匹配。试试这个:
val df = List((1, "My Long Description", "My Name", "My Short Name")).toDF("id", "long_name", "name", "short_name")
spark-cassandra-connector 使用 RDD 自动将驼峰式属性转换为带下划线的列名。再次感谢RussS
下面是我如何将 case class 对象保存到 cassandra table
val writeRDD = sc.makeRDD(List(new MyRow(2,Option("long name"), "ss", Option("short name"))))
writeRDD.saveToCassandra("dev", "castable")
我们最近开始使用 Scala、Spark 和 Cassandra 进行大数据项目,我对所有这些技术都是新手。我正在尝试对 cassandra table 进行简单的写入和读取任务。如果我将 属性 名称和列名称全部保留为小写或蛇形 (unserscores),我可以实现此目的,但我想在我的 scala 代码中使用驼峰式大小写。有没有更好的方法在 Scala 中使用驼峰大小写格式,在 cassandra 中使用蛇形大小写格式。
我们正在使用
scala - 2.10.5 spark - 1.6.2 datastax spark-cassandra-connector - 1.6.0 cassandra - 3.0.9.1346 datastax enterprise - 5.0.3
卡桑德拉table
CREATE TABLE dev.castable (
id int PRIMARY KEY,
long_name text,
name text,
short_name text)
Scala 代码
val conf = new SparkConf()
.setAppName("TestHelper")
.setMaster("local")
.set("spark.cassandra.connection.host","127.0.01")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
println("writing data to cassandra")
val df = sqlContext.createDataFrame(List(new MyRow(2,Option("long name"), "ss", Option("short name"))))
df.write //*** this is not working
.cassandraFormat("castable", "dev")
.mode(SaveMode.Append)
.save()
println("reading data from cassandra") //*** This is working fine
val rdd = sc.cassandraTable[MyRow]("dev", "castable")
rdd.foreach(println)
异常
Exception in thread "main" java.util.NoSuchElementException: Columns not found in table dev.castable: longName, shortName
at com.datastax.spark.connector.SomeColumns.selectFrom(ColumnSelector.scala:38)
at com.datastax.spark.connector.writer.TableWriter$.apply(TableWriter.scala:268)
at com.datastax.spark.connector.RDDFunctions.saveToCassandra(RDDFunctions.scala:36)
at org.apache.spark.sql.cassandra.CassandraSourceRelation.insert(CassandraSourceRelation.scala:67)
at org.apache.spark.sql.cassandra.DefaultSource.createRelation(DefaultSource.scala:85)
at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:222)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:148)
at com.aktana.spark.util.LocalTestDriver$.main(LocalTestDriver.scala:38)
我读到 spark-cassandra-connector 会自动自动执行此转换,但它对我不起作用。 datastax spark-cassandra-connector
您的 MyRow 定义似乎与 cassandra table 定义不匹配。试试这个:
val df = List((1, "My Long Description", "My Name", "My Short Name")).toDF("id", "long_name", "name", "short_name")
spark-cassandra-connector 使用 RDD 自动将驼峰式属性转换为带下划线的列名。再次感谢RussS
下面是我如何将 case class 对象保存到 cassandra table
val writeRDD = sc.makeRDD(List(new MyRow(2,Option("long name"), "ss", Option("short name"))))
writeRDD.saveToCassandra("dev", "castable")