How to do value counts with normalize in pyspark with group by
I have a pyspark dataframe like this:
import pandas as pd
so = pd.DataFrame({'id': ['a','a','a','a','b','b','b','b','c','c','c','c'],
                   'time': [1,2,3,4,1,2,3,4,1,2,3,4],
                   'group':['A','A','A','A','A','A','A','A','B','B','B','B'],
                   'value':['S','C','C','C', 'S','C','H', 'H', 'S','C','C','C']})
I want to calculate the frequency (as a percentage) of each value within each group.
The output should look like this:
group value perc
0 A C 0.50
1 A H 0.25
2 A S 0.25
3 B C 0.75
4 B S 0.25
The pandas equivalent is:
so.groupby('group')['value'].value_counts(normalize=True).to_frame('perc').reset_index()
But how do I do the same in pyspark?
You can do this by generating a count_agg at the group level, which can then be joined back to the input DataFrame and aggregated to get the required normalized value_counts, as below -
Data Preparation
import pandas as pd
from pyspark.sql import SparkSession, functions as F

sql = SparkSession.builder.getOrCreate()  # SparkSession handle used throughout this answer

df = pd.DataFrame({'id': ['a','a','a','a','b','b','b','b','c','c','c','c'],
                   'time': [1,2,3,4,1,2,3,4,1,2,3,4],
                   'group':['A','A','A','A','A','A','A','A','B','B','B','B'],
                   'value':['S','C','C','C', 'S','C','H', 'H', 'S','C','C','C']})
sparkDF = sql.createDataFrame(df)
sparkDF.show()
+---+----+-----+-----+
| id|time|group|value|
+---+----+-----+-----+
| a| 1| A| S|
| a| 2| A| C|
| a| 3| A| C|
| a| 4| A| C|
| b| 1| A| S|
| b| 2| A| C|
| b| 3| A| H|
| b| 4| A| H|
| c| 1| B| S|
| c| 2| B| C|
| c| 3| B| C|
| c| 4| B| C|
+---+----+-----+-----+
Group Count Aggregation
count_agg = sparkDF.groupBy('group').agg(F.count(F.col('group')).alias('group_count'))
count_agg.show()
+-----+-----------+
|group|group_count|
+-----+-----------+
| B| 4|
| A| 8|
+-----+-----------+
Join and Aggregate - Value Counts
sparkDF_agg = (sparkDF.join(count_agg,
                            sparkDF['group'] == count_agg['group'],
                            'inner')
               .select(sparkDF['*'], count_agg['group_count'])
               .groupBy(['group','value','group_count'])
               .agg(F.count(F.col('value')).alias('count'))
               .withColumn('perc', F.col('count')/F.col('group_count'))
               .orderBy(['group','value']))
sparkDF_agg.show()
+-----+-----+-----------+-----+----+
|group|value|group_count|count|perc|
+-----+-----+-----------+-----+----+
| A| C| 8| 4| 0.5|
| A| H| 8| 2|0.25|
| A| S| 8| 2|0.25|
| B| C| 4| 3|0.75|
| B| S| 4| 1|0.25|
+-----+-----+-----------+-----+----+
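For completeness, the group totals can also be obtained with a window function instead of an explicit join. This is only a minimal sketch of that variant, assuming the sparkDF built above (the names w and perc_df are just illustrative):
from pyspark.sql import Window
from pyspark.sql import functions as F

# per-group totals computed over a window rather than via a join
w = Window.partitionBy('group')

perc_df = (sparkDF.groupBy('group','value').agg(F.count('*').alias('count'))
                  .withColumn('perc', F.col('count')/F.sum('count').over(w))
                  .drop('count')
                  .orderBy('group','value'))
perc_df.show()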
Another option is to collect the per-value counts into arrays for each group and normalize them with a higher-order function (so here is assumed to be the Spark version of the example DataFrame, e.g. sql.createDataFrame(so)):
from pyspark.sql.functions import collect_list, expr, sum  # note: this shadows Python's built-in sum

s1 = (
    # get count of each value
    so.groupby('group','value').count()
    # create arrays of counts and values, plus the sum of the counts in each group
    .groupby('group').agg(collect_list('count').alias('perc'),
                          collect_list('value').alias('value'),
                          sum('count').alias('sum'))
    # use a higher-order function to compute the percentage for each count
    .withColumn('perc', expr("transform(perc,(x,i)->x/sum)")).drop('sum')
    # zip the arrays and explode
    .selectExpr("inline(arrays_zip(array_repeat(group,size(value)), value, perc))")
)
s1.show(truncate=False)
+---+-----+----+
| 0|value|perc|
+---+-----+----+
| B| S|0.25|
| B| C|0.75|
| A| S|0.25|
| A| C| 0.5|
| A| H|0.25|
+---+-----+----+
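Note that the first column comes back unnamed (0) because array_repeat(group, size(value)) is an expression rather than a plain column, so arrays_zip falls back to a positional field name. If needed, it can be renamed after the fact, for example:
s1.toDF('group', 'value', 'perc').show(truncate=False)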