使用 flatMapGroups 时的类型问题

Type issues when using flatMapGroups

我正在将 spark-1.6 rdd 转换为 spark-2.x 数据集

原代码为:

val sample_data : Dataset[(Int, Array[Double])]
val samples : Array[Array[Array[Double]]] = sample_data.rdd
  .groupBy(x => x._1)
  .map(x => {
   val (id: Int, points: Iterable[(Int, Array[Double])]) = x
   val data1 = points.map(x => x._2).toArray
   data1
}).collect()

sample_data.rdd 不再有效,因此我尝试使用数据集执行相同的操作。新方法使用 flatMapGroups

val sample_data : Dataset[(Int, Array[Double])]
val samples : Array[Array[Array[Double]]] = sample_data
  .groupByKey(x => x._1)
  .flatMapGroups ( (id: Int, points: Iterable[(Int, Array[Double])]) =>
    Iterator(points.map((x:Int, y:Array[Double]) => y)).toList
  ).collect()

给出的错误是:

Error:(36, 25) overloaded method value map with alternatives: [B, That](f: ((Int, Array[Double])) => B)(implicit bf: scala.collection.generic.CanBuildFrom[Iterable[(Int, Array[Double])],B,That])That [B](f: ((Int, Array[Double])) => B)Iterator[B] cannot be applied to ((Int, Array[Double]) => Array[Double]) Iterator(points.map((x:Int, y:Array[Double]) => y)).toList

能否请您举例说明如何使用 flatMapGroups 以及如何理解给定的错误?

points 实际上是一个 Iterator,但您将其转换为 Iterable,因此编译器告诉您将其设为 Iterator

这就是您要执行的操作:

val samples: Array[Array[Array[Double]]] = sample_data
    .groupByKey(_._1)
    .flatMapGroups((id: Int, points: Iterator[(Int, Array[Double])]) =>
        Iterator(points.map(_._2).toArray)
    ).collect()

在迭代器中重新包装对您没有任何意义,因此您可以像这样使用 mapGroups:

.mapGroups((_, points) => points.map(_._2).toArray)

然而,在这两种情况下,Array[Array[_]] 都没有编码器。查看 了解更多详情。

所以要么自己实现隐式编码器 (existing Encoders),要么坚持使用 RDD 接口。