具有泛型类型的 Spark reduceByKey (Scala)

Spark reduceByKey with generic types (Scala)

我正在尝试使用 Scala 在 Spark 中创建一些简单的自定义聚合运算符。

我创建了一个简单的运算符层次结构,具有以下超级class:

sealed abstract class Aggregator(val name: String) {
  type Key = Row  // org.apache.spark.sql.Row
  type Value

  ...
}

我还有一个伴生对象,它每次都构造适当的聚合器。观察到每个运算符都可以指定它想要的值类型。

现在的问题是当我尝试调用 combineByKey:

val agg = Aggregator("SUM")
val res = rdd
    .map(agg.mapper)
    .reduceByKey(agg.reducer(_: agg.Value, _: agg.Value))

错误是:

value reduceByKey is not a member of org.apache.spark.rdd.RDD[(agg.Key, agg.Value)]

根据我的需要,Value 可以是数字类型或元组,因此它没有边界定义。如果我将 Value 类型声明替换为:

type Value = Double

Aggregatorclass,然后一切正常。因此,我认为该错误与 reduceByKey 在编译时不知道确切的 Value 类型有关。

关于如何解决这个问题的任何想法?

您的 RDD 无法隐式转换为 PairRDDFunctions,因为缺少键和值的所有隐式 ClassTag

您可能希望在 Aggregator:

中包含 class 标记作为隐式参数
sealed abstract class Aggregator[K: ClassTag, V: ClassTag](name: String) {
  implicit val keyClassTag: ClassTag[K] = implicitly
  implicit val valueClassTag: ClassTag[V] = implicitly
}

或者也许:

sealed abstract class Aggregator[K, V](name: String)(implicit kt: ClassTag[K], vt: ClassTag[V]) {
  implicit val keyClassTag: ClassTag[K] = kt
  implicit val valueClassTag: ClassTag[V] = vt
}

或者甚至:

sealed abstract class Aggregator(name: String) {
  type K
  type V
  implicit def keyClassTag: ClassTag[K]
  implicit def valueClassTag: ClassTag[V]
}

最后一个变体会将提供 ClassTag 的责任转移给抽象 class 的实现者。

现在,当在 reduceByKey 中使用 Aggregator[K, V] 类型的聚合器 a 时,您必须确保那些隐式提供的 class 标签在当前隐含范围:

val agg = Aggregator("SUM")
import agg._ // now the implicits should be visible
val res = rdd
.map(agg.mapper)
.reduceByKey(agg.reducer(_: agg.Value, _: agg.Value))