How to calculate percentiles grouped by column using partitionedBy?
I am using spark-sql-2.4.1v and I am trying to find quantiles, i.e. percentile 0, percentile 25, etc., on each column of my given data. When I compute multiple percentiles, how do I retrieve each calculated percentile from the result?
My dataframe df:
+----+---------+-------------+----------+-----------+-------+
| id| date| revenue|con_dist_1| con_dist_2| zone |
+----+---------+-------------+----------+-----------+-------+
| 10|1/15/2018| 0.010680705| 10|0.019875458| east |
| 10|1/15/2018| 0.006628853| 4|0.816039063| west |
| 10|1/15/2018| 0.01378215| 20|0.082049528| east |
| 10|1/15/2018| 0.010680705| 6|0.019875458| west |
| 10|1/15/2018| 0.006628853| 30|0.816039063| east |
+----+---------+-------------+----------+-----------+-------+
The final dataframe should look like the following, i.e. for each zone:
+---+---------+-----------+-------+-------------+-----------+-----------+
| id| date| revenue| zone | perctile_col| quantile_0|quantile_10|
+---+---------+-----------+-------+-------------+-----------+-----------+
| 10|1/15/2018|0.010680705| east | con_dist_1 | 10.0| 30.0|
| 10|1/15/2018|0.010680705| east | con_dist_2 |0.019875458|0.816039063|
| 10|1/15/2018|0.010680705| west | con_dist_1 | 4.0| 6.0|
| 10|1/15/2018|0.010680705| west | con_dist_2 |0.019875458|0.816039063|
+---+---------+-----------+-------+-------------+-----------+-----------+
Is there any way to use the partitionBy and approxQuantile functions for this? Would it be processed using repartition("zone"), i.e. without collecting the dataset for each zone?
approxQuantile is not a good fit here because it does not support grouping. However, this can be solved with percentile_approx and a Spark window function (a groupBy would also work; which one to use depends on the desired dataframe format). First, some setup:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, expr, lit}
import spark.implicits._  // assumes a SparkSession named `spark` (e.g. in spark-shell)

val df = Seq(
  (10, "1/15/2018", 0.010680705, 10, 0.019875458, "east"),
  (10, "1/15/2018", 0.006628853, 4, 0.816039063, "west"),
  (10, "1/15/2018", 0.01378215, 20, 0.082049528, "east"),
  (10, "1/15/2018", 0.010680705, 6, 0.019875458, "west"),
  (10, "1/15/2018", 0.006628853, 30, 0.816039063, "east"))
  .toDF("id", "date", "revenue", "con_dist_1", "con_dist_2", "zone")

val percentiles = Seq(0.1, 1.0)             // Which percentiles to calculate
val cols = Seq("con_dist_1", "con_dist_2")  // The columns to use
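For reference, percentile_approx accepts an array of percentages and returns an array of the corresponding approximate percentiles. A quick ungrouped sanity check (not part of the original answer, shown only for illustration) could look like this:

// Ungrouped sanity check: one row containing the array of approximate percentiles
df.select(expr(s"percentile_approx(con_dist_1, array(${percentiles.mkString(",")}))").as("p"))
  .show(false)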
The percentiles for each zone group can then be calculated as follows:
val window = Window.partitionBy("zone")
val percentile_func = (c: String) =>
  expr(s"percentile_approx(${c}, array(${percentiles.mkString(",")}))")

// Replace each column with the array of its per-zone approximate percentiles
val df2 = cols.foldLeft(df){ case (acc, c) => acc.withColumn(c, percentile_func(c).over(window)) }
The result will look like this:
+---+---------+-----------+----------+--------------------------+----+
|id |date |revenue |con_dist_1|con_dist_2 |zone|
+---+---------+-----------+----------+--------------------------+----+
|10 |1/15/2018|0.006628853|[4, 6] |[0.019875458, 0.816039063]|west|
|10 |1/15/2018|0.010680705|[4, 6] |[0.019875458, 0.816039063]|west|
|10 |1/15/2018|0.010680705|[10, 30] |[0.019875458, 0.816039063]|east|
|10 |1/15/2018|0.01378215 |[10, 30] |[0.019875458, 0.816039063]|east|
|10 |1/15/2018|0.006628853|[10, 30] |[0.019875458, 0.816039063]|east|
+---+---------+-----------+----------+--------------------------+----+
Next, we want to convert the dataframe into the correct format. This is a slight adaptation of the answer here:
// For each distribution column, label the rows with the column name and add
// one output column per requested percentile (picked out of the array by
// index), then union the per-column results.
val result = cols.map{ c =>
    percentiles
      .zipWithIndex
      .foldLeft(df2.withColumn("perctile_col", lit(c))){ case (acc, (perc, index)) =>
        acc.withColumn(s"quantile_${perc}", col(c).getItem(index))
      }
  }
  .reduce(_.union(_))
  .drop(cols: _*) // the array columns are not needed anymore

result.show()
The final dataframe:
+---+---------+-----------+----+------------+-------------+-------------+
| id|     date|    revenue|zone|perctile_col| quantile_0.1| quantile_1.0|
+---+---------+-----------+----+------------+-------------+-------------+
| 10|1/15/2018|0.006628853|west| con_dist_1| 4.0| 6.0|
| 10|1/15/2018|0.010680705|west| con_dist_1| 4.0| 6.0|
| 10|1/15/2018|0.010680705|east| con_dist_1| 10.0| 30.0|
| 10|1/15/2018| 0.01378215|east| con_dist_1| 10.0| 30.0|
| 10|1/15/2018|0.006628853|east| con_dist_1| 10.0| 30.0|
| 10|1/15/2018|0.006628853|west| con_dist_2| 0.019875458| 0.816039063|
| 10|1/15/2018|0.010680705|west| con_dist_2| 0.019875458| 0.816039063|
| 10|1/15/2018|0.010680705|east| con_dist_2| 0.019875458| 0.816039063|
| 10|1/15/2018| 0.01378215|east| con_dist_2| 0.019875458| 0.816039063|
| 10|1/15/2018|0.006628853|east| con_dist_2| 0.019875458| 0.816039063|
+---+---------+-----------+----+------------+-------------+-------------+
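As mentioned above, a groupBy-based variant is also possible when one summary row per zone is enough (the window version keeps one row per input row, which is what carries id, date and revenue through unchanged). A minimal sketch of that variant, assuming the same df, percentiles and cols as above (not part of the original answer):

// One row per zone; each column holds the array of approximate percentiles
val aggExprs = cols.map(c =>
  expr(s"percentile_approx(${c}, array(${percentiles.mkString(",")}))").as(c))
val grouped = df.groupBy("zone").agg(aggExprs.head, aggExprs.tail: _*)
grouped.show(false)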