Aggregate RDD values per key
I have an RDD in key-value form (someKey, (measure1, measure2)). I grouped it by key, and now I want to aggregate the values for each key.
val RDD1 : RDD[(String,(Int,Int))]
RDD1.groupByKey()
The result I need is:
key: avg(measure1), avg(measure2), max(measure1), max(measure2), min(measure1), min(measure2), count(*)
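First, avoid groupByKey! It ships every value for a key across the shuffle before anything is combined. You should use aggregateByKey or combineByKey, which combine values within each partition first. We will use aggregateByKey. This function transforms the values of each key: RDD[(K, V)] => RDD[(K, U)]. It needs a zero value of type U plus two functions: (U, V) => U, which folds one value into an accumulator within a partition, and (U, U) => U, which merges two accumulators coming from different partitions. I've simplified your example slightly; I want to get: key: avg(measure1), avg(measure2), min(measure1), min(measure2), count(*)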
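To make the zero value / seqOp / combOp contract concrete before the Spark code below, here is a small Spark-free model in plain Scala. It is an illustrative assumption, not Spark's actual implementation: `AggregateByKeyModel`, `aggregateByKeyLocal` and `sumMinCount` are hypothetical names, and the `partitions` argument merely stands in for how an RDD's data might be split. Each partition folds its values per key starting from the zero value (seqOp), then the partial results for the same key are merged (combOp).

```scala
// Hypothetical, Spark-free model of the aggregateByKey contract (not Spark's code).
object AggregateByKeyModel {
  def aggregateByKeyLocal[K, V, U](partitions: Seq[Seq[(K, V)]], zero: U)(
      seqOp: (U, V) => U,
      combOp: (U, U) => U): Map[K, U] = {
    // Step 1: inside each partition, fold the values of every key starting from `zero`.
    val partials: Seq[Map[K, U]] = partitions.map { part =>
      part.foldLeft(Map.empty[K, U]) { case (acc, (k, v)) =>
        acc.updated(k, seqOp(acc.getOrElse(k, zero), v))
      }
    }
    // Step 2: merge the per-partition accumulators of the same key with combOp.
    partials.foldLeft(Map.empty[K, U]) { (merged, partial) =>
      partial.foldLeft(merged) { case (acc, (k, u)) =>
        acc.updated(k, acc.get(k).map(combOp(_, u)).getOrElse(u))
      }
    }
  }

  // The answer's data, split into partitions in two different ways.
  val data = List(("a", (11, 1)), ("a", (12, 3)), ("b", (10, 1)))
  val onePartition: Seq[Seq[(String, (Int, Int))]] = Seq(data)
  val twoPartitions: Seq[Seq[(String, (Int, Int))]] = Seq(data.take(1), data.drop(1))

  // Accumulator U = (sum1, min1, count); the zero value is neutral for each field.
  def sumMinCount(parts: Seq[Seq[(String, (Int, Int))]]): Map[String, (Int, Int, Int)] =
    aggregateByKeyLocal(parts, (0, Int.MaxValue, 0))(
      (u, v) => (u._1 + v._1, u._2 min v._1, u._3 + 1),   // seqOp: (U, V) => U
      (x, y) => (x._1 + y._1, x._2 min y._2, x._3 + y._3) // combOp: (U, U) => U
    )
}
```

With this data, `sumMinCount` returns a -> (23, 11, 2) and b -> (10, 10, 1) for both `onePartition` and `twoPartitions`. That independence from partitioning is why the zero value must be neutral (0 for sums and counts, Int.MaxValue for minima): it may be applied once per key in every partition.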
val rdd1 = sc.parallelize(List(("a", (11, 1)), ("a", (12, 3)), ("b", (10, 1))))
// Zero value of type U = (sum1, sum2, min1, min2, count)
rdd1.aggregateByKey((0.0, 0.0, Int.MaxValue, Int.MaxValue, 0))(
  // seqOp (U, V) => U: fold one (measure1, measure2) pair into the accumulator
  {
    case ((sum1, sum2, min1, min2, count1), (v1, v2)) =>
      (sum1 + v1, sum2 + v2, v1 min min1, v2 min min2, count1 + 1)
  },
  // combOp (U, U) => U: merge two partial accumulators for the same key
  {
    case ((sum1, sum2, min1, min2, count),
          (otherSum1, otherSum2, otherMin1, otherMin2, otherCount)) =>
      (sum1 + otherSum1, sum2 + otherSum2,
       min1 min otherMin1, min2 min otherMin2, count + otherCount)
  }
).map {
  // Turn the sums into averages: (avg1, avg2, min1, min2, count)
  case (k, (sum1, sum2, min1, min2, count1)) =>
    (k, (sum1 / count1, sum2 / count1, min1, min2, count1))
}.collect()
This gives:
(a,(11.5,2.0,11,1,2)), (b,(10.0,1.0,10,1,1))
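The simplified answer drops max(measure1) and max(measure2) from the original request. One way to add them, sketched here as an assumption rather than the answer's code, is to replace the 5-tuple accumulator with a named case class; `Stats`, `seqOp`, `combOp` and `StatsDemo` are hypothetical names. Int.MinValue plays the role of a neutral zero for maxima, just as Int.MaxValue does for minima. The two methods have the shapes (U, V) => U and (U, U) => U, so they should plug into Spark as `rdd1.aggregateByKey(Stats.zero)(Stats.seqOp, Stats.combOp)` (not verified against a cluster here); below they are exercised on a plain Scala List so the sketch runs without Spark.

```scala
// Hypothetical accumulator holding every statistic the question asks for.
final case class Stats(sum1: Long, sum2: Long, min1: Int, min2: Int,
                       max1: Int, max2: Int, count: Long) {
  // Averages only make sense once at least one value has been folded in (count > 0).
  def avg1: Double = sum1.toDouble / count
  def avg2: Double = sum2.toDouble / count
}

object Stats {
  // Neutral zero: merging it with any Stats via combOp leaves that Stats unchanged.
  val zero: Stats = Stats(0L, 0L, Int.MaxValue, Int.MaxValue, Int.MinValue, Int.MinValue, 0L)

  // seqOp, shape (U, V) => U: fold one (measure1, measure2) pair into the accumulator.
  def seqOp(s: Stats, v: (Int, Int)): Stats =
    Stats(s.sum1 + v._1, s.sum2 + v._2,
          s.min1 min v._1, s.min2 min v._2,
          s.max1 max v._1, s.max2 max v._2,
          s.count + 1)

  // combOp, shape (U, U) => U: merge two partial accumulators for the same key.
  def combOp(a: Stats, b: Stats): Stats =
    Stats(a.sum1 + b.sum1, a.sum2 + b.sum2,
          a.min1 min b.min1, a.min2 min b.min2,
          a.max1 max b.max1, a.max2 max b.max2,
          a.count + b.count)
}

object StatsDemo {
  val data = List(("a", (11, 1)), ("a", (12, 3)), ("b", (10, 1)))

  // Local stand-in for aggregateByKey: a single "partition", so only seqOp runs here.
  val byKey: Map[String, Stats] =
    data.foldLeft(Map.empty[String, Stats]) { case (acc, (k, v)) =>
      acc.updated(k, Stats.seqOp(acc.getOrElse(k, Stats.zero), v))
    }
}
```

For key "a" this yields sums 23 and 4, minima 11 and 1, maxima 12 and 3, count 2, hence averages 11.5 and 2.0. Sums are kept as Long as a design choice to make overflow less likely on large groups; merging the two single-value accumulators for "a" with `combOp` gives the same `Stats` as folding them in one pass.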