对 pyspark 数据帧值进行分类
categorize pyspark dataframe values
我需要将 DF 中的值分为不同的类别。以下是输入和预期输出。我看到了这个答案 但我想尽可能不将 DF 转换为 RDD。
Input
+---------+-------------------+
| ID| value|
+---------+-------------------+
| 2| 50.34|
| 4| 34.4|
| 6| 48.7|
| 10| 72.4|
+---------+-------------------+
OutputDF
+---------+-------------------+----------+
| bucket| size| count |
+---------+-------------------+--------- +
| 0| 0-20| 0|
| 1| 20-40| 1|
| 2| 40-60| 2|
| 3| 60-80| 1|
+---------+-------------------+----------+
您可以使用 Bucketizer 根据您希望确定的拆分对 value
进行装箱,一旦针对每一行标记了存储桶,您就可以使用对应于 bin 的 udf 进一步对它们进行分类至
数据准备
input_str = """
2| 50.34|
4| 34.4|
6| 48.7|
10| 72.4
""".split("|")
input_values = list(map(lambda x: x.strip() if x.strip() != '|' else None, input_str))
cols = list(map(lambda x: x.strip() if x.strip() != 'null' else None, "id,value".split(',')))
n = len(input_values)
n_cols = 2
input_list = [tuple(input_values[i:i+n_cols]) for i in range(0,n,n_cols)]
sparkDF = sql.createDataFrame(input_list, cols)
sparkDF = sparkDF.withColumn('value',F.col('value').cast('float'))\
.withColumn('id',F.col('id').cast('int'))
sparkDF.show()
+---+-----+
| id|value|
+---+-----+
| 2|50.34|
| 4| 34.4|
| 6| 48.7|
| 10| 72.4|
+---+-----+
分桶器
from pyspark.ml.feature import Bucketizer
bucketizer = Bucketizer(
splits=[0,20,40,60,80],
inputCol='value',
outputCol='value_bin'
)
sparkDF = bucketizer.transform(sparkDF)
sparkDF.show()
+---+-----+---------+
| id|value|value_bin|
+---+-----+---------+
| 2|50.34| 2.0|
| 4| 34.4| 1.0|
| 6| 48.7| 2.0|
| 10| 72.4| 3.0|
+---+-----+---------+
对值 bin 进行分类
split_arr = bucketizer.getSplits()
### O/P --> [0.0, 20.0, 40.0, 60.0, 80.0]
format_udf = F.udf(lambda x:f'{int(split_arr[int(x)])}-{int(split_arr[int(x)+1])}',StringType())
sparkDF = sparkDF.withColumn(
'numbers_bucket',format_udf('value_bin')
)
sparkDF.show()
+---+-----+---------+--------------+
| id|value|value_bin|numbers_bucket|
+---+-----+---------+--------------+
| 2|50.34| 2.0| 40-60|
| 4| 34.4| 1.0| 20-40|
| 6| 48.7| 2.0| 40-60|
| 10| 72.4| 3.0| 60-80|
+---+-----+---------+--------------+
最终输出 - GroupBy
sparkDF = sparkDF.groupby(['value_bin','numbers_bucket']).count()
sparkDF.show()
+---------+--------------+-----+
|value_bin|numbers_bucket|count|
+---------+--------------+-----+
| 2.0| 40-60| 2|
| 3.0| 60-80| 1|
| 1.0| 20-40| 1|
+---------+--------------+-----+
我需要将 DF 中的值分为不同的类别。以下是输入和预期输出。我看到了这个答案
Input
+---------+-------------------+
| ID| value|
+---------+-------------------+
| 2| 50.34|
| 4| 34.4|
| 6| 48.7|
| 10| 72.4|
+---------+-------------------+
OutputDF
+---------+-------------------+----------+
| bucket| size| count |
+---------+-------------------+--------- +
| 0| 0-20| 0|
| 1| 20-40| 1|
| 2| 40-60| 2|
| 3| 60-80| 1|
+---------+-------------------+----------+
您可以使用 Bucketizer 根据您希望确定的拆分对 value
进行装箱,一旦针对每一行标记了存储桶,您就可以使用对应于 bin 的 udf 进一步对它们进行分类至
数据准备
input_str = """
2| 50.34|
4| 34.4|
6| 48.7|
10| 72.4
""".split("|")
input_values = list(map(lambda x: x.strip() if x.strip() != '|' else None, input_str))
cols = list(map(lambda x: x.strip() if x.strip() != 'null' else None, "id,value".split(',')))
n = len(input_values)
n_cols = 2
input_list = [tuple(input_values[i:i+n_cols]) for i in range(0,n,n_cols)]
sparkDF = sql.createDataFrame(input_list, cols)
sparkDF = sparkDF.withColumn('value',F.col('value').cast('float'))\
.withColumn('id',F.col('id').cast('int'))
sparkDF.show()
+---+-----+
| id|value|
+---+-----+
| 2|50.34|
| 4| 34.4|
| 6| 48.7|
| 10| 72.4|
+---+-----+
分桶器
from pyspark.ml.feature import Bucketizer
bucketizer = Bucketizer(
splits=[0,20,40,60,80],
inputCol='value',
outputCol='value_bin'
)
sparkDF = bucketizer.transform(sparkDF)
sparkDF.show()
+---+-----+---------+
| id|value|value_bin|
+---+-----+---------+
| 2|50.34| 2.0|
| 4| 34.4| 1.0|
| 6| 48.7| 2.0|
| 10| 72.4| 3.0|
+---+-----+---------+
对值 bin 进行分类
split_arr = bucketizer.getSplits()
### O/P --> [0.0, 20.0, 40.0, 60.0, 80.0]
format_udf = F.udf(lambda x:f'{int(split_arr[int(x)])}-{int(split_arr[int(x)+1])}',StringType())
sparkDF = sparkDF.withColumn(
'numbers_bucket',format_udf('value_bin')
)
sparkDF.show()
+---+-----+---------+--------------+
| id|value|value_bin|numbers_bucket|
+---+-----+---------+--------------+
| 2|50.34| 2.0| 40-60|
| 4| 34.4| 1.0| 20-40|
| 6| 48.7| 2.0| 40-60|
| 10| 72.4| 3.0| 60-80|
+---+-----+---------+--------------+
最终输出 - GroupBy
sparkDF = sparkDF.groupby(['value_bin','numbers_bucket']).count()
sparkDF.show()
+---------+--------------+-----+
|value_bin|numbers_bucket|count|
+---------+--------------+-----+
| 2.0| 40-60| 2|
| 3.0| 60-80| 1|
| 1.0| 20-40| 1|
+---------+--------------+-----+