PySpark: Operations with columns given different levels of aggregation and conditions

I want to compute a sentiment ratio. For that I need to count how many positives and how many negatives each topic has, then divide by the total number of records per topic.

Suppose I have this dataset:

+-----+---------+
|topic|sentiment|
+-----+---------+
|Chair| positive|
|Table| negative|
|Chair| negative|
|Chair| negative|
|Table| positive|
|Table| positive|
|Table| positive|
+-----+---------+

In this case I can assign the value -1 to 'negative' and 1 to 'positive', so the ratio would be 0.5 for Table ((negative + positive + positive + positive) / total_count) and -0.33 for Chair ((positive + negative + negative) / total_count).
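Put differently, the target value is just the mean of the mapped +1/-1 values. A quick sanity check of the numbers above in plain Python:

# Sanity check: the ratio is the mean of the mapped sentiment values
table = [-1, 1, 1, 1]           # Table: negative, positive, positive, positive
chair = [1, -1, -1]             # Chair: positive, negative, negative
print(sum(table) / len(table))  # 0.5
print(sum(chair) / len(chair))  # -0.3333333333333333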

I came up with this solution, but it seems too complicated:

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType
from pyspark.sql.functions import col, when


spark = SparkSession.builder.appName('SparkExample').getOrCreate()

data_e = [("Chair", "positive"),
          ("Table", "negative"),
          ("Chair", "negative"),
          ("Chair", "negative"),
          ("Table", "positive"),
          ("Table", "positive"),
          ("Table", "positive")]

schema_e = StructType([
    StructField("topic", StringType(), True),
    StructField("sentiment", StringType(), True),
])
 
df_e = spark.createDataFrame(data=data_e, schema=schema_e)

# Map sentiment labels to +1 / -1
df_e_int = df_e.withColumn('sentiment_int',
                           when(col('sentiment') == 'positive', 1).otherwise(-1)) \
               .select('topic', 'sentiment_int')

# Count records per topic
agg_e = df_e_int.groupBy('topic') \
                .count() \
                .select('topic',
                        col('count').alias('counts'))

# Sum the mapped sentiment values per topic
agg_sum_e = df_e_int.groupBy('topic') \
                    .sum('sentiment_int') \
                    .select('topic',
                            col('sum(sentiment_int)').alias('sum_value'))

# Join the counts and the sums back together on topic
agg_joined_e = agg_e.join(agg_sum_e,
                          agg_e.topic == agg_sum_e.topic,
                          'inner') \
                    .select(agg_e.topic, 'counts', 'sum_value')

# Ratio = sum of mapped sentiments / record count
final_agg_e = agg_joined_e.withColumn('sentiment_ratio',
                                      col('sum_value') / col('counts')) \
                          .select('topic', 'sentiment_ratio')

final_agg_e.show()

The final output looks like this:

+-----+-------------------+
|topic|    sentiment_ratio|
+-----+-------------------+
|Chair|-0.3333333333333333|
|Table|                0.5|
+-----+-------------------+

What would be the most efficient way to do this?

You can condense your logic into two lines by using avg:

from pyspark.sql import functions as F

df_e.groupBy("topic") \
    .agg(F.avg(F.when(F.col("sentiment").eqNullSafe("positive"), 1).otherwise(-1))) \
    .show()