具有泛型类型的 Spark reduceByKey (Scala)
Spark reduceByKey with generic types (Scala)
我正在尝试使用 Scala 在 Spark 中创建一些简单的自定义聚合运算符。
我创建了一个简单的运算符层次结构,具有以下超级class:
sealed abstract class Aggregator(val name: String) {
type Key = Row // org.apache.spark.sql.Row
type Value
...
}
我还有一个伴生对象,它每次都构造适当的聚合器。观察到每个运算符都可以指定它想要的值类型。
现在的问题是当我尝试调用 combineByKey
:
val agg = Aggregator("SUM")
val res = rdd
.map(agg.mapper)
.reduceByKey(agg.reducer(_: agg.Value, _: agg.Value))
错误是:
value reduceByKey is not a member of org.apache.spark.rdd.RDD[(agg.Key, agg.Value)]
根据我的需要,Value
可以是数字类型或元组,因此它没有边界定义。如果我将 Value
类型声明替换为:
type Value = Double
在Aggregator
class,然后一切正常。因此,我认为该错误与 reduceByKey
在编译时不知道确切的 Value
类型有关。
关于如何解决这个问题的任何想法?
您的 RDD
无法隐式转换为 PairRDDFunctions
,因为缺少键和值的所有隐式 ClassTag
。
您可能希望在 Aggregator
:
中包含 class 标记作为隐式参数
sealed abstract class Aggregator[K: ClassTag, V: ClassTag](name: String) {
implicit val keyClassTag: ClassTag[K] = implicitly
implicit val valueClassTag: ClassTag[V] = implicitly
}
或者也许:
sealed abstract class Aggregator[K, V](name: String)(implicit kt: ClassTag[K], vt: ClassTag[V]) {
implicit val keyClassTag: ClassTag[K] = kt
implicit val valueClassTag: ClassTag[V] = vt
}
或者甚至:
sealed abstract class Aggregator(name: String) {
type K
type V
implicit def keyClassTag: ClassTag[K]
implicit def valueClassTag: ClassTag[V]
}
最后一个变体会将提供 ClassTag
的责任转移给抽象 class 的实现者。
现在,当在 reduceByKey
中使用 Aggregator[K, V]
类型的聚合器 a
时,您必须确保那些隐式提供的 class 标签在当前隐含范围:
val agg = Aggregator("SUM")
import agg._ // now the implicits should be visible
val res = rdd
.map(agg.mapper)
.reduceByKey(agg.reducer(_: agg.Value, _: agg.Value))
我正在尝试使用 Scala 在 Spark 中创建一些简单的自定义聚合运算符。
我创建了一个简单的运算符层次结构,具有以下超级class:
sealed abstract class Aggregator(val name: String) {
type Key = Row // org.apache.spark.sql.Row
type Value
...
}
我还有一个伴生对象,它每次都构造适当的聚合器。观察到每个运算符都可以指定它想要的值类型。
现在的问题是当我尝试调用 combineByKey
:
val agg = Aggregator("SUM")
val res = rdd
.map(agg.mapper)
.reduceByKey(agg.reducer(_: agg.Value, _: agg.Value))
错误是:
value reduceByKey is not a member of org.apache.spark.rdd.RDD[(agg.Key, agg.Value)]
根据我的需要,Value
可以是数字类型或元组,因此它没有边界定义。如果我将 Value
类型声明替换为:
type Value = Double
在Aggregator
class,然后一切正常。因此,我认为该错误与 reduceByKey
在编译时不知道确切的 Value
类型有关。
关于如何解决这个问题的任何想法?
您的 RDD
无法隐式转换为 PairRDDFunctions
,因为缺少键和值的所有隐式 ClassTag
。
您可能希望在 Aggregator
:
sealed abstract class Aggregator[K: ClassTag, V: ClassTag](name: String) {
implicit val keyClassTag: ClassTag[K] = implicitly
implicit val valueClassTag: ClassTag[V] = implicitly
}
或者也许:
sealed abstract class Aggregator[K, V](name: String)(implicit kt: ClassTag[K], vt: ClassTag[V]) {
implicit val keyClassTag: ClassTag[K] = kt
implicit val valueClassTag: ClassTag[V] = vt
}
或者甚至:
sealed abstract class Aggregator(name: String) {
type K
type V
implicit def keyClassTag: ClassTag[K]
implicit def valueClassTag: ClassTag[V]
}
最后一个变体会将提供 ClassTag
的责任转移给抽象 class 的实现者。
现在,当在 reduceByKey
中使用 Aggregator[K, V]
类型的聚合器 a
时,您必须确保那些隐式提供的 class 标签在当前隐含范围:
val agg = Aggregator("SUM")
import agg._ // now the implicits should be visible
val res = rdd
.map(agg.mapper)
.reduceByKey(agg.reducer(_: agg.Value, _: agg.Value))