对 pyspark 数据帧值进行分类

categorize pyspark dataframe values

我需要将 DF 中的值分为不同的类别。以下是输入和预期输出。我看到了这个答案 但我想尽可能不将 DF 转换为 RDD。

Input
+---------+-------------------+
|       ID|              value|
+---------+-------------------+
|        2|              50.34|
|        4|               34.4|
|        6|               48.7|
|       10|               72.4|
+---------+-------------------+

OutputDF
+---------+-------------------+----------+
|   bucket|               size|    count |
+---------+-------------------+--------- +
|        0|               0-20|         0|
|        1|              20-40|         1|
|        2|              40-60|         2|
|        3|              60-80|         1|  
+---------+-------------------+----------+

您可以使用 Bucketizer 根据您希望确定的拆分对 value 进行装箱,一旦针对每一行标记了存储桶,您就可以使用对应于 bin 的 udf 进一步对它们进行分类至

数据准备

input_str = """
2|              50.34|
4|               34.4|
6|               48.7|
10|               72.4
""".split("|")

input_values = list(map(lambda x: x.strip() if x.strip() != '|' else None, input_str))

cols = list(map(lambda x: x.strip() if x.strip() != 'null' else None, "id,value".split(',')))
        
n = len(input_values)
n_cols = 2

input_list = [tuple(input_values[i:i+n_cols]) for i in range(0,n,n_cols)]

sparkDF = sql.createDataFrame(input_list, cols)

sparkDF = sparkDF.withColumn('value',F.col('value').cast('float'))\
                .withColumn('id',F.col('id').cast('int'))

sparkDF.show()

+---+-----+
| id|value|
+---+-----+
|  2|50.34|
|  4| 34.4|
|  6| 48.7|
| 10| 72.4|
+---+-----+

分桶器

from pyspark.ml.feature import Bucketizer

bucketizer = Bucketizer(
    splits=[0,20,40,60,80],
    inputCol='value', 
    outputCol='value_bin'
)

sparkDF = bucketizer.transform(sparkDF)

sparkDF.show()

+---+-----+---------+
| id|value|value_bin|
+---+-----+---------+
|  2|50.34|      2.0|
|  4| 34.4|      1.0|
|  6| 48.7|      2.0|
| 10| 72.4|      3.0|
+---+-----+---------+

对值 bin 进行分类

split_arr = bucketizer.getSplits()
### O/P --> [0.0, 20.0, 40.0, 60.0, 80.0]

format_udf = F.udf(lambda x:f'{int(split_arr[int(x)])}-{int(split_arr[int(x)+1])}',StringType())

sparkDF = sparkDF.withColumn(
    'numbers_bucket',format_udf('value_bin')
)

sparkDF.show()

+---+-----+---------+--------------+
| id|value|value_bin|numbers_bucket|
+---+-----+---------+--------------+
|  2|50.34|      2.0|         40-60|
|  4| 34.4|      1.0|         20-40|
|  6| 48.7|      2.0|         40-60|
| 10| 72.4|      3.0|         60-80|
+---+-----+---------+--------------+

最终输出 - GroupBy

sparkDF = sparkDF.groupby(['value_bin','numbers_bucket']).count()

sparkDF.show()

+---------+--------------+-----+
|value_bin|numbers_bucket|count|
+---------+--------------+-----+
|      2.0|         40-60|    2|
|      3.0|         60-80|    1|
|      1.0|         20-40|    1|
+---------+--------------+-----+