PySpark 使用 percentile_approx 获取 P95 值远大于同一列的最大值

PySpark use percentile_approx to get P95 value is much greater than the max value of same column

我想获取groupBy之后某列的P95值,但是查看结果时发现P95值大于最大值。我的用法如下:

from pyspark.sql.types import StructType, StructField, StringType, LongType, FloatType
instance_util_schema = StructType([StructField("namespace", StringType(), True),
                                   StructField("metricname", StringType(), True),
                                   StructField("instance_id", StringType(), True),
                                   StructField("time", LongType(), True),
                                   StructField("maxvalue", FloatType(), True),
                                   StructField("minvalue", FloatType(), True),
                                   StructField("meanvalue", FloatType(), True),
                                   StructField("sumvalue", FloatType(), True),
                                   StructField("number", LongType(), True),
                                   StructField("region", StringType(), True),
                                   StructField("date", StringType(), True)
                                   ])

df = spark.read.csv("xxxx", header=True)

df = df.drop('minvalue', 'meanvalue', 'sumvalue', 'number')
df = df.withColumn("ts", from_unixtime(df['time'] / 1000)) \
    .withColumn("year", date_format("ts", "yyyy")) \
    .withColumn("month", date_format("ts", "MM")) \
    .withColumn("day", date_format("ts", "dd")) \
    .withColumn("hour", date_format("ts", "HH"))

dfg = df.groupBy("instance_id","year","month", "day", "hour").agg(
    F.min('time').alias("timestamp"),
    F.max(F.col('maxvalue')).alias('max'),
    F.percentile_approx(F.col('maxvalue'), 0.95).alias('p95'),
    F.percentile_approx(F.col('maxvalue'), 0.90).alias('p90'),
    F.percentile_approx(F.col('maxvalue'), 0.50).alias('p50'),
    F.percentile_approx(F.col('maxvalue'), 0.05).alias('p5'),
    F.min('maxvalue').alias('min')
)

现在我用这个来处理这个问题,但我仍然不知道原因。

dfg = df.withColumn('maxvalue', F.col('maxvalue').cast(FloatType())).groupBy(
    "instance_id", "year", "month", "day", "hour").agg(
    F.min('time').alias("timestamp"),
    F.max(F.col('maxvalue')).alias('max'),
    F.expr("percentile(maxvalue, 0.95)").alias('p95'),
    F.expr("percentile(maxvalue, 0.90)").alias('p90'),
    F.expr("percentile(maxvalue, 0.50)").alias('p50'),
    F.expr("percentile(maxvalue, 0.05)").alias('p5'),
    F.min('maxvalue').alias('min'))

不是因为 percentile_approx。这是因为您的“maxvalue”列实际上不是 float 类型。在您的固定代码中,此列的类型已更改为 float,因此可以正常工作。在这种情况下,当数字以字符串形式给出时,百分位数计算正确,但最小值和最大值不正确。

df = spark.createDataFrame([("9.65600",), ("18.89700",), ("10.39600",)], ["maxvalue"])
dfg = df.groupBy().agg(
    F.max(F.col('maxvalue')).alias('max'),
    F.percentile_approx(F.col('maxvalue'), 0.95).alias('p95'),
    F.percentile_approx(F.col('maxvalue'), 0.90).alias('p90'),
    F.percentile_approx(F.col('maxvalue'), 0.50).alias('p50'),
    F.percentile_approx(F.col('maxvalue'), 0.05).alias('p5'),
    F.min('maxvalue').alias('min')
)
dfg.show()
#+-------+------+------+------+-----+--------+
#|    max|   p95|   p90|   p50|   p5|     min|
#+-------+------+------+------+-----+--------+
#|9.65600|18.897|18.897|10.396|9.656|10.39600|
#+-------+------+------+------+-----+--------+