计算其值落入 SPARK 中每个 bin 的记录数

Calculate the number of records whose values falls into each bin in SPARK

我有如下数据框:

------+--------------+
|   sid|first_term_gpa|
+------+--------------+
|100170|           2.0|
|100446|        3.8333|
|100884|           2.0|
|101055|           3.0|
|101094|        3.7333|
|101775|        3.7647|
|102524|        3.8235|
|102798|           3.5|
|102960|        2.8235|
|103357|           3.0|
|103747|        3.8571|
|103902|           3.8|
|104053|        3.1667|
|104064|        1.8462|

并且我已经创建了一个 UDF 函数

def student_gpa(gpa):
    bins = ['[0,1)', '[1,2)', '[2,3)', '[3,4)']
    return bins[float(gpa)]

带参数 gpa 预计为 float

我将上面创建的 UDF 应用到 first_term_gpa 列以创建一个名为 gpa_bin 的新列,代码如下:

alumni_ft_gpa = first_term_gpa \
.withColumn('gpa_bin', expr('student_gpa(first_term_gpa)'))\
.show()

但它抛出错误:

An exception was thrown from a UDF: 'TypeError: list indices must be integers or slices, not float', 

我在这里缺少什么?

使用进口

这是一个基于您的尝试的有效解决方案:

from pyspark.sql import Row, functions as F
from pyspark.sql.types import StringType   


df = spark.createDataFrame(
[Row(sid=100170, first_term_gpa=2.0),
 Row(sid=100446, first_term_gpa=3.8333),
 Row(sid=100884, first_term_gpa=2.0),
 Row(sid=101055, first_term_gpa=3.0),
 Row(sid=101094, first_term_gpa=3.7333),
 Row(sid=101775, first_term_gpa=3.7647),
 Row(sid=102524, first_term_gpa=3.8235),
 Row(sid=102798, first_term_gpa=3.5),
 Row(sid=102960, first_term_gpa=2.8235),
 Row(sid=103357, first_term_gpa=3.0),
 Row(sid=103747, first_term_gpa=3.8571),
 Row(sid=103902, first_term_gpa=3.8),
 Row(sid=104053, first_term_gpa=3.1667),
 Row(sid=104064, first_term_gpa=1.8462)]
)

@F.udf(StringType())
def student_gpa(gpa):
    bins = ['[0,1)', '[1,2)', '[2,3)', '[3,4)']
    return bins[int(gpa)]

df \
   .withColumn('gpa_bin', student_gpa('first_term_gpa'))\
   .show()

输出

+------+--------------+-------+
|   sid|first_term_gpa|gpa_bin|
+------+--------------+-------+
|100170|           2.0|  [2,3)|
|100446|        3.8333|  [3,4)|
|100884|           2.0|  [2,3)|
|101055|           3.0|  [3,4)|
|101094|        3.7333|  [3,4)|
|101775|        3.7647|  [3,4)|
|102524|        3.8235|  [3,4)|
|102798|           3.5|  [3,4)|
|102960|        2.8235|  [2,3)|
|103357|           3.0|  [3,4)|
|103747|        3.8571|  [3,4)|
|103902|           3.8|  [3,4)|
|104053|        3.1667|  [3,4)|
|104064|        1.8462|  [1,2)|
+------+--------------+-------+

我将 gpa 转换为整数的原因与我们构建间隔的方式有关。例如。 gpa=2.5 预计会产生 bin [2,3),它对应于 bins 列表中的索引 2。我们通过将 2.5 转换为整数来实现这一点。

仅使用 expr

from pyspark.sql.functions import expr

def student_gpa2(gpa):
    bins = ['[0,1)', '[1,2)', '[2,3)', '[3,4)']
    return bins[int(gpa)]

spark.udf.register("student_gpa2", student_gpa2)
df.withColumn('new_col', expr("student_gpa2(first_term_gpa)")).show()