Finding percentiles per group in Spark-Scala

I am trying to compute percentiles over a column using a Window function, as shown below. I have referred to the approach of using the ApproxQuantile definition over a group.

val df1 = Seq(
    (1, 10.0), (1, 20.0), (1, 40.6), (1, 15.6), (1, 17.6), (1, 25.6),
    (1, 39.6), (2, 20.5), (2, 70.3), (2, 69.4), (2, 74.4), (2, 45.4),
    (3, 60.6), (3, 80.6), (4, 30.6), (4, 90.6)
).toDF("ID","Count")

val idBucketMapping = Seq((1, 4), (2, 3), (3, 2), (4, 2))
    .toDF("ID", "Bucket")

import org.apache.spark.sql.Column
import org.apache.spark.sql.catalyst.expressions.aggregate.ApproximatePercentile
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// Wrapper exposing Spark's internal ApproximatePercentile expression as a
// Column function, for use when percentile_approx is not available in
// org.apache.spark.sql.functions.
object PercentileApprox {
    def percentile_approx(col: Column, percentage: Column,
                          accuracy: Column): Column = {
        val expr = new ApproximatePercentile(
            col.expr, percentage.expr, accuracy.expr
        ).toAggregateExpression
        new Column(expr)
    }

    def percentile_approx(col: Column, percentage: Column): Column =
        percentile_approx(col, percentage,
            lit(ApproximatePercentile.DEFAULT_PERCENTILE_ACCURACY))
}
import PercentileApprox._
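
Aside: in Spark versions where percentile_approx is registered as a Spark SQL function but not exposed in the Scala functions API, an alternative to the wrapper above is to go through expr. A minimal sketch, assuming the percentages are fixed up front:

// Calls the built-in SQL function directly instead of the internal expression.
val viaSqlExpr = df1.withColumn("percentile",
    expr("percentile_approx(Count, array(0.0, 0.25, 0.5, 0.75))")
        .over(Window.partitionBy("ID")))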

// Percentages for `bucket_size` equal-width buckets: 0, 1/n, 2/n, ..., (n-1)/n.
def doBucketing(bucket_size: Int): Seq[Double] = (1 until bucket_size)
    .scanLeft(0d)((a, _) => a + (1 / bucket_size.toDouble))

val res = df1
    .withColumn("percentile",
        percentile_approx(col("Count"), typedLit(doBucketing(2)))
            .over(Window.partitionBy("ID"))
    )
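
For reference, doBucketing(n) yields the lower-bound percentages of n equal-width buckets (values shown are exact; the collection type may vary by Scala version):

doBucketing(2)   // Seq(0.0, 0.5)
doBucketing(4)   // Seq(0.0, 0.25, 0.5, 0.75)
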
scala> df1.show
+---+-----+
| ID|Count|
+---+-----+
|  1| 10.0|
|  1| 20.0|
|  1| 40.6|
|  1| 15.6|
|  1| 17.6|
|  1| 25.6|
|  1| 39.6|
|  2| 20.5|
|  2| 70.3|
|  2| 69.4|
|  2| 74.4|
|  2| 45.4|
|  3| 60.6|
|  3| 80.6|
|  4| 30.6|
|  4| 90.6|
+---+-----+


scala> idBucketMapping.show
+---+------+
| ID|Bucket|
+---+------+
|  1|     4|
|  2|     3|
|  3|     2|
|  4|     2|
+---+------+


scala> res.show
+---+-----+------------------+
| ID|Count|        percentile|
+---+-----+------------------+
|  1| 10.0|[10.0, 20.0, 40.6]|
|  1| 20.0|[10.0, 20.0, 40.6]|
|  1| 40.6|[10.0, 20.0, 40.6]|
|  1| 15.6|[10.0, 20.0, 40.6]|
|  1| 17.6|[10.0, 20.0, 40.6]|
|  1| 25.6|[10.0, 20.0, 40.6]|
|  1| 39.6|[10.0, 20.0, 40.6]|
|  3| 60.6|[60.6, 60.6, 80.6]|
|  3| 80.6|[60.6, 60.6, 80.6]|
|  4| 30.6|[30.6, 30.6, 90.6]|
|  4| 90.6|[30.6, 30.6, 90.6]|
|  2| 20.5|[20.5, 69.4, 74.4]|
|  2| 70.3|[20.5, 69.4, 74.4]|
|  2| 69.4|[20.5, 69.4, 74.4]|
|  2| 74.4|[20.5, 69.4, 74.4]|
|  2| 45.4|[20.5, 69.4, 74.4]|
+---+-----+------------------+

So far so good, and the logic is simple. But I need the result to be dynamic: the argument to this function, doBucketing(2), should be picked per ID from idBucketMapping.

This seems a bit tricky to me. Is this even possible?

Expected output -- the percentile buckets here are based on the idBucketMapping DataFrame:

+---+-----+------------------------+
|ID |Count|percentile              |
+---+-----+------------------------+
|1  |10.0 |[10.0, 15.6, 20.0, 39.6]|
|1  |20.0 |[10.0, 15.6, 20.0, 39.6]|
|1  |40.6 |[10.0, 15.6, 20.0, 39.6]|
|1  |15.6 |[10.0, 15.6, 20.0, 39.6]|
|1  |17.6 |[10.0, 15.6, 20.0, 39.6]|
|1  |25.6 |[10.0, 15.6, 20.0, 39.6]|
|1  |39.6 |[10.0, 15.6, 20.0, 39.6]|
|3  |60.6 |[60.6, 60.6]            |
|3  |80.6 |[60.6, 60.6]            |
|4  |30.6 |[30.6, 30.6]            |
|4  |90.6 |[30.6, 30.6]            |
|2  |20.5 |[20.5, 45.4, 70.3]      |
|2  |70.3 |[20.5, 45.4, 70.3]      |
|2  |69.4 |[20.5, 45.4, 70.3]      |
|2  |74.4 |[20.5, 45.4, 70.3]      |
|2  |45.4 |[20.5, 45.4, 70.3]      |
+---+-----+------------------------+

percentile_approx takes a percentage and an accuracy. It seems that both of them must be constant literals, so percentile_approx cannot be computed with a percentage or an accuracy that is calculated dynamically at runtime.

Ref: the percentile_approx source in the Apache Spark git repository.
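
To see the constraint concretely, here is an illustrative attempt (not from the original post; the exact error message varies by Spark version). ApproximatePercentile checks that the percentage expression is foldable, so a per-row column is rejected as soon as the plan is analyzed:

// Throws org.apache.spark.sql.AnalysisException at the withColumn call,
// because the percentage argument is not a constant literal:
//
// df1.join(idBucketMapping, Seq("ID"))
//    .withColumn("percentile",
//        percentile_approx(col("Count"), col("Bucket").cast("double"))
//            .over(Window.partitionBy("ID")))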

I can offer you a rather inelegant solution, and it only works if the number of possible bucket counts is limited.

My first version is quite ugly:

// for the sake of clarity, let's define a function that generates the
// window aggregation for a given number of buckets
def per(x: Int) = percentile_approx(col("Count"), typedLit(doBucketing(x)))
    .over(Window.partitionBy("ID"))

// then, we simply try to match the Bucket column with a possible value
val res = df1
    .join(idBucketMapping, Seq("ID"))
    .withColumn("percentile",
        when('Bucket === 2, per(2))
            .otherwise(when('Bucket === 3, per(3))
            .otherwise(per(4))))

This is nasty, but it works in your case. Slightly less ugly, with exactly the same logic: you can define the set of possible numbers of buckets and use it to do the same thing as above.

val possible_number_of_buckets = 2 to 5

val res = df1
    .join(idBucketMapping, Seq("ID"))
    .withColumn("percentile",
        possible_number_of_buckets.tail
            .foldLeft(per(possible_number_of_buckets.head)) {
                (column, size) => when('Bucket === size, per(size)).otherwise(column)
            })
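
For 2 to 5, the fold above builds a conditional column equivalent to the following hand-written chain (an illustrative expansion, assuming the same per helper):

when('Bucket === 5, per(5))
    .otherwise(when('Bucket === 4, per(4))
    .otherwise(when('Bucket === 3, per(3))
    .otherwise(per(2))))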