计算其值落入 SPARK 中每个 bin 的记录数
Calculate the number of records whose values falls into each bin in SPARK
我有如下数据框:
------+--------------+
| sid|first_term_gpa|
+------+--------------+
|100170| 2.0|
|100446| 3.8333|
|100884| 2.0|
|101055| 3.0|
|101094| 3.7333|
|101775| 3.7647|
|102524| 3.8235|
|102798| 3.5|
|102960| 2.8235|
|103357| 3.0|
|103747| 3.8571|
|103902| 3.8|
|104053| 3.1667|
|104064| 1.8462|
并且我已经创建了一个 UDF 函数
def student_gpa(gpa):
bins = ['[0,1)', '[1,2)', '[2,3)', '[3,4)']
return bins[float(gpa)]
带参数 gpa 预计为 float
我将上面创建的 UDF 应用到 first_term_gpa 列以创建一个名为 gpa_bin 的新列,代码如下:
alumni_ft_gpa = first_term_gpa \
.withColumn('gpa_bin', expr('student_gpa(first_term_gpa)'))\
.show()
但它抛出错误:
An exception was thrown from a UDF: 'TypeError: list indices must be integers or slices, not float',
我在这里缺少什么?
使用进口
这是一个基于您的尝试的有效解决方案:
from pyspark.sql import Row, functions as F
from pyspark.sql.types import StringType
df = spark.createDataFrame(
[Row(sid=100170, first_term_gpa=2.0),
Row(sid=100446, first_term_gpa=3.8333),
Row(sid=100884, first_term_gpa=2.0),
Row(sid=101055, first_term_gpa=3.0),
Row(sid=101094, first_term_gpa=3.7333),
Row(sid=101775, first_term_gpa=3.7647),
Row(sid=102524, first_term_gpa=3.8235),
Row(sid=102798, first_term_gpa=3.5),
Row(sid=102960, first_term_gpa=2.8235),
Row(sid=103357, first_term_gpa=3.0),
Row(sid=103747, first_term_gpa=3.8571),
Row(sid=103902, first_term_gpa=3.8),
Row(sid=104053, first_term_gpa=3.1667),
Row(sid=104064, first_term_gpa=1.8462)]
)
@F.udf(StringType())
def student_gpa(gpa):
bins = ['[0,1)', '[1,2)', '[2,3)', '[3,4)']
return bins[int(gpa)]
df \
.withColumn('gpa_bin', student_gpa('first_term_gpa'))\
.show()
输出
+------+--------------+-------+
| sid|first_term_gpa|gpa_bin|
+------+--------------+-------+
|100170| 2.0| [2,3)|
|100446| 3.8333| [3,4)|
|100884| 2.0| [2,3)|
|101055| 3.0| [3,4)|
|101094| 3.7333| [3,4)|
|101775| 3.7647| [3,4)|
|102524| 3.8235| [3,4)|
|102798| 3.5| [3,4)|
|102960| 2.8235| [2,3)|
|103357| 3.0| [3,4)|
|103747| 3.8571| [3,4)|
|103902| 3.8| [3,4)|
|104053| 3.1667| [3,4)|
|104064| 1.8462| [1,2)|
+------+--------------+-------+
我将 gpa
转换为整数的原因与我们构建间隔的方式有关。例如。 gpa=2.5
预计会产生 bin [2,3)
,它对应于 bins
列表中的索引 2
。我们通过将 2.5
转换为整数来实现这一点。
仅使用 expr
from pyspark.sql.functions import expr
def student_gpa2(gpa):
bins = ['[0,1)', '[1,2)', '[2,3)', '[3,4)']
return bins[int(gpa)]
spark.udf.register("student_gpa2", student_gpa2)
df.withColumn('new_col', expr("student_gpa2(first_term_gpa)")).show()
我有如下数据框:
------+--------------+
| sid|first_term_gpa|
+------+--------------+
|100170| 2.0|
|100446| 3.8333|
|100884| 2.0|
|101055| 3.0|
|101094| 3.7333|
|101775| 3.7647|
|102524| 3.8235|
|102798| 3.5|
|102960| 2.8235|
|103357| 3.0|
|103747| 3.8571|
|103902| 3.8|
|104053| 3.1667|
|104064| 1.8462|
并且我已经创建了一个 UDF 函数
def student_gpa(gpa):
bins = ['[0,1)', '[1,2)', '[2,3)', '[3,4)']
return bins[float(gpa)]
带参数 gpa 预计为 float
我将上面创建的 UDF 应用到 first_term_gpa 列以创建一个名为 gpa_bin 的新列,代码如下:
alumni_ft_gpa = first_term_gpa \
.withColumn('gpa_bin', expr('student_gpa(first_term_gpa)'))\
.show()
但它抛出错误:
An exception was thrown from a UDF: 'TypeError: list indices must be integers or slices, not float',
我在这里缺少什么?
使用进口
这是一个基于您的尝试的有效解决方案:
from pyspark.sql import Row, functions as F
from pyspark.sql.types import StringType
df = spark.createDataFrame(
[Row(sid=100170, first_term_gpa=2.0),
Row(sid=100446, first_term_gpa=3.8333),
Row(sid=100884, first_term_gpa=2.0),
Row(sid=101055, first_term_gpa=3.0),
Row(sid=101094, first_term_gpa=3.7333),
Row(sid=101775, first_term_gpa=3.7647),
Row(sid=102524, first_term_gpa=3.8235),
Row(sid=102798, first_term_gpa=3.5),
Row(sid=102960, first_term_gpa=2.8235),
Row(sid=103357, first_term_gpa=3.0),
Row(sid=103747, first_term_gpa=3.8571),
Row(sid=103902, first_term_gpa=3.8),
Row(sid=104053, first_term_gpa=3.1667),
Row(sid=104064, first_term_gpa=1.8462)]
)
@F.udf(StringType())
def student_gpa(gpa):
bins = ['[0,1)', '[1,2)', '[2,3)', '[3,4)']
return bins[int(gpa)]
df \
.withColumn('gpa_bin', student_gpa('first_term_gpa'))\
.show()
输出
+------+--------------+-------+
| sid|first_term_gpa|gpa_bin|
+------+--------------+-------+
|100170| 2.0| [2,3)|
|100446| 3.8333| [3,4)|
|100884| 2.0| [2,3)|
|101055| 3.0| [3,4)|
|101094| 3.7333| [3,4)|
|101775| 3.7647| [3,4)|
|102524| 3.8235| [3,4)|
|102798| 3.5| [3,4)|
|102960| 2.8235| [2,3)|
|103357| 3.0| [3,4)|
|103747| 3.8571| [3,4)|
|103902| 3.8| [3,4)|
|104053| 3.1667| [3,4)|
|104064| 1.8462| [1,2)|
+------+--------------+-------+
我将 gpa
转换为整数的原因与我们构建间隔的方式有关。例如。 gpa=2.5
预计会产生 bin [2,3)
,它对应于 bins
列表中的索引 2
。我们通过将 2.5
转换为整数来实现这一点。
仅使用 expr
from pyspark.sql.functions import expr
def student_gpa2(gpa):
bins = ['[0,1)', '[1,2)', '[2,3)', '[3,4)']
return bins[int(gpa)]
spark.udf.register("student_gpa2", student_gpa2)
df.withColumn('new_col', expr("student_gpa2(first_term_gpa)")).show()