在 PySpark 数据帧上使用 groupBy 计算百分位数
Calculate percentile with groupBy on PySpark dataframe
我正在尝试 groupBy
然后计算 PySpark 数据帧上的百分位数。我根据 this Stack Overflow post:
测试了以下代码
from pyspark.sql.types import FloatType
import pyspark.sql.functions as func
import numpy as np
qt_udf = func.udf(lambda x,qt: float(np.percentile(x,qt)), FloatType())
df_out = df_in.groupBy('Id').agg(func.collect_list('value').alias('data'))\
.withColumn('median', qt_udf(func.col('data'),func.lit(0.5)).cast("string"))
df_out.show()
但是出现如下错误:
Traceback (most recent call last): > df_out.show() ....> return lambda *a: f(*a) AttributeError: 'module' object has no attribute 'percentile'
这是因为numpy版本(1.4.1),百分位数函数是从1.5版本开始添加的。短期内无法更新numpy版本
定义一个 window 并使用内置的 percent_rank
函数来计算百分位数。
from pyspark.sql import Window
from pyspark.sql import functions as func
w = Window.partitionBy(df_in.Id).orderBy(df_in.value) #assuming default ascending order
df_out = df_in.withColumn('percentile_col',func.percent_rank().over(w))
问题的标题表明 OP 想要计算百分位数。但是 body 表明他需要计算组中的中位数。
测试数据集:
from pyspark.sql import SparkSession, functions as F, Window as W, Window
spark = SparkSession.builder.getOrCreate()
df_in = spark.createDataFrame(
[('1', 10),
('1', 11),
('1', 12),
('1', 13),
('2', 20)],
['Id', 'value']
)
组中给定数据点的百分位数:
w = W.partitionBy('Id').orderBy('value')
df_in = df_in.withColumn('percentile_of_value_by_Id', F.percent_rank().over(w))
df_in.show()
#+---+-----+-------------------------+
#| Id|value|percentile_of_value_by_Id|
#+---+-----+-------------------------+
#| 1| 10| 0.0|
#| 1| 11| 0.3333333333333333|
#| 1| 12| 0.6666666666666666|
#| 1| 13| 1.0|
#| 2| 20| 0.0|
#+---+-----+-------------------------+
中位数(准确和近似):
df_out = (df_in.groupBy('Id').agg(
F.expr('percentile(value, .5)').alias('median_accurate'), # for small-mid dfs
F.percentile_approx('value', .5).alias('median_approximate') # for mid-large dfs
))
df_out.show()
#+---+---------------+------------------+
#| Id|median_accurate|median_approximate|
#+---+---------------+------------------+
#| 1| 11.5| 11|
#| 2| 20.0| 20|
#+---+---------------+------------------+
我正在尝试 groupBy
然后计算 PySpark 数据帧上的百分位数。我根据 this Stack Overflow post:
from pyspark.sql.types import FloatType
import pyspark.sql.functions as func
import numpy as np
qt_udf = func.udf(lambda x,qt: float(np.percentile(x,qt)), FloatType())
df_out = df_in.groupBy('Id').agg(func.collect_list('value').alias('data'))\
.withColumn('median', qt_udf(func.col('data'),func.lit(0.5)).cast("string"))
df_out.show()
但是出现如下错误:
Traceback (most recent call last): > df_out.show() ....> return lambda *a: f(*a) AttributeError: 'module' object has no attribute 'percentile'
这是因为numpy版本(1.4.1),百分位数函数是从1.5版本开始添加的。短期内无法更新numpy版本
定义一个 window 并使用内置的 percent_rank
函数来计算百分位数。
from pyspark.sql import Window
from pyspark.sql import functions as func
w = Window.partitionBy(df_in.Id).orderBy(df_in.value) #assuming default ascending order
df_out = df_in.withColumn('percentile_col',func.percent_rank().over(w))
问题的标题表明 OP 想要计算百分位数。但是 body 表明他需要计算组中的中位数。
测试数据集:
from pyspark.sql import SparkSession, functions as F, Window as W, Window
spark = SparkSession.builder.getOrCreate()
df_in = spark.createDataFrame(
[('1', 10),
('1', 11),
('1', 12),
('1', 13),
('2', 20)],
['Id', 'value']
)
组中给定数据点的百分位数:
w = W.partitionBy('Id').orderBy('value')
df_in = df_in.withColumn('percentile_of_value_by_Id', F.percent_rank().over(w))
df_in.show()
#+---+-----+-------------------------+
#| Id|value|percentile_of_value_by_Id|
#+---+-----+-------------------------+
#| 1| 10| 0.0|
#| 1| 11| 0.3333333333333333|
#| 1| 12| 0.6666666666666666|
#| 1| 13| 1.0|
#| 2| 20| 0.0|
#+---+-----+-------------------------+
中位数(准确和近似):
df_out = (df_in.groupBy('Id').agg(
F.expr('percentile(value, .5)').alias('median_accurate'), # for small-mid dfs
F.percentile_approx('value', .5).alias('median_approximate') # for mid-large dfs
))
df_out.show()
#+---+---------------+------------------+
#| Id|median_accurate|median_approximate|
#+---+---------------+------------------+
#| 1| 11.5| 11|
#| 2| 20.0| 20|
#+---+---------------+------------------+