反组 by/R 适用于 Pyspark

Anti group by/R apply in Pyspark

我是 R 程序员,正在进入 pyspark 世界并且掌握了很多基本技巧,但我仍在苦苦挣扎的是我会做的事情 applys 或 basic for loops。

在这种情况下,我正在尝试计算 ID 的 "anti-groupby"。基本上,我们的想法是查看该 ID 的总体,然后查看非该 ID 的总体,并将这两个值放在同一行。使用 groupby 获取该 ID 的人口很容易,然后将其连接到以 new_id 作为唯一列的数据集。

这就是我在 R 中的做法:

anti_group <- function(id){
    tr <- sum(subset(df1, new_id!=id)$total_1)
    to <- sum(subset(df1, new_id!=id)$total_2)
    54 * tr / to
  }
  test$other.RP54 <- sapply(test$new_id, anti_group  )

我将如何在 pyspark 中执行此操作?

谢谢!

编辑:

#df.show()
#sample data
+---+-----+
| id|value|
+---+-----+
|  1|   40|
|  1|   30|
|  2|   10|
|  2|   90|
|  3|   20|
|  3|   10|
|  4|    2|
|  4|    5|
+---+-----+

然后是一些创建最终数据框的函数,如下所示:

+---+-------------+------------------+
| id|grouped_total|anti_grouped_total|
+---+-------------+------------------+
|  1|           70|               137|
|  2|          100|               107|
|  3|           30|               177|
|  4|            7|               200|
+---+-------------+------------------+

因此没有可以复制该 groupBy 函数的内置函数,但您可以通过使用 case(when/otherwise clause) 创建一个新列来轻松实现 case(when/otherwise clause) =28=]组和反组,然后groupBy new column

#df.show()
#sample data
+---+-----+
| id|value|
+---+-----+
|  1|   40|
|  1|   30|
|  2|   10|
|  2|   90|
|  3|   20|
|  3|   10|
|  4|    2|
|  4|    5|
+---+-----+

from pyspark.sql import functions as F
df.withColumn("anti_id_1", F.when(F.col("id")==1, F.lit('1')).otherwise(F.lit('Not_1')))\
  .groupBy("anti_id_1").agg(F.sum("value").alias("sum")).show()

+---------+---+
|anti_id_1|sum|
+---------+---+
|        1| 70|
|    Not_1|137|
+---------+---+

UPDATE:

from pyspark.sql.window import Window
from pyspark.sql import functions as F

w1=Window().partitionBy("id")
w=Window().partitionBy()
df.withColumn("grouped_total",F.sum("value").over(w1))\
  .withColumn("anti_grouped_total", (F.sum("value").over(w))-F.col("grouped_total"))\
  .groupBy("id").agg(F.first("grouped_total").alias("grouped_total"),\
                     F.first("anti_grouped_total").alias("anti_grouped_total"))\
  .drop("value").orderBy("id").show()


+---+-------------+------------------+
| id|grouped_total|anti_grouped_total|
+---+-------------+------------------+
|  1|           70|               137|
|  2|          100|               107|
|  3|           30|               177|
|  4|            7|               200|
+---+-------------+------------------+

Less verbose/concise实现相同输出的方式:

from pyspark.sql import functions as F
from pyspark.sql.window import Window
w = Window().partitionBy()
df.groupBy("id").agg(F.sum("value").alias("grouped_total"))\
          .withColumn("anti_grouped_total",F.sum("grouped_total").over(w)-F.col("grouped_total")).orderBy("id"),show()

For 2 value columns:

df.show()
+---+------+------+
| id|value1|value2|
+---+------+------+
|  1|    40|    50|
|  1|    30|    70|
|  2|    10|    91|
|  2|    90|    21|
|  3|    20|    42|
|  3|    10|     4|
|  4|     2|    23|
|  4|     5|    12|
+---+------+------+

from pyspark.sql.window import Window
from pyspark.sql import functions as F

w = Window().partitionBy()
df.groupBy("id").agg(F.sum("value1").alias("grouped_total_1"),F.sum("value2").alias("grouped_total_2"))\
          .withColumn("anti_grouped_total_1",F.sum("grouped_total_1").over(w)-F.col("grouped_total_1"))\
          .withColumn("anti_grouped_total_2",F.sum("grouped_total_2").over(w)-F.col("grouped_total_2")).orderBy("id").show()

+---+---------------+---------------+--------------------+--------------------+
| id|grouped_total_1|grouped_total_2|anti_grouped_total_1|anti_grouped_total_2|
+---+---------------+---------------+--------------------+--------------------+
|  1|             70|            120|                 137|                 193|
|  2|            100|            112|                 107|                 201|
|  3|             30|             46|                 177|                 267|
|  4|              7|             35|                 200|                 278|
+---+---------------+---------------+--------------------+--------------------+

我认为您可以分两步完成:首先按 ID 求和,然后取总和并减去此 ID 的值。

我的想法有点像group_by(id) %>% summarise(x = sum(x)) %>% mutate(y = sum(x) - x)中的dplyr

我提出的解决方案是基于Window函数。未经测试:

让我们先创建数据

import pyspark.sql.functions as psf
import pyspark.sql.window as psw

df = spark.createDataFrame([(1,40),(1,30),(2,10),(2,90),(3,20),(3,10),(4,2),(4,5)], ['id','value'])

df.show(2)

+---+-----+
| id|value|
+---+-----+
|  1|   40|
|  1|   30|
+---+-----+
only showing top 2 rows

然后应用该方法:

w = psw.Window.orderBy()
df_id = df.groupBy("id").agg(psf.sum("value").alias("grouped_total"))
df_id = (df_id
          .withColumn("anti_grouped_total",psf.sum("grouped_total").over(w))
          .withColumn('anti_grouped_total', psf.col('anti_grouped_total') - psf.col('grouped_total'))
        )

df_id.show(2)
+---+-------------+------------------+
| id|grouped_total|anti_grouped_total|
+---+-------------+------------------+
|  3|           30|               177|
|  1|           70|               137|
+---+-------------+------------------+
only showing top 2 rows