使用 flatMapGroups 时的类型问题
Type issues when using flatMapGroups
我正在将 spark-1.6 rdd 转换为 spark-2.x 数据集
原代码为:
val sample_data : Dataset[(Int, Array[Double])]
val samples : Array[Array[Array[Double]]] = sample_data.rdd
.groupBy(x => x._1)
.map(x => {
val (id: Int, points: Iterable[(Int, Array[Double])]) = x
val data1 = points.map(x => x._2).toArray
data1
}).collect()
sample_data.rdd
不再有效,因此我尝试使用数据集执行相同的操作。新方法使用 flatMapGroups
val sample_data : Dataset[(Int, Array[Double])]
val samples : Array[Array[Array[Double]]] = sample_data
.groupByKey(x => x._1)
.flatMapGroups ( (id: Int, points: Iterable[(Int, Array[Double])]) =>
Iterator(points.map((x:Int, y:Array[Double]) => y)).toList
).collect()
给出的错误是:
Error:(36, 25) overloaded method value map with alternatives: [B,
That](f: ((Int, Array[Double])) => B)(implicit bf:
scala.collection.generic.CanBuildFrom[Iterable[(Int,
Array[Double])],B,That])That [B](f: ((Int, Array[Double])) =>
B)Iterator[B] cannot be applied to ((Int, Array[Double]) =>
Array[Double])
Iterator(points.map((x:Int, y:Array[Double])
=> y)).toList
能否请您举例说明如何使用 flatMapGroups 以及如何理解给定的错误?
points
实际上是一个 Iterator
,但您将其转换为 Iterable
,因此编译器告诉您将其设为 Iterator
。
这就是您要执行的操作:
val samples: Array[Array[Array[Double]]] = sample_data
.groupByKey(_._1)
.flatMapGroups((id: Int, points: Iterator[(Int, Array[Double])]) =>
Iterator(points.map(_._2).toArray)
).collect()
在迭代器中重新包装对您没有任何意义,因此您可以像这样使用 mapGroups:
.mapGroups((_, points) => points.map(_._2).toArray)
然而,在这两种情况下,Array[Array[_]] 都没有编码器。查看 了解更多详情。
所以要么自己实现隐式编码器 (existing Encoders),要么坚持使用 RDD
接口。
我正在将 spark-1.6 rdd 转换为 spark-2.x 数据集
原代码为:
val sample_data : Dataset[(Int, Array[Double])]
val samples : Array[Array[Array[Double]]] = sample_data.rdd
.groupBy(x => x._1)
.map(x => {
val (id: Int, points: Iterable[(Int, Array[Double])]) = x
val data1 = points.map(x => x._2).toArray
data1
}).collect()
sample_data.rdd
不再有效,因此我尝试使用数据集执行相同的操作。新方法使用 flatMapGroups
val sample_data : Dataset[(Int, Array[Double])]
val samples : Array[Array[Array[Double]]] = sample_data
.groupByKey(x => x._1)
.flatMapGroups ( (id: Int, points: Iterable[(Int, Array[Double])]) =>
Iterator(points.map((x:Int, y:Array[Double]) => y)).toList
).collect()
给出的错误是:
Error:(36, 25) overloaded method value map with alternatives: [B, That](f: ((Int, Array[Double])) => B)(implicit bf: scala.collection.generic.CanBuildFrom[Iterable[(Int, Array[Double])],B,That])That [B](f: ((Int, Array[Double])) => B)Iterator[B] cannot be applied to ((Int, Array[Double]) => Array[Double]) Iterator(points.map((x:Int, y:Array[Double]) => y)).toList
能否请您举例说明如何使用 flatMapGroups 以及如何理解给定的错误?
points
实际上是一个 Iterator
,但您将其转换为 Iterable
,因此编译器告诉您将其设为 Iterator
。
这就是您要执行的操作:
val samples: Array[Array[Array[Double]]] = sample_data
.groupByKey(_._1)
.flatMapGroups((id: Int, points: Iterator[(Int, Array[Double])]) =>
Iterator(points.map(_._2).toArray)
).collect()
在迭代器中重新包装对您没有任何意义,因此您可以像这样使用 mapGroups:
.mapGroups((_, points) => points.map(_._2).toArray)
然而,在这两种情况下,Array[Array[_]] 都没有编码器。查看
所以要么自己实现隐式编码器 (existing Encoders),要么坚持使用 RDD
接口。