在 Spark 中获取组的最后一个值

Getting last value of group in Spark

我有一个 SparkR DataFrame,如下所示:

#Create R data.frame
custId <- c(rep(1001, 5), rep(1002, 3), 1003)
date <- c('2013-08-01','2014-01-01','2014-02-01','2014-03-01','2014-04-01','2014-02-01','2014-03-01','2014-04-01','2014-04-01')
desc <- c('New','New','Good','New', 'Bad','New','Good','Good','New')
newcust <- c(1,1,0,1,0,1,0,0,1)
df <- data.frame(custId, date, desc, newcust)

#Create SparkR DataFrame    
df <- createDataFrame(df)
display(df)
      custId|    date   | desc | newcust
      --------------------------------------
       1001 | 2013-08-01| New  |   1
       1001 | 2014-01-01| New  |   1
       1001 | 2014-02-01| Good |   0
       1001 | 2014-03-01| New  |   1
       1001 | 2014-04-01| Bad  |   0
       1002 | 2014-02-01| New  |   1
       1002 | 2014-03-01| Good |   0
       1002 | 2014-04-01| Good |   0 
       1003 | 2014-04-01| New  |   1

newcust 表示每次出现新的 custId 时都会有新客户,或者如果相同的 custIddesc 恢复为 'New'。我想要获得的是每个 newcust 分组的最后一个 desc 值,同时保持每个分组的第一个 date 值。下面是我想要获取的 DataFrame。我如何在 Spark 中执行此操作? PySpark 或 SparkR 代码都可以工作。

#What I want 
custId|    date   | newcust | finaldesc
----------------------------------------------
 1001 | 2013-08-01|   1     | New
 1001 | 2014-01-01|   1     | Good
 1001 | 2014-03-01|   1     | Bad
 1002 | 2014-02-01|   1     | Good
 1003 | 2014-04-01|   1     | New

我不知道 sparkR,所以我会在 pyspark 中回答。 您可以使用 window 函数实现此目的。

首先,让我们定义 "groupings of newcust",您希望 newcust 等于 1 的每一行作为新组的开始,计算累加和即可:

from pyspark.sql import Window
import pyspark.sql.functions as psf

w1 = Window.partitionBy("custId").orderBy("date")
df1 = df.withColumn("subgroup", psf.sum("newcust").over(w1))

+------+----------+----+-------+--------+
|custId|      date|desc|newcust|subgroup|
+------+----------+----+-------+--------+
|  1001|2013-08-01| New|      1|       1|
|  1001|2014-01-01| New|      1|       2|
|  1001|2014-02-01|Good|      0|       2|
|  1001|2014-03-01| New|      1|       3|
|  1001|2014-04-01| Bad|      0|       3|
|  1002|2014-02-01| New|      1|       1|
|  1002|2014-03-01|Good|      0|       1|
|  1002|2014-04-01|Good|      0|       1|
|  1003|2014-04-01| New|      1|       1|
+------+----------+----+-------+--------+

对于每个 subgroup,我们希望保留第一个日期:

w2 = Window.partitionBy("custId", "subgroup")
df2 = df1.withColumn("first_date", psf.min("date").over(w2))

+------+----------+----+-------+--------+----------+
|custId|      date|desc|newcust|subgroup|first_date|
+------+----------+----+-------+--------+----------+
|  1001|2013-08-01| New|      1|       1|2013-08-01|
|  1001|2014-01-01| New|      1|       2|2014-01-01|
|  1001|2014-02-01|Good|      0|       2|2014-01-01|
|  1001|2014-03-01| New|      1|       3|2014-03-01|
|  1001|2014-04-01| Bad|      0|       3|2014-03-01|
|  1002|2014-02-01| New|      1|       1|2014-02-01|
|  1002|2014-03-01|Good|      0|       1|2014-02-01|
|  1002|2014-04-01|Good|      0|       1|2014-02-01|
|  1003|2014-04-01| New|      1|       1|2014-04-01|
+------+----------+----+-------+--------+----------+

最后,我们要保留每个 subgroup:

的最后一行(按日期排序)
w3 = Window.partitionBy("custId", "subgroup").orderBy(psf.desc("date"))
df3 = df2.withColumn(
    "rn", 
    psf.row_number().over(w3)
).filter("rn = 1").select(
    "custId", 
    psf.col("first_date").alias("date"), 
    "desc"
)

+------+----------+----+
|custId|      date|desc|
+------+----------+----+
|  1001|2013-08-01| New|
|  1001|2014-01-01|Good|
|  1001|2014-03-01| Bad|
|  1002|2014-02-01|Good|
|  1003|2014-04-01| New|
+------+----------+----+

这是@MaFF 在 SparkR 中的代码:

w1 <- orderBy(windowPartitionBy('custId'), df$date)
df1 <- withColumn(df, "subgroup", over(sum(df$newcust), w1))

w2 <- windowPartitionBy("custId", "subgroup")
df2 <- withColumn(df1, "first_date", over(min(df1$date), w2))

w3 <- orderBy(windowPartitionBy("custId", "subgroup"), desc(df$date))
df3 <- withColumn(df2, "rn", over(row_number(), w3))
df3 <- select(filter(df3, df3$rn == 1), "custId", "first_date", "desc")
df3 <- withColumnRenamed(df3, 'first_date', "date")

df3 <- arrange(df3, 'custId', 'date')
display(df3)
+------+----------+----+
|custId|      date|desc|
+------+----------+----+
|  1001|2013-08-01| New|
|  1001|2014-01-01|Good|
|  1001|2014-03-01| Bad|
|  1002|2014-02-01|Good|
|  1003|2014-04-01| New|
+------+----------+----+