PySpark: Operations with columns given different levels of aggregation and conditions
I want to get a sentiment ratio, and for that I need to count how many positives and how many negatives there are per topic, then divide by the total number of records for each topic.
Let's say I have this dataset:
+-----+---------+
|topic|sentiment|
+-----+---------+
|Chair| positive|
|Table| negative|
|Chair| negative|
|Chair| negative|
|Table| positive|
|Table| positive|
|Table| positive|
+-----+---------+
In this case I could assign a value of -1 to 'negative' and 1 to 'positive', so the ratio would be 0.5 for Table ((negative + positive + positive + positive) / total_count) and -0.33 for Chair ((positive + negative + negative) / total_count).
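As a quick sanity check (plain Python, outside Spark, illustration only), mapping positive to 1 and negative to -1 and averaging per topic reproduces those numbers:
# Plain-Python check of the expected ratios; not part of the Spark job.
rows = [("Chair", "positive"), ("Table", "negative"), ("Chair", "negative"),
        ("Chair", "negative"), ("Table", "positive"), ("Table", "positive"),
        ("Table", "positive")]
scores = {}
for topic, sentiment in rows:
    scores.setdefault(topic, []).append(1 if sentiment == "positive" else -1)
for topic, values in scores.items():
    print(topic, sum(values) / len(values))
# Chair -0.3333333333333333
# Table 0.5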
I came up with this solution, but it seems overly complicated:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType
from pyspark.sql.functions import col, when

spark = SparkSession.builder.appName('SparkExample').getOrCreate()

data_e = [("Chair", "positive"),
          ("Table", "negative"),
          ("Chair", "negative"),
          ("Chair", "negative"),
          ("Table", "positive"),
          ("Table", "positive"),
          ("Table", "positive")
          ]
schema_e = StructType([
    StructField("topic", StringType(), True),
    StructField("sentiment", StringType(), True),
])
df_e = spark.createDataFrame(data=data_e, schema=schema_e)

# Map the sentiment labels to +1 / -1
df_e_int = df_e.withColumn('sentiment_int',
                           when(col('sentiment') == 'positive', 1)
                           .otherwise(-1)) \
               .select('topic', 'sentiment_int')

# Count records per topic
agg_e = df_e_int.groupBy('topic') \
                .count() \
                .select('topic',
                        col('count').alias('counts'))

# Sum of the +1 / -1 values per topic
agg_sum_e = df_e_int.groupBy('topic') \
                    .sum('sentiment_int') \
                    .select('topic',
                            col('sum(sentiment_int)').alias('sum_value'))

# Join both aggregates and divide the sum by the count
agg_joined_e = agg_e.join(agg_sum_e,
                          agg_e.topic == agg_sum_e.topic,
                          'inner') \
                    .select(agg_e.topic, 'counts', 'sum_value')

final_agg_e = agg_joined_e.withColumn('sentiment_ratio',
                                      (col('sum_value') / col('counts'))) \
                          .select('topic', 'sentiment_ratio')
The final output looks like this:
+-----+-------------------+
|topic| sentiment_ratio|
+-----+-------------------+
|Chair|-0.3333333333333333|
|Table|                0.5|
+-----+-------------------+
What would be the most efficient way to do this?
You can condense your logic into two lines by using avg:
from pyspark.sql import functions as F
df_e.groupBy("topic") \
.agg(F.avg(F.when(F.col("sentiment").eqNullSafe("positive"), 1).otherwise(-1))) \
.show()
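If you also want the result column to carry a readable name, the same aggregation can take an alias (a minor variation on the answer above; the name sentiment_ratio is only chosen to match the desired output):
from pyspark.sql import functions as F

df_e.groupBy("topic") \
    .agg(F.avg(F.when(F.col("sentiment").eqNullSafe("positive"), 1).otherwise(-1))
          .alias("sentiment_ratio")) \
    .show()

# Expected (given the seven example rows):
# +-----+-------------------+
# |topic|    sentiment_ratio|
# +-----+-------------------+
# |Chair|-0.3333333333333333|
# |Table|                0.5|
# +-----+-------------------+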