参数化 class 的 Scala 包装方法(spark-cassandra-connector)
Scala wrapping method of a parametrized class (spark-cassandra-connector)
我正在编写一组扩展 Spark RDD 的方法 API。
我必须实现一个存储 RDD 的通用方法,首先我尝试包装 spark-cassandra-connector 的 saveAsCassandraTable
,但没有成功。
这是 "extending RDD's API" 部分:
object NewRDDFunctions {
implicit def addStorageFunctions[T](rdd: RDD[T]):
RDDStorageFunctions[T] = new RDDStorageFunctions(rdd)
}
class RDDStorageFunctions[T](rdd: RDD[T]) {
def saveResultsToCassandra() {
rdd.saveAsCassandraTable("ks_name", "table_name") // this line produces errors!
}
}
...并将对象导入为:import ...NewRDDFunctions._
.
标记的行产生以下错误:
Error:(99, 29) could not find implicit value for parameter rwf: com.datastax.spark.connector.writer.RowWriterFactory[T]
rdd.saveAsCassandraTable("ks_name", "table_name")
^
Error:(99, 29) not enough arguments for method saveAsCassandraTable: (implicit connector: com.datastax.spark.connector.cql.CassandraConnector, implicit rwf: com.datastax.spark.connector.writer.RowWriterFactory[T], implicit columnMapper: com.datastax.spark.connector.mapper.ColumnMapper[T])Unit.
Unspecified value parameters rwf, columnMapper.
rdd.saveAsCassandraTable("ks_name", "table_name")
^
我不明白为什么这不起作用,因为 saveAsCassandraTable
设计用于任何 RDD。有什么建议吗?
我在 spark-cassandra-connector 文档中遇到了与 example 类似的问题:
case class WordCount(word: String, count: Long)
val collection = sc.parallelize(Seq(WordCount("dog", 50), WordCount("cow", 60)))
collection.saveAsCassandraTable("test", "words_new", SomeColumns("word", "count"))
...解决方案是将 case class 定义移出 "main" 函数(但我真的不知道这是否适用于上述问题...)。
saveAsCassandraTable
需要 3 个隐式参数。第一个 (connector
) 有一个默认值,最后两个 (rwf
和 columnMapper
) 不在你的 saveResultsToCassandra
方法的隐式范围内,因此你的方法不编译。
如果您需要有关隐式的更多信息,请查看另一个问题 answer。
如果您之前已经定义了表 (TableDef
),那么将您的 saveResultsToCassandra
转换为下面的函数应该可行。
def saveResultsToCassandra()(
// implicit parameters as a separate list!
implicit rwf: RowWriterFactory[T],
columnMapper: ColumnMapper[T]
) {
rdd.saveAsCassandraTable("ks_name", "table_name")
}
我正在编写一组扩展 Spark RDD 的方法 API。
我必须实现一个存储 RDD 的通用方法,首先我尝试包装 spark-cassandra-connector 的 saveAsCassandraTable
,但没有成功。
这是 "extending RDD's API" 部分:
object NewRDDFunctions {
implicit def addStorageFunctions[T](rdd: RDD[T]):
RDDStorageFunctions[T] = new RDDStorageFunctions(rdd)
}
class RDDStorageFunctions[T](rdd: RDD[T]) {
def saveResultsToCassandra() {
rdd.saveAsCassandraTable("ks_name", "table_name") // this line produces errors!
}
}
...并将对象导入为:import ...NewRDDFunctions._
.
标记的行产生以下错误:
Error:(99, 29) could not find implicit value for parameter rwf: com.datastax.spark.connector.writer.RowWriterFactory[T]
rdd.saveAsCassandraTable("ks_name", "table_name")
^
Error:(99, 29) not enough arguments for method saveAsCassandraTable: (implicit connector: com.datastax.spark.connector.cql.CassandraConnector, implicit rwf: com.datastax.spark.connector.writer.RowWriterFactory[T], implicit columnMapper: com.datastax.spark.connector.mapper.ColumnMapper[T])Unit.
Unspecified value parameters rwf, columnMapper.
rdd.saveAsCassandraTable("ks_name", "table_name")
^
我不明白为什么这不起作用,因为 saveAsCassandraTable
设计用于任何 RDD。有什么建议吗?
我在 spark-cassandra-connector 文档中遇到了与 example 类似的问题:
case class WordCount(word: String, count: Long)
val collection = sc.parallelize(Seq(WordCount("dog", 50), WordCount("cow", 60)))
collection.saveAsCassandraTable("test", "words_new", SomeColumns("word", "count"))
...解决方案是将 case class 定义移出 "main" 函数(但我真的不知道这是否适用于上述问题...)。
saveAsCassandraTable
需要 3 个隐式参数。第一个 (connector
) 有一个默认值,最后两个 (rwf
和 columnMapper
) 不在你的 saveResultsToCassandra
方法的隐式范围内,因此你的方法不编译。
如果您需要有关隐式的更多信息,请查看另一个问题 answer。
如果您之前已经定义了表 (TableDef
),那么将您的 saveResultsToCassandra
转换为下面的函数应该可行。
def saveResultsToCassandra()(
// implicit parameters as a separate list!
implicit rwf: RowWriterFactory[T],
columnMapper: ColumnMapper[T]
) {
rdd.saveAsCassandraTable("ks_name", "table_name")
}