Percentile.INC in Pyspark

I need to replicate Excel's Percentile.INC functionality in Pyspark. Below is my input DataFrame (this is expected to be a very large dataset):

    from pyspark.sql.types import StructType, StructField, StringType, FloatType

    df_schema = StructType([StructField('FacilityKey', StringType(), True),
                            StructField('ItemKey', StringType(), True),
                            StructField('ItemValue', FloatType(), True)])

    df_data = [('F1', 'I1', 2.4), ('F2', 'I1', 3.17), ('F3', 'I1', 4.25)]

    input_df = spark.createDataFrame(df_data, df_schema)

I need to compute the interpolated value at every percentile stop from 1 through 99 on the above dataset. Expected result (sample of the first ranks):

| PercentileRank | Item | PerformanceScore |
|----------------|------|------------------|
| 1              | I1   | 2.4154           |
| 2              | I1   | 2.4308           |
| 3              | I1   | 2.4462           |
| 4              | I1   | 2.4616           |
| 5              | I1   | 2.477            |
| 6              | I1   | 2.4924           |
| 7              | I1   | 2.5078           |
| 8              | I1   | 2.5232           |
| 9              | I1   | 2.5386           |
| 10             | I1   | 2.554            |
| 11             | I1   | 2.5694           |
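
For reference, these expected values follow Excel's Percentile.INC (inclusive) linear interpolation: rank = p * (n - 1) over the sorted values, interpolating between the two neighbouring values. A minimal pure-Python sketch of that formula (illustrative only):

    # Percentile.INC: rank = p * (n - 1), linear interpolation between neighbours
    def percentile_inc(sorted_vals, p):
        rank = p * (len(sorted_vals) - 1)
        lo = int(rank)
        frac = rank - lo
        if lo + 1 < len(sorted_vals):
            return sorted_vals[lo] + frac * (sorted_vals[lo + 1] - sorted_vals[lo])
        return sorted_vals[lo]

    vals = [2.4, 3.17, 4.25]
    print(percentile_inc(vals, 0.01))  # 2.4154
    print(percentile_inc(vals, 0.02))  # 2.4308 (approximately)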

I can replicate the result in Python with numpy as follows:

    import numpy as np

    # 1D array
    arr = [2.4, 3.17, 4.25]
    print("arr : ", arr)

    # numpy's default (linear) interpolation matches Percentile.INC
    for i in range(1, 100):
        print(f"{i} percentile of arr : {np.percentile(arr, i)}")

I can't figure out how to do the same computation in Pyspark. Thanks in advance for any help.

Check if this helps -

Load the test data

    val df = Seq(("F1", "I1", 2.4), ("F2", "I1", 3.17), ("F3", "I1", 4.25))
      .toDF("FacilityKey", "ItemKey", "ItemValue")
    df.show(false)
    df.printSchema()

    /**
      * +-----------+-------+---------+
      * |FacilityKey|ItemKey|ItemValue|
      * +-----------+-------+---------+
      * |F1         |I1     |2.4      |
      * |F2         |I1     |3.17     |
      * |F3         |I1     |4.25     |
      * +-----------+-------+---------+
      *
      * root
      * |-- FacilityKey: string (nullable = true)
      * |-- ItemKey: string (nullable = true)
      * |-- ItemValue: double (nullable = false)
      */

Compute percentiles for ranks 1 through 99

    df
      .groupBy("ItemKey")
      .agg(
        expr(s"percentile(ItemValue, array(${Range(1, 100).map(_ * 0.01).mkString(", ")}))")
          .as("percentile"))
      .withColumn("percentile", explode($"percentile"))
      .show(false)

    /**
      * +-------+------------------+
      * |ItemKey|percentile        |
      * +-------+------------------+
      * |I1     |2.4154            |
      * |I1     |2.4307999999999996|
      * |I1     |2.4461999999999997|
      * |I1     |2.4616000000000002|
      * |I1     |2.4770000000000003|
      * |I1     |2.4924            |
      * |I1     |2.5078            |
      * |I1     |2.5232            |
      * |I1     |2.5385999999999997|
      * |I1     |2.554             |
      * |I1     |2.5694            |
      * |I1     |2.5847999999999995|
      * |I1     |2.6002            |
      * |I1     |2.6156            |
      * |I1     |2.631             |
      * |I1     |2.6464            |
      * |I1     |2.6618            |
      * |I1     |2.6772            |
      * |I1     |2.6925999999999997|
      * |I1     |2.708             |
      * +-------+------------------+
      * only showing top 20 rows
      */
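
Since the question asks for Pyspark, here is a rough DataFrame-API equivalent of the same aggregation (a sketch assuming the `input_df` from the question; `percentile` is the exact Spark SQL function used above):

    from pyspark.sql import functions as F

    # percentile stops 0.01 .. 0.99 as an array literal inside the SQL expression
    stops = ", ".join(str(i / 100) for i in range(1, 100))

    result = (input_df
        .groupBy("ItemKey")
        .agg(F.expr(f"percentile(ItemValue, array({stops}))").alias("p"))
        # posexplode gives the 0-based position, so PercentileRank = position + 1
        .select("ItemKey", F.posexplode("p").alias("idx", "PerformanceScore"))
        .withColumn("PercentileRank", F.col("idx") + 1)
        .drop("idx"))

    result.show(truncate=False)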

Execution plan

    df
      .groupBy("ItemKey")
      .agg(
        expr(s"percentile(ItemValue, array(${Range(1, 100).map(_ * 0.01).mkString(", ")}))")
          .as("percentile"))
      .withColumn("percentile", explode($"percentile"))
      .explain()

    /**
      * == Physical Plan ==
      * Generate explode(percentile#58), [ItemKey#8], false, [percentile#67]
      * +- ObjectHashAggregate(keys=[ItemKey#8], functions=[percentile(ItemValue#9, [0.01,0.02,0.03,0.04,0.05,0.06,0.07,0.08,0.09,0.1,0.11,0.12,0.13,0.14,0.15,0.16,0.17,0.18,0.19,0.2,0.21,0.22,0.23,0.24,0.25,0.26,0.27,0.28,0.29,0.3,0.31,0.32,0.33,0.34,0.35000000000000003,0.36,0.37,0.38,0.39,0.4,0.41000000000000003,0.42,0.43,0.44,0.45,0.46,0.47000000000000003,0.48,0.49,0.5,0.51,0.52,0.53,0.54,0.55,0.56,0.5700000000000001,0.58,0.59,0.6,0.61,0.62,0.63,0.64,0.65,0.66,0.67,0.68,0.6900000000000001,0.7000000000000001,0.71,0.72,0.73,0.74,0.75,0.76,0.77,0.78,0.79,0.8,0.81,0.8200000000000001,0.8300000000000001,0.84,0.85,0.86,0.87,0.88,0.89,0.9,0.91,0.92,0.93,0.9400000000000001,0.9500000000000001,0.96,0.97,0.98,0.99], 1, 0, 0)])
      * +- Exchange hashpartitioning(ItemKey#8, 2)
      * +- ObjectHashAggregate(keys=[ItemKey#8], functions=[partial_percentile(ItemValue#9, [0.01,0.02,0.03,0.04,0.05,0.06,0.07,0.08,0.09,0.1,0.11,0.12,0.13,0.14,0.15,0.16,0.17,0.18,0.19,0.2,0.21,0.22,0.23,0.24,0.25,0.26,0.27,0.28,0.29,0.3,0.31,0.32,0.33,0.34,0.35000000000000003,0.36,0.37,0.38,0.39,0.4,0.41000000000000003,0.42,0.43,0.44,0.45,0.46,0.47000000000000003,0.48,0.49,0.5,0.51,0.52,0.53,0.54,0.55,0.56,0.5700000000000001,0.58,0.59,0.6,0.61,0.62,0.63,0.64,0.65,0.66,0.67,0.68,0.6900000000000001,0.7000000000000001,0.71,0.72,0.73,0.74,0.75,0.76,0.77,0.78,0.79,0.8,0.81,0.8200000000000001,0.8300000000000001,0.84,0.85,0.86,0.87,0.88,0.89,0.9,0.91,0.92,0.93,0.9400000000000001,0.9500000000000001,0.96,0.97,0.98,0.99], 1, 0, 0)])
      * +- LocalTableScan [ItemKey#8, ItemValue#9]
      */

You may want to consider approx_percentile for faster execution:

    df
      .groupBy("ItemKey")
      .agg(
        expr(s"approx_percentile(ItemValue, array(${Range(1, 100).map(_ * 0.01).mkString(", ")}))")
          .as("percentile"))
      .withColumn("percentile", explode($"percentile"))
      .show(false)

    /**
      * +-------+----------+
      * |ItemKey|percentile|
      * +-------+----------+
      * |I1     |2.4       |
      * |I1     |2.4       |
      * |I1     |2.4       |
      * |I1     |2.4       |
      * |I1     |2.4       |
      * |I1     |2.4       |
      * |I1     |2.4       |
      * |I1     |2.4       |
      * |I1     |2.4       |
      * |I1     |2.4       |
      * |I1     |2.4       |
      * |I1     |2.4       |
      * |I1     |2.4       |
      * |I1     |2.4       |
      * |I1     |2.4       |
      * |I1     |2.4       |
      * |I1     |2.4       |
      * |I1     |2.4       |
      * |I1     |2.4       |
      * |I1     |2.4       |
      * +-------+----------+
      * only showing top 20 rows
      */
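
From Pyspark (Spark 3.1+), the same approximate aggregation can be expressed with percentile_approx; on older versions the approx_percentile SQL expression above works through F.expr. A sketch, again assuming the question's `input_df` (note that the approximate variant returns actual data values without interpolation, which is why the tiny sample above is all 2.4):

    from pyspark.sql import functions as F

    percentages = [i / 100 for i in range(1, 100)]

    (input_df
        .groupBy("ItemKey")
        # accuracy trades memory for precision; 10000 is the default
        .agg(F.percentile_approx("ItemValue", percentages, 10000).alias("percentile"))
        .withColumn("percentile", F.explode("percentile"))
        .show(truncate=False))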

Please let me know if you see any issues