Scala - Unable to write Scala object into Cassandra
I am trying to write Scala case class objects into Cassandra using Spark, but I get an exception when I run the code. I think the problem is that my case class objects are not being mapped to Cassandra rows. My Scala code is shown below.
CassandraPerformerClass.scala
import org.apache.spark.{SparkConf, SparkContext}
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector

object CassandraPerformerClass extends App
{
  override def main(args: Array[String])
  {
    val keyspace = "scalakeys1"
    val tablename = "demotable1"

    val conf = new SparkConf()
      .setAppName("CassandraDemo")
      .setMaster("spark://ct-0015:7077")
      .setJars(SparkContext.jarOfClass(this.getClass).toSeq)
    conf.set("spark.cassandra.connection.host", "192.168.50.103")
    conf.set("spark.cassandra.connection.native.port", "9041")
    conf.set("spark.cassandra.connection.rpc.port", "9160")

    val sc = new SparkContext(conf)

    CassandraConnector(conf).withSessionDo { session =>
      session.execute("DROP KEYSPACE IF EXISTS " + keyspace + " ;")
      session.execute("CREATE KEYSPACE " + keyspace + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 3};")
      session.execute("CREATE TABLE " + keyspace + "." + tablename + " (keyval bigint, rangef bigint, arrayval text, PRIMARY KEY (rangef, keyval));")
      session.execute("CREATE INDEX index_11 ON " + keyspace + "." + tablename + " (keyval) ;")
    }

    val data = Seq(new Data(1, 10, "string1"), new Data(2, 20, "string2"))
    val collection = sc.parallelize(data)
    collection.saveToCassandra(keyspace, tablename)
  }

  case class Data(kv: Long, rf: Long, av: String) extends Serializable
  {
    private var keyval: Long = kv
    private var rangef: Long = rf
    private var arrayval: String = av

    def setKeyval(kv: Long) { keyval = kv }
    def setRangef(rf: Long) { rangef = rf }
    def setArrayval(av: String) { arrayval = av }

    def getKeyval = keyval
    def getRangef = rangef
    def getArrayval = arrayval

    override def toString = keyval + "," + rangef + "," + arrayval
  }
}
Exception
Exception in thread "main" java.lang.IllegalArgumentException: Some primary key columns are missing in RDD or have not been selected: rangef, keyval
at com.datastax.spark.connector.writer.DefaultRowWriter.checkMissingPrimaryKeyColumns(DefaultRowWriter.scala:44)
at com.datastax.spark.connector.writer.DefaultRowWriter.&lt;init&gt;(DefaultRowWriter.scala:71)
at com.datastax.spark.connector.writer.DefaultRowWriter$$anon.rowWriter(DefaultRowWriter.scala:109)
at com.datastax.spark.connector.writer.DefaultRowWriter$$anon.rowWriter(DefaultRowWriter.scala:107)
at com.datastax.spark.connector.writer.TableWriter$.apply(TableWriter.scala:170)
at com.datastax.spark.connector.RDDFunctions.saveToCassandra(RDDFunctions.scala:23)
at com.cleartrail.spark.scala.cassandra.poc.CassandraPerformerClass$.main(CassandraPerformerClass.scala:33)
at com.cleartrail.spark.scala.cassandra.poc.CassandraPerformerClass.main(CassandraPerformerClass.scala)
Please tell me how to map my case class objects to Cassandra rows.
The Scala-based connector for Spark does not expect a java-bean style class with getters for the fields, which is what this case class is. (That is bad practice anyway: case classes are the immutable replacement for bean-like data containers, with default accessors for their fields and no mutators; see the small sketch below.)
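To illustrate that aside, here is a tiny sketch (not from the original answer, with a hypothetical Coord class) showing that a plain case class already provides immutable fields with accessors and a copy method, so hand-written getters and setters are redundant:

// A plain case class: fields are immutable vals with generated accessors.
case class Coord(x: Long, y: Long)

val p = Coord(1, 2)
println(p.x)            // default accessor, no getX needed
val q = p.copy(y = 3)   // "modification" yields a new immutable value
println(q)              // Coord(1,3); p is unchanged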
Create a case class whose field names and types match the Cassandra table columns, and it will work:
case class Data(keyval: Long, rangef: Long, arrayval: String) extends Serializable
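For completeness, a minimal sketch of how the corrected case class plugs into the original program (the keyspace, table, and sample values are taken from the question; sc is the SparkContext created in main). The explicit SomeColumns selector is optional here, since the field names already match the table columns, but it makes the mapping visible:

import com.datastax.spark.connector._   // brings saveToCassandra and SomeColumns into scope

case class Data(keyval: Long, rangef: Long, arrayval: String)

val data = Seq(Data(1, 10, "string1"), Data(2, 20, "string2"))
val collection = sc.parallelize(data)
collection.saveToCassandra("scalakeys1", "demotable1",
  SomeColumns("keyval", "rangef", "arrayval"))   // columns matched to fields by name

The connector's default row writer maps case class fields to table columns by name, so keyval and rangef now satisfy the primary-key check that raised the IllegalArgumentException above.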