如何使用 Python Dataframe API 在 Apache Spark 中找到中位数?

How to find the median in Apache Spark with Python Dataframe API?

PySpark API 提供了除中位数以外的许多聚合函数。 Spark 2 带有 approxQuantile ,它给出了近似的分位数,但精确的中位数计算起来非常昂贵。是否有更多 PySpark 方法来计算 Spark Dataframe 中一列值的中位数?

这是在 Python (Spark 1.6 +) 中使用 Dataframe API 的示例实现。

import pyspark.sql.functions as F
import numpy as np
from pyspark.sql.types import FloatType

假设我们在 "salaries" spark 数据框中有客户的月薪,例如:

月 | customer_id |工资

我们想找出所有月份每位客户的工资中位数

第 1 步:编写一个用户定义的函数来计算中位数

def find_median(values_list):
    try:
        median = np.median(values_list) #get the median of values in a list in each row
        return round(float(median),2)
    except Exception:
        return None #if there is anything wrong with the given values

median_finder = F.udf(find_median,FloatType())

第 2 步:通过将工资列收集到每行的工资列表中来汇总工资列:

salaries_list = salaries.groupBy("customer_id").agg(F.collect_list("salary").alias("salaries"))

第 3 步:在薪资列上调用 median_finder udf 并将中值添加为新列

salaries_list = salaries_list.withColumn("median",median_finder("salaries")) 

对于精确中位数(对于small-mid大小的数据帧),因为Spark 2.1可以使用percentile包裹在 expr:

中的函数
F.expr('percentile(c2, 0.5)')
df = spark.createDataFrame(
    [(1, 10),
     (1, 20),
     (2, 50)],
    ['c1', 'c2'])
df.groupby('c1').agg(F.expr('percentile(c2, 0.5)').alias('median')).show()
#  +---+------+
#  | c1|median|
#  +---+------+
#  |  1|  15.0|
#  |  2|  50.0|
#  +---+------+
df.withColumn('median', F.expr('percentile(c2, 0.5)').over(W.partitionBy('c1'))).show()
#  +---+---+------+
#  | c1| c2|median|
#  +---+---+------+
#  |  1| 10|  15.0|
#  |  1| 20|  15.0|
#  |  2| 50|  50.0|
#  +---+---+------+

近似中位数 通常是 mid-large 大小数据帧的更好选择。

Spark 2.1 实现 approx_percentile and percentile_approx:

F.expr('percentile_approx(c2, 0.5)')

因为 Spark 3.1 可以直接在 PySpark API 中使用它:

F.percentile_approx('c2', 0.5)