How to do value counts with normalize in pyspark with group by

I have a pyspark dataframe like the following:

import pandas as pd
so = pd.DataFrame({'id': ['a','a','a','a','b','b','b','b','c','c','c','c'],
                   'time': [1,2,3,4,1,2,3,4,1,2,3,4],
                   'group':['A','A','A','A','A','A','A','A','B','B','B','B'],
                   'value':['S','C','C','C', 'S','C','H', 'H', 'S','C','C','C']})

I want to compute, for each group, the frequency (percentage) of each value.

The output should look like this:

group   value   perc
0   A   C   0.50
1   A   H   0.25
2   A   S   0.25
3   B   C   0.75
4   B   S   0.25

The pandas equivalent is:

so.groupby('group')['value'].value_counts(normalize=True).to_frame('perc').reset_index()

But how do I do the same thing in pyspark?

You can do this by generating a count_agg at the group level, joining it back to the input DataFrame, and then aggregating to obtain the desired normalized value_counts, as follows -

Data Preparation

import pandas as pd
from pyspark.sql import SparkSession, functions as F

sql = SparkSession.builder.getOrCreate()   # SparkSession used as `sql` below

df = pd.DataFrame({'id': ['a','a','a','a','b','b','b','b','c','c','c','c'],
                   'time': [1,2,3,4,1,2,3,4,1,2,3,4],
                   'group':['A','A','A','A','A','A','A','A','B','B','B','B'],
                   'value':['S','C','C','C', 'S','C','H', 'H', 'S','C','C','C']})

sparkDF = sql.createDataFrame(df)

sparkDF.show()

+---+----+-----+-----+
| id|time|group|value|
+---+----+-----+-----+
|  a|   1|    A|    S|
|  a|   2|    A|    C|
|  a|   3|    A|    C|
|  a|   4|    A|    C|
|  b|   1|    A|    S|
|  b|   2|    A|    C|
|  b|   3|    A|    H|
|  b|   4|    A|    H|
|  c|   1|    B|    S|
|  c|   2|    B|    C|
|  c|   3|    B|    C|
|  c|   4|    B|    C|
+---+----+-----+-----+

Group Count Aggregation

count_agg = sparkDF.groupBy('group').agg(F.count(F.col('group')).alias('group_count'))

count_agg.show()

+-----+-----------+
|group|group_count|
+-----+-----------+
|    B|          4|
|    A|          8|
+-----+-----------+

Join and Aggregate - Value Counts


sparkDF_agg = sparkDF.join(count_agg
                          ,sparkDF['group'] == count_agg['group']
                          ,'inner'
                    ).select(sparkDF['*'],count_agg['group_count'])\
                    .groupBy(['group','value','group_count']).agg(F.count(F.col('value')).alias('count'))\
                    .withColumn('perc',F.col('count')/F.col('group_count'))\
                    .orderBy(['group','value'])

sparkDF_agg.show()

+-----+-----+-----------+-----+----+
|group|value|group_count|count|perc|
+-----+-----+-----------+-----+----+
|    A|    C|          8|    4| 0.5|
|    A|    H|          8|    2|0.25|
|    A|    S|          8|    2|0.25|
|    B|    C|          4|    3|0.75|
|    B|    S|          4|    1|0.25|
+-----+-----+-----------+-----+----+
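
To match the pandas output exactly, one option is to simply drop the helper count and group_count columns afterwards, e.g.:

sparkDF_agg.select('group','value','perc').show()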

An alternative is to collect the per-value counts into arrays for each group and normalize them with a higher-order function:

from pyspark.sql.functions import collect_list, expr, sum

# `so` here is the Spark DataFrame built from the sample data above
s1 = (
  # get the count of each value within each group
  so.groupby('group','value').count()

  # collect counts and values into arrays, along with the total count per group
  .groupby('group').agg(collect_list('count').alias('perc'),collect_list('value').alias('value'),sum('count').alias('sum'))
  # use the higher-order function transform to turn counts into percentages
  .withColumn('perc', expr("transform(perc,(x,i)->x/sum)")).drop('sum')
  # zip the arrays back together and explode into rows
  .selectExpr("inline(arrays_zip(array_repeat(group,size(value)), value,perc))")
)

s1.show(truncate=False)


+---+-----+----+
|  0|value|perc|
+---+-----+----+
|  B|    S|0.25|
|  B|    C|0.75|
|  A|    S|0.25|
|  A|    C| 0.5|
|  A|    H|0.25|
+---+-----+----+
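
Note that arrays_zip gives the repeated group column the positional name 0, as seen in the header above. If you want it labelled group again, one option is to relabel the three columns positionally, e.g.:

s1.toDF('group','value','perc').show(truncate=False)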