在 Scala 中迭代 RDD Iterable
Iterating over an RDD Iterable in Scala
所以我是 Scala 的新手,刚开始使用 RDD 和函数式 Scala 操作。
我正在尝试通过应用已定义的 average
来迭代我的 Pair RDD 和 return Var1
的值以及存储在 Var2
中的值的平均值函数,以便最终的 return 是 Var1 的唯一列表,每个列表都有一个 AvgVar2
。我在弄清楚如何迭代这些值时遇到了很多麻烦。
*编辑:我有以下类型声明:
case class ID: Int, Var1: Int, Var2: Int extends Serializable
我有以下功能:
def foo(rdds: RDD[(ID, Iterable[(Var1, Var2)])]): RDD[(Var1, AvgVar2)] = {
def average(as: Array[Var2]): AvgVar2 = {
var sum = 0.0
var i = 0.0
while (i < as.length) {
sum += Var2.val
i += 1
}
sum/i
}
//My attempt at Scala
rdds.map(x=> ((x._1),x._2)).groupByKey().map(x=>average(x._1)).collect()
}
我对 Scala 的尝试是尝试执行以下操作:
- 将RDD对Iterable拆分为
Var1-Var2
. 的键值对
- 按
Var1
的键分组并创建关联的数组 Var2
.
- 将我的
average
函数应用于 Var2
的每个数组
- Return
AvgVar2
与关联的 Var1
作为 RDDs 的集合
*编辑:
rdds
的一些示例输入数据:
//RDD[(ID,Iterable[(Var1,Var2)...])]
RDD[(1,[(1,3),(1,12),(1,6)])],
RDD[(2,[(2,5),(2,7)])]
一些示例输出数据:
//RDD[(Var1, AvgVar2)]
RDD[(1,7),(2,6)]
*编辑:工作 scala 代码行:
rdd.map(x => (x._2.map(it => it._1).asInstanceOf[Var1], average(x._2.map(it => it._2).toArray)))
考虑到ID
= Var1
,一个简单的.map()
就可以解决:
def foo(rdds: RDD[(Int, Iterable[(Int, Int)])]): RDD[(Int, Double)] = {
def average(as: Iterable[(Int, Int)]): Double = {
as.map(_._2).reduce(_+_)/as.size.toDouble
}
rdds.map(x => (x._1, average(x._2)))
}
输出:
val input = sc.parallelize(List((1,Iterable((1,3),(1,12),(1,6))), (2, Iterable((2,5),(2,7)))))
scala> foo(input).collect
res0: Array[(Int, Double)] = Array((1,7.0), (2,6.0))
已编辑:(average()
具有相同的签名):
def foo(rdds: RDD[(Int, Iterable[(Int, Int)])]): RDD[(Int, Double)] = {
def average(as: Array[Int]): Double = {
as.reduce(_+_)/as.size.toDouble
}
rdds.map(x => (x._1, average(x._2.map(tuple => tuple._2).toArray)))
}
所以我是 Scala 的新手,刚开始使用 RDD 和函数式 Scala 操作。
我正在尝试通过应用已定义的 average
来迭代我的 Pair RDD 和 return Var1
的值以及存储在 Var2
中的值的平均值函数,以便最终的 return 是 Var1 的唯一列表,每个列表都有一个 AvgVar2
。我在弄清楚如何迭代这些值时遇到了很多麻烦。
*编辑:我有以下类型声明:
case class ID: Int, Var1: Int, Var2: Int extends Serializable
我有以下功能:
def foo(rdds: RDD[(ID, Iterable[(Var1, Var2)])]): RDD[(Var1, AvgVar2)] = {
def average(as: Array[Var2]): AvgVar2 = {
var sum = 0.0
var i = 0.0
while (i < as.length) {
sum += Var2.val
i += 1
}
sum/i
}
//My attempt at Scala
rdds.map(x=> ((x._1),x._2)).groupByKey().map(x=>average(x._1)).collect()
}
我对 Scala 的尝试是尝试执行以下操作:
- 将RDD对Iterable拆分为
Var1-Var2
. 的键值对
- 按
Var1
的键分组并创建关联的数组Var2
. - 将我的
average
函数应用于Var2
的每个数组
- Return
AvgVar2
与关联的Var1
作为 RDDs 的集合
*编辑:
rdds
的一些示例输入数据:
//RDD[(ID,Iterable[(Var1,Var2)...])]
RDD[(1,[(1,3),(1,12),(1,6)])],
RDD[(2,[(2,5),(2,7)])]
一些示例输出数据:
//RDD[(Var1, AvgVar2)]
RDD[(1,7),(2,6)]
*编辑:工作 scala 代码行:
rdd.map(x => (x._2.map(it => it._1).asInstanceOf[Var1], average(x._2.map(it => it._2).toArray)))
考虑到ID
= Var1
,一个简单的.map()
就可以解决:
def foo(rdds: RDD[(Int, Iterable[(Int, Int)])]): RDD[(Int, Double)] = {
def average(as: Iterable[(Int, Int)]): Double = {
as.map(_._2).reduce(_+_)/as.size.toDouble
}
rdds.map(x => (x._1, average(x._2)))
}
输出:
val input = sc.parallelize(List((1,Iterable((1,3),(1,12),(1,6))), (2, Iterable((2,5),(2,7)))))
scala> foo(input).collect
res0: Array[(Int, Double)] = Array((1,7.0), (2,6.0))
已编辑:(average()
具有相同的签名):
def foo(rdds: RDD[(Int, Iterable[(Int, Int)])]): RDD[(Int, Double)] = {
def average(as: Array[Int]): Double = {
as.reduce(_+_)/as.size.toDouble
}
rdds.map(x => (x._1, average(x._2.map(tuple => tuple._2).toArray)))
}