如何找到 Spark RDD 的平均值?

How to find an average for a Spark RDD?

我读到 reduce 函数必须是可交换的和结合的。我应该如何编写一个函数来找到平均值以符合此要求?如果我应用以下函数来计算 RDD 的平均值,它将无法正确计算平均值。谁能解释我的功能有什么问题?

我猜它需要两个元素,比如 1、2,然后像 (1+2)/2 一样将函数应用于它们。然后将结果与下一个元素相加,3 除以 2 等

val rdd = sc.parallelize(1 to 100)

rdd.reduce((_ + _) / 2)

rdd.reduce((_ + _) / 2)

上述reduce平均计算方法存在一些问题:

  1. placeholder 语法不能作为 shorthand 用于 reduce((acc, x) => (acc + x) / 2)
  2. 由于您的 RDD 是整数类型,rdd.reduce((acc, x) => (acc + x) / 2) 将在每次迭代中产生 integer division(计算平均值当然不正确)
  3. reduce方法不会产生列表的平均值。例如:

    List[Double](1, 2, 3).reduce((a, x) => (a + x) / 2)
    --> (1.0 + 2.0) / 2 = 1.5
    --> (1.5 + 3.0) / 2 = 2.25
    Result: 2.25
    

    鉴于:

    Average of List[Double](1, 2, 3) = 2.0
    

How should I write a [reduce] function to find the average so it conforms with this requirement?

我不确定reduce是否适合直接计算列表的平均值。您当然可以使用 reduce(_ + _) 对列表求和,然后将总和除以其大小,例如:

rdd.reduce(_ + _) / rdd.count.toDouble

但是你可以简单地使用RDD的内置函数mean:

rdd.mean

您还可以使用 PairRDD 来跟踪所有元素的总和以及元素的计数。

val pair = sc.parallelize(1 to 100)
.map(x => (x, 1))
.reduce((x, y) => (x._1 + y._1, x._2 + y._2))

val mean = pair._1 / pair._2