Datastax spark cassandra 连接器 - 将 DF 写入 cassandra table

Datastax spark cassandra connector - writing DF to cassandra table

我们最近开始使用 Scala、Spark 和 Cassandra 进行大数据项目,我对所有这些技术都是新手。我正在尝试对 cassandra table 进行简单的写入和读取任务。如果我将 属性 名称和列名称全部保留为小写或蛇形 (unserscores),我可以实现此目的,但我想在我的 scala 代码中使用驼峰式大小写。有没有更好的方法在 Scala 中使用驼峰大小写格式,在 cassandra 中使用蛇形大小写格式。

我们正在使用

scala - 2.10.5 spark - 1.6.2 datastax spark-cassandra-connector - 1.6.0 cassandra - 3.0.9.1346 datastax enterprise - 5.0.3

卡桑德拉table

CREATE TABLE dev.castable (
id int PRIMARY KEY,
long_name text,
name text,
short_name text)

Scala 代码

    val conf = new SparkConf()
        .setAppName("TestHelper")
        .setMaster("local")
        .set("spark.cassandra.connection.host","127.0.01")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    println("writing data to cassandra")
    val df = sqlContext.createDataFrame(List(new MyRow(2,Option("long name"), "ss", Option("short name"))))
    df.write //*** this is not working
      .cassandraFormat("castable", "dev")
      .mode(SaveMode.Append)
      .save()

    println("reading data from cassandra") //*** This is working fine
    val rdd = sc.cassandraTable[MyRow]("dev", "castable")
    rdd.foreach(println)

异常

Exception in thread "main" java.util.NoSuchElementException: Columns not found in table dev.castable: longName, shortName
at com.datastax.spark.connector.SomeColumns.selectFrom(ColumnSelector.scala:38)
at com.datastax.spark.connector.writer.TableWriter$.apply(TableWriter.scala:268)
at com.datastax.spark.connector.RDDFunctions.saveToCassandra(RDDFunctions.scala:36)
at org.apache.spark.sql.cassandra.CassandraSourceRelation.insert(CassandraSourceRelation.scala:67)
at org.apache.spark.sql.cassandra.DefaultSource.createRelation(DefaultSource.scala:85)
at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:222)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:148)
at com.aktana.spark.util.LocalTestDriver$.main(LocalTestDriver.scala:38)

我读到 spark-cassandra-connector 会自动自动执行此转换,但它对我不起作用。 datastax spark-cassandra-connector

您的 MyRow 定义似乎与 cassandra table 定义不匹配。试试这个:

val df = List((1, "My Long Description", "My Name", "My Short Name")).toDF("id", "long_name", "name", "short_name")

spark-cassandra-connector 使用 RDD 自动将驼峰式属性转换为带下划线的列名。再次感谢RussS

下面是我如何将 case class 对象保存到 cassandra table

    val writeRDD = sc.makeRDD(List(new MyRow(2,Option("long name"), "ss", Option("short name"))))
    writeRDD.saveToCassandra("dev", "castable")