反组 by/R 适用于 Pyspark
Anti group by/R apply in Pyspark
我是 R 程序员,正在进入 pyspark 世界并且掌握了很多基本技巧,但我仍在苦苦挣扎的是我会做的事情 applys 或 basic for loops。
在这种情况下,我正在尝试计算 ID 的 "anti-groupby"。基本上,我们的想法是查看该 ID 的总体,然后查看非该 ID 的总体,并将这两个值放在同一行。使用 groupby 获取该 ID 的人口很容易,然后将其连接到以 new_id 作为唯一列的数据集。
这就是我在 R 中的做法:
anti_group <- function(id){
tr <- sum(subset(df1, new_id!=id)$total_1)
to <- sum(subset(df1, new_id!=id)$total_2)
54 * tr / to
}
test$other.RP54 <- sapply(test$new_id, anti_group )
我将如何在 pyspark 中执行此操作?
谢谢!
编辑:
#df.show()
#sample data
+---+-----+
| id|value|
+---+-----+
| 1| 40|
| 1| 30|
| 2| 10|
| 2| 90|
| 3| 20|
| 3| 10|
| 4| 2|
| 4| 5|
+---+-----+
然后是一些创建最终数据框的函数,如下所示:
+---+-------------+------------------+
| id|grouped_total|anti_grouped_total|
+---+-------------+------------------+
| 1| 70| 137|
| 2| 100| 107|
| 3| 30| 177|
| 4| 7| 200|
+---+-------------+------------------+
因此没有可以复制该 groupBy 函数的内置函数,但您可以通过使用 case(when/otherwise clause)
创建一个新列来轻松实现 case(when/otherwise clause)
=28=]组和反组,然后groupBy
new column
。
#df.show()
#sample data
+---+-----+
| id|value|
+---+-----+
| 1| 40|
| 1| 30|
| 2| 10|
| 2| 90|
| 3| 20|
| 3| 10|
| 4| 2|
| 4| 5|
+---+-----+
from pyspark.sql import functions as F
df.withColumn("anti_id_1", F.when(F.col("id")==1, F.lit('1')).otherwise(F.lit('Not_1')))\
.groupBy("anti_id_1").agg(F.sum("value").alias("sum")).show()
+---------+---+
|anti_id_1|sum|
+---------+---+
| 1| 70|
| Not_1|137|
+---------+---+
UPDATE:
from pyspark.sql.window import Window
from pyspark.sql import functions as F
w1=Window().partitionBy("id")
w=Window().partitionBy()
df.withColumn("grouped_total",F.sum("value").over(w1))\
.withColumn("anti_grouped_total", (F.sum("value").over(w))-F.col("grouped_total"))\
.groupBy("id").agg(F.first("grouped_total").alias("grouped_total"),\
F.first("anti_grouped_total").alias("anti_grouped_total"))\
.drop("value").orderBy("id").show()
+---+-------------+------------------+
| id|grouped_total|anti_grouped_total|
+---+-------------+------------------+
| 1| 70| 137|
| 2| 100| 107|
| 3| 30| 177|
| 4| 7| 200|
+---+-------------+------------------+
Less verbose/concise
实现相同输出的方式:
from pyspark.sql import functions as F
from pyspark.sql.window import Window
w = Window().partitionBy()
df.groupBy("id").agg(F.sum("value").alias("grouped_total"))\
.withColumn("anti_grouped_total",F.sum("grouped_total").over(w)-F.col("grouped_total")).orderBy("id"),show()
For 2 value columns:
df.show()
+---+------+------+
| id|value1|value2|
+---+------+------+
| 1| 40| 50|
| 1| 30| 70|
| 2| 10| 91|
| 2| 90| 21|
| 3| 20| 42|
| 3| 10| 4|
| 4| 2| 23|
| 4| 5| 12|
+---+------+------+
from pyspark.sql.window import Window
from pyspark.sql import functions as F
w = Window().partitionBy()
df.groupBy("id").agg(F.sum("value1").alias("grouped_total_1"),F.sum("value2").alias("grouped_total_2"))\
.withColumn("anti_grouped_total_1",F.sum("grouped_total_1").over(w)-F.col("grouped_total_1"))\
.withColumn("anti_grouped_total_2",F.sum("grouped_total_2").over(w)-F.col("grouped_total_2")).orderBy("id").show()
+---+---------------+---------------+--------------------+--------------------+
| id|grouped_total_1|grouped_total_2|anti_grouped_total_1|anti_grouped_total_2|
+---+---------------+---------------+--------------------+--------------------+
| 1| 70| 120| 137| 193|
| 2| 100| 112| 107| 201|
| 3| 30| 46| 177| 267|
| 4| 7| 35| 200| 278|
+---+---------------+---------------+--------------------+--------------------+
我认为您可以分两步完成:首先按 ID 求和,然后取总和并减去此 ID 的值。
我的想法有点像group_by(id) %>% summarise(x = sum(x)) %>% mutate(y = sum(x) - x)
中的dplyr
我提出的解决方案是基于Window
函数。未经测试:
让我们先创建数据
import pyspark.sql.functions as psf
import pyspark.sql.window as psw
df = spark.createDataFrame([(1,40),(1,30),(2,10),(2,90),(3,20),(3,10),(4,2),(4,5)], ['id','value'])
df.show(2)
+---+-----+
| id|value|
+---+-----+
| 1| 40|
| 1| 30|
+---+-----+
only showing top 2 rows
然后应用该方法:
w = psw.Window.orderBy()
df_id = df.groupBy("id").agg(psf.sum("value").alias("grouped_total"))
df_id = (df_id
.withColumn("anti_grouped_total",psf.sum("grouped_total").over(w))
.withColumn('anti_grouped_total', psf.col('anti_grouped_total') - psf.col('grouped_total'))
)
df_id.show(2)
+---+-------------+------------------+
| id|grouped_total|anti_grouped_total|
+---+-------------+------------------+
| 3| 30| 177|
| 1| 70| 137|
+---+-------------+------------------+
only showing top 2 rows
我是 R 程序员,正在进入 pyspark 世界并且掌握了很多基本技巧,但我仍在苦苦挣扎的是我会做的事情 applys 或 basic for loops。
在这种情况下,我正在尝试计算 ID 的 "anti-groupby"。基本上,我们的想法是查看该 ID 的总体,然后查看非该 ID 的总体,并将这两个值放在同一行。使用 groupby 获取该 ID 的人口很容易,然后将其连接到以 new_id 作为唯一列的数据集。
这就是我在 R 中的做法:
anti_group <- function(id){
tr <- sum(subset(df1, new_id!=id)$total_1)
to <- sum(subset(df1, new_id!=id)$total_2)
54 * tr / to
}
test$other.RP54 <- sapply(test$new_id, anti_group )
我将如何在 pyspark 中执行此操作?
谢谢!
编辑:
#df.show()
#sample data
+---+-----+
| id|value|
+---+-----+
| 1| 40|
| 1| 30|
| 2| 10|
| 2| 90|
| 3| 20|
| 3| 10|
| 4| 2|
| 4| 5|
+---+-----+
然后是一些创建最终数据框的函数,如下所示:
+---+-------------+------------------+
| id|grouped_total|anti_grouped_total|
+---+-------------+------------------+
| 1| 70| 137|
| 2| 100| 107|
| 3| 30| 177|
| 4| 7| 200|
+---+-------------+------------------+
因此没有可以复制该 groupBy 函数的内置函数,但您可以通过使用 case(when/otherwise clause)
创建一个新列来轻松实现 case(when/otherwise clause)
=28=]组和反组,然后groupBy
new column
。
#df.show()
#sample data
+---+-----+
| id|value|
+---+-----+
| 1| 40|
| 1| 30|
| 2| 10|
| 2| 90|
| 3| 20|
| 3| 10|
| 4| 2|
| 4| 5|
+---+-----+
from pyspark.sql import functions as F
df.withColumn("anti_id_1", F.when(F.col("id")==1, F.lit('1')).otherwise(F.lit('Not_1')))\
.groupBy("anti_id_1").agg(F.sum("value").alias("sum")).show()
+---------+---+
|anti_id_1|sum|
+---------+---+
| 1| 70|
| Not_1|137|
+---------+---+
UPDATE:
from pyspark.sql.window import Window
from pyspark.sql import functions as F
w1=Window().partitionBy("id")
w=Window().partitionBy()
df.withColumn("grouped_total",F.sum("value").over(w1))\
.withColumn("anti_grouped_total", (F.sum("value").over(w))-F.col("grouped_total"))\
.groupBy("id").agg(F.first("grouped_total").alias("grouped_total"),\
F.first("anti_grouped_total").alias("anti_grouped_total"))\
.drop("value").orderBy("id").show()
+---+-------------+------------------+
| id|grouped_total|anti_grouped_total|
+---+-------------+------------------+
| 1| 70| 137|
| 2| 100| 107|
| 3| 30| 177|
| 4| 7| 200|
+---+-------------+------------------+
Less verbose/concise
实现相同输出的方式:
from pyspark.sql import functions as F
from pyspark.sql.window import Window
w = Window().partitionBy()
df.groupBy("id").agg(F.sum("value").alias("grouped_total"))\
.withColumn("anti_grouped_total",F.sum("grouped_total").over(w)-F.col("grouped_total")).orderBy("id"),show()
For 2 value columns:
df.show()
+---+------+------+
| id|value1|value2|
+---+------+------+
| 1| 40| 50|
| 1| 30| 70|
| 2| 10| 91|
| 2| 90| 21|
| 3| 20| 42|
| 3| 10| 4|
| 4| 2| 23|
| 4| 5| 12|
+---+------+------+
from pyspark.sql.window import Window
from pyspark.sql import functions as F
w = Window().partitionBy()
df.groupBy("id").agg(F.sum("value1").alias("grouped_total_1"),F.sum("value2").alias("grouped_total_2"))\
.withColumn("anti_grouped_total_1",F.sum("grouped_total_1").over(w)-F.col("grouped_total_1"))\
.withColumn("anti_grouped_total_2",F.sum("grouped_total_2").over(w)-F.col("grouped_total_2")).orderBy("id").show()
+---+---------------+---------------+--------------------+--------------------+
| id|grouped_total_1|grouped_total_2|anti_grouped_total_1|anti_grouped_total_2|
+---+---------------+---------------+--------------------+--------------------+
| 1| 70| 120| 137| 193|
| 2| 100| 112| 107| 201|
| 3| 30| 46| 177| 267|
| 4| 7| 35| 200| 278|
+---+---------------+---------------+--------------------+--------------------+
我认为您可以分两步完成:首先按 ID 求和,然后取总和并减去此 ID 的值。
我的想法有点像group_by(id) %>% summarise(x = sum(x)) %>% mutate(y = sum(x) - x)
中的dplyr
我提出的解决方案是基于Window
函数。未经测试:
让我们先创建数据
import pyspark.sql.functions as psf
import pyspark.sql.window as psw
df = spark.createDataFrame([(1,40),(1,30),(2,10),(2,90),(3,20),(3,10),(4,2),(4,5)], ['id','value'])
df.show(2)
+---+-----+
| id|value|
+---+-----+
| 1| 40|
| 1| 30|
+---+-----+
only showing top 2 rows
然后应用该方法:
w = psw.Window.orderBy()
df_id = df.groupBy("id").agg(psf.sum("value").alias("grouped_total"))
df_id = (df_id
.withColumn("anti_grouped_total",psf.sum("grouped_total").over(w))
.withColumn('anti_grouped_total', psf.col('anti_grouped_total') - psf.col('grouped_total'))
)
df_id.show(2)
+---+-------------+------------------+
| id|grouped_total|anti_grouped_total|
+---+-------------+------------------+
| 3| 30| 177|
| 1| 70| 137|
+---+-------------+------------------+
only showing top 2 rows