近似百分位数的计算
computation of approximative percentiles
当使用 spark percentiles_approx
函数计算声明式 SQL 中的近似百分位数时,有时分组我观察到这个函数非常慢。我已经将精度降低到 100(需要大约 5 分钟的聚合时间)或有时是 1000(20-30 分钟)。这比默认的 10k 精度低 10 倍。
我观察到生成的百分位数有点匹配,但当真正进入细节并为许多组计算它时,即每天一个,它们根本不匹配。
事实上,当尽可能多地预聚合数据并保留所有数字列(即删除任何非数字内存密集型列)时,可以使用简单的 pandas 中位数,即 1) 精确和 2 ) 比 Spark 快。
我是不是选择的精度太低了?但是 1000 已经需要很长时间来计算(我有 >> 1 个聚合)所以 5 分钟和 25 分钟会快速相乘。
Pandas怎么会这么快呢?由于矢量化?
关于 speed/accuracy 权衡,这里有哪些合适的参数?
会不会是 t-digest https://github.com/tdunning/t-digest
只要每个键的状态足够小,我将应用以下代码使用 UDF 计算百分位数。
由于这需要更新版本的 breeze(这可能会使事情复杂化或产生一些副作用,因此我将 copy/paste breeze 的某些部分)。
val r = scala.util.Random
val numbers = for (i <- 0 to 20) yield (r.nextDouble)
// in reality spark sort_array(collect_list()) will ensure already pre-sorted condition for the array
val sortedNumbers = numbers.sorted
/**
* Returns the estimate of a pre-sorted array at p * it.size, where p in [0,1].
* <p>
* Note:
* Result is invalid if the input array is not already sorted.
* </p>
*/
def percentileInPlace(arr: Array[Double], p: Double) = {
if (p > 1 || p < 0) throw new IllegalArgumentException("p must be in [0,1]")
// +1 so that the .5 == mean for even number of elements.
val f = (arr.length + 1) * p
val i = f.toInt
if (i == 0) arr.head
else if (i >= arr.length) arr.last
else {
arr(i - 1) + (f - i) * (arr(i) - arr(i - 1))
}
}
percentileInPlace(sortedNumbers.toArray, 0.4)
percentileInPlace(sortedNumbers.toArray, 0.5)
percentileInPlace(sortedNumbers.toArray, 0.6)
这可以很容易地计算 UDF 中的各种百分位数,如果需要,return 可以计算多个百分位数的数组。
注意:当您计划 return > 1 来自 UDF 的值以节省时间时,请使用 .asNondeterministic()
。否则,当输出多列(=struct 字段)时,spark 将计算 collect_list/sort_array 和每个百分位数(可能)由于催化剂优化)。
当使用 spark percentiles_approx
函数计算声明式 SQL 中的近似百分位数时,有时分组我观察到这个函数非常慢。我已经将精度降低到 100(需要大约 5 分钟的聚合时间)或有时是 1000(20-30 分钟)。这比默认的 10k 精度低 10 倍。
我观察到生成的百分位数有点匹配,但当真正进入细节并为许多组计算它时,即每天一个,它们根本不匹配。 事实上,当尽可能多地预聚合数据并保留所有数字列(即删除任何非数字内存密集型列)时,可以使用简单的 pandas 中位数,即 1) 精确和 2 ) 比 Spark 快。
我是不是选择的精度太低了?但是 1000 已经需要很长时间来计算(我有 >> 1 个聚合)所以 5 分钟和 25 分钟会快速相乘。
Pandas怎么会这么快呢?由于矢量化?
关于 speed/accuracy 权衡,这里有哪些合适的参数?
会不会是 t-digest https://github.com/tdunning/t-digest
只要每个键的状态足够小,我将应用以下代码使用 UDF 计算百分位数。 由于这需要更新版本的 breeze(这可能会使事情复杂化或产生一些副作用,因此我将 copy/paste breeze 的某些部分)。
val r = scala.util.Random
val numbers = for (i <- 0 to 20) yield (r.nextDouble)
// in reality spark sort_array(collect_list()) will ensure already pre-sorted condition for the array
val sortedNumbers = numbers.sorted
/** * Returns the estimate of a pre-sorted array at p * it.size, where p in [0,1]. * <p> * Note: * Result is invalid if the input array is not already sorted. * </p> */ def percentileInPlace(arr: Array[Double], p: Double) = { if (p > 1 || p < 0) throw new IllegalArgumentException("p must be in [0,1]") // +1 so that the .5 == mean for even number of elements. val f = (arr.length + 1) * p val i = f.toInt if (i == 0) arr.head else if (i >= arr.length) arr.last else { arr(i - 1) + (f - i) * (arr(i) - arr(i - 1)) } }
percentileInPlace(sortedNumbers.toArray, 0.4)
percentileInPlace(sortedNumbers.toArray, 0.5)
percentileInPlace(sortedNumbers.toArray, 0.6)
这可以很容易地计算 UDF 中的各种百分位数,如果需要,return 可以计算多个百分位数的数组。
注意:当您计划 return > 1 来自 UDF 的值以节省时间时,请使用 .asNondeterministic()
。否则,当输出多列(=struct 字段)时,spark 将计算 collect_list/sort_array 和每个百分位数(可能)由于催化剂优化)。