Pyspark 将列除以按另一列分组的小计
Pyspark divide column by its subtotals grouped by another column
我的问题类似于this and 。这两篇文章都展示了如何将列值除以同一列的总和。在我的例子中,我想将列的值除以小计的总和。小计是通过根据另一列对列值进行分组来计算的。我正在稍微修改上面共享的链接中提到的示例。
这是我的数据框
df = [[1,'CAT1',10], [2, 'CAT1', 11], [3, 'CAT2', 20], [4, 'CAT2', 22], [5, 'CAT3', 30]]
df = spark.createDataFrame(df, ['id', 'category', 'consumption'])
df.show()
+---+--------+-----------+
| id|category|consumption|
+---+--------+-----------+
| 1| CAT1| 10|
| 2| CAT1| 11|
| 3| CAT2| 20|
| 4| CAT2| 22|
| 5| CAT3| 30|
+---+--------+-----------+
我想将 "consumption" 值除以分组 "category" 的总和,并将该值放在列 "normalized" 中,如下所示。
小计不需要在输出中(列消耗中的数字21、42和30)
到目前为止我取得的成就
df.crossJoin(
df.groupby('category').agg(F.sum('consumption').alias('sum_'))
).withColumn("normalized", F.col("consumption")/F.col("sum_"))\
.show()
+---+--------+-----------+--------+----+-------------------+
| id|category|consumption|category|sum_| normalized|
+---+--------+-----------+--------+----+-------------------+
| 1| CAT1| 10| CAT2| 42|0.23809523809523808|
| 2| CAT1| 11| CAT2| 42| 0.2619047619047619|
| 1| CAT1| 10| CAT1| 21|0.47619047619047616|
| 2| CAT1| 11| CAT1| 21| 0.5238095238095238|
| 1| CAT1| 10| CAT3| 30| 0.3333333333333333|
| 2| CAT1| 11| CAT3| 30|0.36666666666666664|
| 3| CAT2| 20| CAT2| 42|0.47619047619047616|
| 4| CAT2| 22| CAT2| 42| 0.5238095238095238|
| 5| CAT3| 30| CAT2| 42| 0.7142857142857143|
| 3| CAT2| 20| CAT1| 21| 0.9523809523809523|
| 4| CAT2| 22| CAT1| 21| 1.0476190476190477|
| 5| CAT3| 30| CAT1| 21| 1.4285714285714286|
| 3| CAT2| 20| CAT3| 30| 0.6666666666666666|
| 4| CAT2| 22| CAT3| 30| 0.7333333333333333|
| 5| CAT3| 30| CAT3| 30| 1.0|
+---+--------+-----------+--------+----+-------------------+
您可以按照您已经提到的链接进行基本相同的操作。唯一的区别是你必须在使用 groupby and sum:
之前计算小计
import pyspark.sql.functions as F
df = df.join(df.groupby('category').sum('consumption'), 'category')
df = df.select('id', 'category', F.round(F.col('consumption')/F.col('sum(consumption)'), 2).alias('normalized'))
df.show()
输出:
+---+--------+----------+
| id|category|normalized|
+---+--------+----------+
| 3| CAT2| 0.48|
| 4| CAT2| 0.52|
| 1| CAT1| 0.48|
| 2| CAT1| 0.52|
| 5| CAT3| 1.0|
+---+--------+----------+
这是 OP
提出的另一种解决问题的方法,但没有使用 joins()
。
joins()
通常是代价高昂的操作,应尽可能避免。
# We first register our DataFrame as temporary SQL view
df.registerTempTable('table_view')
df = sqlContext.sql("""select id, category,
consumption/sum(consumption) over (partition by category) as normalize
from table_view""")
df.show()
+---+--------+-------------------+
| id|category| normalize|
+---+--------+-------------------+
| 3| CAT2|0.47619047619047616|
| 4| CAT2| 0.5238095238095238|
| 1| CAT1|0.47619047619047616|
| 2| CAT1| 0.5238095238095238|
| 5| CAT3| 1.0|
+---+--------+-------------------+
注意: """ 为了可见性和整洁性,已被用于多行语句。使用简单的 'select id ....'
如果您尝试将您的语句分布在多行上,那将不起作用。不用说,最后的结果也是一样的。
我的问题类似于this and
这是我的数据框
df = [[1,'CAT1',10], [2, 'CAT1', 11], [3, 'CAT2', 20], [4, 'CAT2', 22], [5, 'CAT3', 30]]
df = spark.createDataFrame(df, ['id', 'category', 'consumption'])
df.show()
+---+--------+-----------+
| id|category|consumption|
+---+--------+-----------+
| 1| CAT1| 10|
| 2| CAT1| 11|
| 3| CAT2| 20|
| 4| CAT2| 22|
| 5| CAT3| 30|
+---+--------+-----------+
我想将 "consumption" 值除以分组 "category" 的总和,并将该值放在列 "normalized" 中,如下所示。
小计不需要在输出中(列消耗中的数字21、42和30)
到目前为止我取得的成就 df.crossJoin(
df.groupby('category').agg(F.sum('consumption').alias('sum_'))
).withColumn("normalized", F.col("consumption")/F.col("sum_"))\
.show()
+---+--------+-----------+--------+----+-------------------+
| id|category|consumption|category|sum_| normalized|
+---+--------+-----------+--------+----+-------------------+
| 1| CAT1| 10| CAT2| 42|0.23809523809523808|
| 2| CAT1| 11| CAT2| 42| 0.2619047619047619|
| 1| CAT1| 10| CAT1| 21|0.47619047619047616|
| 2| CAT1| 11| CAT1| 21| 0.5238095238095238|
| 1| CAT1| 10| CAT3| 30| 0.3333333333333333|
| 2| CAT1| 11| CAT3| 30|0.36666666666666664|
| 3| CAT2| 20| CAT2| 42|0.47619047619047616|
| 4| CAT2| 22| CAT2| 42| 0.5238095238095238|
| 5| CAT3| 30| CAT2| 42| 0.7142857142857143|
| 3| CAT2| 20| CAT1| 21| 0.9523809523809523|
| 4| CAT2| 22| CAT1| 21| 1.0476190476190477|
| 5| CAT3| 30| CAT1| 21| 1.4285714285714286|
| 3| CAT2| 20| CAT3| 30| 0.6666666666666666|
| 4| CAT2| 22| CAT3| 30| 0.7333333333333333|
| 5| CAT3| 30| CAT3| 30| 1.0|
+---+--------+-----------+--------+----+-------------------+
您可以按照您已经提到的链接进行基本相同的操作。唯一的区别是你必须在使用 groupby and sum:
之前计算小计import pyspark.sql.functions as F
df = df.join(df.groupby('category').sum('consumption'), 'category')
df = df.select('id', 'category', F.round(F.col('consumption')/F.col('sum(consumption)'), 2).alias('normalized'))
df.show()
输出:
+---+--------+----------+
| id|category|normalized|
+---+--------+----------+
| 3| CAT2| 0.48|
| 4| CAT2| 0.52|
| 1| CAT1| 0.48|
| 2| CAT1| 0.52|
| 5| CAT3| 1.0|
+---+--------+----------+
这是 OP
提出的另一种解决问题的方法,但没有使用 joins()
。
joins()
通常是代价高昂的操作,应尽可能避免。
# We first register our DataFrame as temporary SQL view
df.registerTempTable('table_view')
df = sqlContext.sql("""select id, category,
consumption/sum(consumption) over (partition by category) as normalize
from table_view""")
df.show()
+---+--------+-------------------+
| id|category| normalize|
+---+--------+-------------------+
| 3| CAT2|0.47619047619047616|
| 4| CAT2| 0.5238095238095238|
| 1| CAT1|0.47619047619047616|
| 2| CAT1| 0.5238095238095238|
| 5| CAT3| 1.0|
+---+--------+-------------------+
注意: """ 为了可见性和整洁性,已被用于多行语句。使用简单的 'select id ....'
如果您尝试将您的语句分布在多行上,那将不起作用。不用说,最后的结果也是一样的。