如何在 Scala Spark 作业中对多个键使用 ReduceByKey

How to use ReduceByKey on multiple key in a Scala Spark Job

我对 spark 比较陌生,我正在尝试同时按多个键对数据进行分组。

我有一些映射的数据,所以它最终看起来像这样:

((K1,K2,K3),(V1,V2))

我的目标是按 (K1,K2,K3) 分组并分别对 V1 和 V2 求和以得到:

((K1,K2,K3), (SUM(V1),SUM(V2))

这是我目前的代码:

val filepath  = "file.avro"
val sc = new SparkContext(sparkConf)
val sqlContext = new SQLContext(sc)            
val data = sqlContext.read.avro(filepath)
val dataRDD = data.rdd

val mappedDataRDD = dataRDD.map{
   case (v, w, x, y, z) => ((v,w,x), (y, z))
}.reduceByKey((x,y)=> ???)

所以我正在寻找如何 reduceByKey 以便我可以按 (v,w,x) 键分组并对 y 和 z 求和。

我认为您正在寻找并且应该使用的是aggregateByKey

这个方法有两个参数组。第一个参数组取累加器的起始值。第二个参数组带两个函数,

  1. 一个将东西累加到累加器中的函数。
  2. 结合两个累加器的函数。

现在可以按如下方式使用了,

val (accZeroY, accZeroZ): (Long, Long) = (0, 0) 

val mappedDataRDD = dataRDD
  .map({
    case (v, w, x, y, z) => ((v,w,x), (y, z))
  })
  .aggregateByKey((accZeroY, accZeroZ))(
    { case ((accY, accZ), (y, z)) =>  (accY + y, accZ + z) }
    { case ((accY1, accZ1), (accY2, accZ2)) => (accY1 + accY2, accZ1 + accZ2) }
  )

正如您应该观察到的那样,在这种情况下,第二个参数组中的两个函数实际上是相同的。这仅在 type of the needed accumulationkey-value-RDDPairRDD.

中的值类型相同的情况下才有可能

在这种情况下,您还可以使用 reduceByKey,您可以将其视为一个 aggregateByKey,具有作为两个函数参数传递的相同函数,

val mappedDataRDD = dataRDD
  .map({
    case (v, w, x, y, z) => ((v,w,x), (y, z))
  })
  .reduceByKey(
    { case ((accY, accZ), (y, z)) =>  (accY + y, accZ + z) }
  )

但在我看来,您 should NOT 使用 reduceBykey。我建议使用 aggregateByKey 的原因是因为在大型数据集上累积值有时会产生超出您类型范围的结果。

例如在你的情况下,我怀疑你的 (x, y) 实际上是一个 (Int, Int) 并且你想使用 (v, w, x) 作为键来累积它。但是每当你大量添加 Int 时......请记住,结果最终可能会比 Int 可以处理的更大。

所以...你会希望你的积累类型是 (Int, Int)(Long, Long)reduceByKey 不允许你这样做的范围更大的类型。所以......我会说也许你正在寻找并且应该使用 aggregateByKey

你也可以用reduceByKey,只是要注意你想要的。我简化了示例,但它公开了您想要的内容。

val rdd = sc.parallelize(List(
  (1, 2, 1, 1, 1), 
  (1, 2, 1, 2, 2),   
  (1, 3, 2, 4, 4)))

rdd.map {
  case (k1, k2, k3, v1, v2) => ((k1, k2, k3), (v1, v2))
}.reduceByKey {
  // You receive two values which are actually tuples, so we treat them like that.
  case ((x1, y1), (x2, y2)) => (x1 + x2, y1 + y2)
}.collect()
//res0: Array[((Int, Int), (Int, Int))] = Array(((1,2,1),(3,3)), ((1,3,2),(4,4)))