Mapping a Cassandra row to a parametrized type in a Spark RDD
I'm trying to map Cassandra rows to a parametrized type using the spark-cassandra-connector. I've been trying to define the mapping with an implicitly defined columnMapper, like so:
import scala.reflect.ClassTag

import com.datastax.spark.connector.mapper.JavaBeanColumnMapper
import com.datastax.spark.connector.rdd.CassandraTableScanRDD
import com.datastax.spark.connector.rdd.reader.RowReaderFactory

class Foo[T <: Bar : ClassTag : RowReaderFactory] {
  implicit object Mapper extends JavaBeanColumnMapper[T](
    Map("id" -> "id",
        "timestamp" -> "ts"))

  def doSomeStuff(operations: CassandraTableScanRDD[T]): Unit = {
    println("do some stuff here")
  }
}
However, I run into the following error, which I believe is caused by passing in the RowReaderFactory without correctly specifying the mapping for it. Any idea how to specify the mapping information for the RowReaderFactory?
Exception in thread "main" java.lang.IllegalArgumentException: Failed to map constructor parameter timestamp in Bar to a column of MyNamespace
at com.datastax.spark.connector.mapper.DefaultColumnMapper$$anonfun$$anonfun$apply.apply(DefaultColumnMapper.scala:78)
at com.datastax.spark.connector.mapper.DefaultColumnMapper$$anonfun$$anonfun$apply.apply(DefaultColumnMapper.scala:78)
at scala.Option.getOrElse(Option.scala:120)
at com.datastax.spark.connector.mapper.DefaultColumnMapper$$anonfun.apply(DefaultColumnMapper.scala:78)
at com.datastax.spark.connector.mapper.DefaultColumnMapper$$anonfun.apply(DefaultColumnMapper.scala:76)
at scala.collection.TraversableLike$WithFilter$$anonfun$map.apply(TraversableLike.scala:722)
at scala.collection.immutable.List.foreach(List.scala:318)
at scala.collection.TraversableLike$WithFilter.map(TraversableLike.scala:721)
at com.datastax.spark.connector.mapper.DefaultColumnMapper.columnMapForReading(DefaultColumnMapper.scala:76)
at com.datastax.spark.connector.rdd.reader.GettableDataToMappedTypeConverter.<init>(GettableDataToMappedTypeConverter.scala:56)
at com.datastax.spark.connector.rdd.reader.ClassBasedRowReader.<init>(ClassBasedRowReader.scala:23)
at com.datastax.spark.connector.rdd.reader.ClassBasedRowReaderFactory.rowReader(ClassBasedRowReader.scala:48)
at com.datastax.spark.connector.rdd.reader.ClassBasedRowReaderFactory.rowReader(ClassBasedRowReader.scala:43)
at com.datastax.spark.connector.rdd.CassandraTableRowReaderProvider$class.rowReader(CassandraTableRowReaderProvider.scala:48)
at com.datastax.spark.connector.rdd.CassandraTableScanRDD.rowReader$lzycompute(CassandraTableScanRDD.scala:59)
at com.datastax.spark.connector.rdd.CassandraTableScanRDD.rowReader(CassandraTableScanRDD.scala:59)
at com.datastax.spark.connector.rdd.CassandraTableRowReaderProvider$class.verify(CassandraTableRowReaderProvider.scala:147)
at com.datastax.spark.connector.rdd.CassandraTableScanRDD.verify(CassandraTableScanRDD.scala:59)
at com.datastax.spark.connector.rdd.CassandraTableScanRDD.getPartitions(CassandraTableScanRDD.scala:143)
It turns out that the columnMapper has to be created in the scope where the instance of Foo is created, rather than inside Foo itself, as the sketch below shows.
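Here is a minimal, untested sketch of that fix. It assumes an existing SparkContext named sc, a hypothetical keyspace "my_keyspace", and a table "bar" whose rows map onto Bar; the mapper is defined at the call site, so it is in scope when the implicit RowReaderFactory[Bar] is resolved:

import com.datastax.spark.connector._
import com.datastax.spark.connector.mapper.JavaBeanColumnMapper
import com.datastax.spark.connector.rdd.CassandraTableScanRDD

// Define the mapper in the scope that creates the Foo instance,
// not inside Foo itself.
implicit object BarMapper extends JavaBeanColumnMapper[Bar](
  Map("id" -> "id", "timestamp" -> "ts"))

// The keyspace and table names here are placeholders.
val rdd: CassandraTableScanRDD[Bar] =
  sc.cassandraTable[Bar]("my_keyspace", "bar")
new Foo[Bar].doSomeStuff(rdd)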
Alternatively, it can be defined implicitly in the companion object of Foo, as follows:
object Foo {
  // Note: the type parameter T is not in scope in the companion object,
  // so the mapper is declared for the concrete type Bar.
  implicit object Mapper extends JavaBeanColumnMapper[Bar](
    Map("id" -> "id",
        "timestamp" -> "ts"))
}
Scala will look at the companion object of a class when it tries to find an implicit instance for that class. You can define the mapper in the scope where the implicit is needed, if you prefer, but you'll probably want to add the companion object so that you don't have to repeat the definition everywhere it's needed.
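For illustration, a hypothetical call site under the companion-object approach might look like the sketch below. The explicit import is shown in case the mapper in Foo's companion is not picked up through the implicit scope of Bar (an assumption on my part, not something the original answer states):

// Bring the companion-object mapper into lexical scope at the call site.
import Foo.Mapper

val rdd = sc.cassandraTable[Bar]("my_keyspace", "bar")
new Foo[Bar].doSomeStuff(rdd)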