近似百分位数的计算

computation of approximative percentiles

当使用 spark percentiles_approx 函数计算声明式 SQL 中的近似百分位数时,有时分组我观察到这个函数非常慢。我已经将精度降低到 100(需要大约 5 分钟的聚合时间)或有时是 1000(20-30 分钟)。这比默认的 10k 精度低 10 倍。

我观察到生成的百分位数有点匹配,但当真正进入细节并为许多组计算它时,即每天一个,它们根本不匹配。 事实上,当尽可能多地预聚合数据并保留所有数字列(即删除任何非数字内存密集型列)时,可以使用简单的 pandas 中位数,即 1) 精确和 2 ) 比 Spark 快。

我是不是选择的精度太低了?但是 1000 已经需要很长时间来计算(我有 >> 1 个聚合)所以 5 分钟和 25 分钟会快速相乘。

Pandas怎么会这么快呢?由于矢量化?

关于 speed/accuracy 权衡,这里有哪些合适的参数?

会不会是 t-digest https://github.com/tdunning/t-digest

只要每个键的状态足够小,我将应用以下代码使用 UDF 计算百分位数。 由于这需要更新版本的 breeze(这可能会使事情复杂化或产生一些副作用,因此我将 copy/paste breeze 的某些部分)。

val r = scala.util.Random
val numbers = for (i <- 0 to 20) yield (r.nextDouble)

// in reality spark sort_array(collect_list()) will ensure already pre-sorted condition for the array
val sortedNumbers = numbers.sorted

//https://github.com/scalanlp/breeze/blob/master/math/src/main/scala/breeze/stats/DescriptiveStats.scala#L537

/**
 * Returns the estimate of a pre-sorted array at p * it.size, where p in [0,1].
 * <p>
 * Note:
 * Result is invalid if the input array is not already sorted.
 * </p>
 */
def percentileInPlace(arr: Array[Double], p: Double) = {
  if (p > 1 || p < 0) throw new IllegalArgumentException("p must be in [0,1]")
  // +1 so that the .5 == mean for even number of elements.
  val f = (arr.length + 1) * p
  val i = f.toInt
  if (i == 0) arr.head
  else if (i >= arr.length) arr.last
  else {
    arr(i - 1) + (f - i) * (arr(i) - arr(i - 1))
  }
}
percentileInPlace(sortedNumbers.toArray, 0.4)
percentileInPlace(sortedNumbers.toArray, 0.5)
percentileInPlace(sortedNumbers.toArray, 0.6)

这可以很容易地计算 UDF 中的各种百分位数,如果需要,return 可以计算多个百分位数的数组。

注意:当您计划 return > 1 来自 UDF 的值以节省时间时,请使用 .asNondeterministic()。否则,当输出多列(=struct 字段)时,spark 将计算 collect_list/sort_array 和每个百分位数(可能)由于催化剂优化)。