Percentile.INC in PySpark
I need to replicate Excel's Percentile.INC function using PySpark. Below is my input DataFrame (the real dataset is expected to be very large):
from pyspark.sql.types import StructType, StructField, StringType, FloatType

# Assumes an active SparkSession named `spark`
df_schema = StructType([StructField('FacilityKey', StringType(), True),
                        StructField('ItemKey', StringType(), True),
                        StructField('ItemValue', FloatType(), True)])
df_data = [('F1', 'I1', 2.4), ('F2', 'I1', 3.17), ('F3', 'I1', 4.25)]
input_df = spark.createDataFrame(df_data, df_schema)
I need to compute the interpolated value at every percentile from 1 to 99 on the dataset above.
Expected results (sample: percentiles 1 to 11)
| PercentileRank | Item | PerformanceScore |
|----------------|------|------------------|
| 1 | I1 | 2.4154 |
| 2 | I1 | 2.4308 |
| 3 | I1 | 2.4462 |
| 4 | I1 | 2.4616 |
| 5 | I1 | 2.477 |
| 6 | I1 | 2.4924 |
| 7 | I1 | 2.5078 |
| 8 | I1 | 2.5232 |
| 9 | I1 | 2.5386 |
| 10 | I1 | 2.554 |
| 11 | I1 | 2.5694 |
I can replicate the results in Python using numpy as follows:
import numpy as np

# 1D array
arr = [2.4, 3.17, 4.25]
print("arr : ", arr)
# print("1st percentile of arr : ",
#       np.percentile(arr, 1))
# print("25th percentile of arr : ",
#       np.percentile(arr, 25))
# print("75th percentile of arr : ",
#       np.percentile(arr, 75))
# range(1, 100) covers percentiles 1 through 99
for i in range(1, 100):
    print("%s percentile of arr : " % i, np.percentile(arr, i))
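For reference, the expected values in the table are consistent with plain linear interpolation at the fractional rank p * (n - 1) over the sorted values, which is also what np.percentile does by default. The following is a minimal pure-Python sketch of that rule; the function name percentile_inc is hypothetical and chosen only for illustration.

```python
def percentile_inc(values, p):
    """Linearly interpolated percentile for 0 <= p <= 1 (inclusive method)."""
    if not values:
        raise ValueError("values must not be empty")
    if not 0.0 <= p <= 1.0:
        raise ValueError("p must be between 0 and 1")
    ordered = sorted(values)
    # Fractional zero-based rank of the requested percentile.
    rank = p * (len(ordered) - 1)
    lower = int(rank)                          # index at or below the rank
    upper = min(lower + 1, len(ordered) - 1)   # next index, clamped at the end
    fraction = rank - lower                    # distance toward the next value
    return ordered[lower] + fraction * (ordered[upper] - ordered[lower])


arr = [2.4, 3.17, 4.25]
for k in (1, 2, 10, 50, 99):
    print(k, round(percentile_inc(arr, k / 100), 4))
```

With three values, the 1st percentile sits at rank 0.02, so the result is 2.4 + 0.02 * (3.17 - 2.4) = 2.4154, matching the first row of the table.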
I can't figure out how to do the same calculation in PySpark.
Thanks in advance for your help.
Check whether this helps:
Load the test data
val df = Seq(("F1", "I1", 2.4), ("F2", "I1", 3.17), ("F3", "I1", 4.25))
  .toDF("FacilityKey", "ItemKey", "ItemValue")
df.show(false)
df.printSchema()
/**
* +-----------+-------+---------+
* |FacilityKey|ItemKey|ItemValue|
* +-----------+-------+---------+
* |F1 |I1 |2.4 |
* |F2 |I1 |3.17 |
* |F3 |I1 |4.25 |
* +-----------+-------+---------+
*
* root
* |-- FacilityKey: string (nullable = true)
* |-- ItemKey: string (nullable = true)
* |-- ItemValue: double (nullable = false)
*/
Compute the percentiles for the range 1 to 99
df
  .groupBy("ItemKey")
  .agg(
    expr(s"percentile(ItemValue, array(${Range(1, 100).map(_ * 0.01).mkString(", ")}))")
      .as("percentile"))
  .withColumn("percentile", explode($"percentile"))
  .show(false)
/**
* +-------+------------------+
* |ItemKey|percentile |
* +-------+------------------+
* |I1 |2.4154 |
* |I1 |2.4307999999999996|
* |I1 |2.4461999999999997|
* |I1 |2.4616000000000002|
* |I1 |2.4770000000000003|
* |I1 |2.4924 |
* |I1 |2.5078 |
* |I1 |2.5232 |
* |I1 |2.5385999999999997|
* |I1 |2.554 |
* |I1 |2.5694 |
* |I1 |2.5847999999999995|
* |I1 |2.6002 |
* |I1 |2.6156 |
* |I1 |2.631 |
* |I1 |2.6464 |
* |I1 |2.6618 |
* |I1 |2.6772 |
* |I1 |2.6925999999999997|
* |I1 |2.708 |
* +-------+------------------+
* only showing top 20 rows
*/
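Since the question asks for PySpark, the same aggregation can be sketched in Python. This is a minimal sketch, not a tested production recipe: the helper names percentile_expr and percentiles_by_item are hypothetical, the output column names PercentileRank and PerformanceScore are borrowed from the expected table in the question, and the DataFrame is assumed to have ItemKey and ItemValue columns.

```python
def percentile_expr(column, ranks=range(1, 100)):
    """Build a Spark SQL expression: percentile(col, array(0.01, ..., 0.99))."""
    # k / 100 avoids artifacts such as 0.35000000000000003 from k * 0.01.
    fractions = ", ".join(str(k / 100) for k in ranks)
    return f"percentile({column}, array({fractions}))"


def percentiles_by_item(df, key="ItemKey", value="ItemValue"):
    """Return one row per (key, PercentileRank) with the interpolated value."""
    # Imported lazily so the expression builder can be used without Spark.
    from pyspark.sql import functions as F

    return (
        df.groupBy(key)
        .agg(F.expr(percentile_expr(value)).alias("percentiles"))
        # posexplode yields a zero-based position next to each array element.
        .select(key, F.posexplode("percentiles").alias("pos", "PerformanceScore"))
        .withColumn("PercentileRank", F.col("pos") + 1)
        .drop("pos")
    )


print(percentile_expr("ItemValue", range(1, 4)))
```

With an active SparkSession, percentiles_by_item(input_df).show() would be the PySpark counterpart of the Scala snippet above, with an explicit rank column added.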
Execution plan
df
  .groupBy("ItemKey")
  .agg(
    expr(s"percentile(ItemValue, array(${Range(1, 100).map(_ * 0.01).mkString(", ")}))")
      .as("percentile"))
  .withColumn("percentile", explode($"percentile"))
  .explain()
/**
* == Physical Plan ==
* Generate explode(percentile#58), [ItemKey#8], false, [percentile#67]
* +- ObjectHashAggregate(keys=[ItemKey#8], functions=[percentile(ItemValue#9, [0.01,0.02,0.03,0.04,0.05,0.06,0.07,0.08,0.09,0.1,0.11,0.12,0.13,0.14,0.15,0.16,0.17,0.18,0.19,0.2,0.21,0.22,0.23,0.24,0.25,0.26,0.27,0.28,0.29,0.3,0.31,0.32,0.33,0.34,0.35000000000000003,0.36,0.37,0.38,0.39,0.4,0.41000000000000003,0.42,0.43,0.44,0.45,0.46,0.47000000000000003,0.48,0.49,0.5,0.51,0.52,0.53,0.54,0.55,0.56,0.5700000000000001,0.58,0.59,0.6,0.61,0.62,0.63,0.64,0.65,0.66,0.67,0.68,0.6900000000000001,0.7000000000000001,0.71,0.72,0.73,0.74,0.75,0.76,0.77,0.78,0.79,0.8,0.81,0.8200000000000001,0.8300000000000001,0.84,0.85,0.86,0.87,0.88,0.89,0.9,0.91,0.92,0.93,0.9400000000000001,0.9500000000000001,0.96,0.97,0.98,0.99], 1, 0, 0)])
* +- Exchange hashpartitioning(ItemKey#8, 2)
* +- ObjectHashAggregate(keys=[ItemKey#8], functions=[partial_percentile(ItemValue#9, [0.01,0.02,0.03,0.04,0.05,0.06,0.07,0.08,0.09,0.1,0.11,0.12,0.13,0.14,0.15,0.16,0.17,0.18,0.19,0.2,0.21,0.22,0.23,0.24,0.25,0.26,0.27,0.28,0.29,0.3,0.31,0.32,0.33,0.34,0.35000000000000003,0.36,0.37,0.38,0.39,0.4,0.41000000000000003,0.42,0.43,0.44,0.45,0.46,0.47000000000000003,0.48,0.49,0.5,0.51,0.52,0.53,0.54,0.55,0.56,0.5700000000000001,0.58,0.59,0.6,0.61,0.62,0.63,0.64,0.65,0.66,0.67,0.68,0.6900000000000001,0.7000000000000001,0.71,0.72,0.73,0.74,0.75,0.76,0.77,0.78,0.79,0.8,0.81,0.8200000000000001,0.8300000000000001,0.84,0.85,0.86,0.87,0.88,0.89,0.9,0.91,0.92,0.93,0.9400000000000001,0.9500000000000001,0.96,0.97,0.98,0.99], 1, 0, 0)])
* +- LocalTableScan [ItemKey#8, ItemValue#9]
*/
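You may want to consider approx_percentile for faster execution. Note that, as the output below shows, it returns values taken from the data rather than interpolating between them, so on a small group like this one it does not reproduce the Percentile.INC results.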
df
  .groupBy("ItemKey")
  .agg(
    expr(s"approx_percentile(ItemValue, array(${Range(1, 100).map(_ * 0.01).mkString(", ")}))")
      .as("percentile"))
  .withColumn("percentile", explode($"percentile"))
  .show(false)
/**
* +-------+----------+
* |ItemKey|percentile|
* +-------+----------+
* |I1 |2.4 |
* |I1 |2.4 |
* |I1 |2.4 |
* |I1 |2.4 |
* |I1 |2.4 |
* |I1 |2.4 |
* |I1 |2.4 |
* |I1 |2.4 |
* |I1 |2.4 |
* |I1 |2.4 |
* |I1 |2.4 |
* |I1 |2.4 |
* |I1 |2.4 |
* |I1 |2.4 |
* |I1 |2.4 |
* |I1 |2.4 |
* |I1 |2.4 |
* |I1 |2.4 |
* |I1 |2.4 |
* |I1 |2.4 |
* +-------+----------+
* only showing top 20 rows
*/
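The repeated 2.4 in the output above is what one would expect from a percentile that picks an existing element instead of interpolating. The sketch below is a simplified nearest-rank model in pure Python, offered only to illustrate that behavior; it is an assumption about the semantics, not Spark's actual approximation algorithm, and the function name nearest_rank_percentile is hypothetical.

```python
import math


def nearest_rank_percentile(values, p):
    """Smallest value v such that at least a fraction p of the values are <= v."""
    ordered = sorted(values)
    # 1-based rank of the element to return; never below the first element.
    rank = max(1, math.ceil(p * len(ordered)))
    return ordered[rank - 1]


arr = [2.4, 3.17, 4.25]
print([nearest_rank_percentile(arr, k / 100) for k in range(1, 21)])
```

For three values, every percentile from 1 to 20 maps to rank 1, so the smallest element is returned each time. On large groups the gap between neighboring values is usually small, which is why the approximate function can still be an acceptable trade-off there.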
Please let me know if you see any issues