Spark:如何在列上捕获列转换和 groupby
Spark: How to capture column transition and groupby on the column
id
value
prevValue
indicator
1
emp1
null
1
2
emp2
emp1
2
3
emp1
emp2
3
4
emp1
emp1
3
5
emp3
emp1
4
6
emp3
emp3
4
7
emp1
emp3
5
8
emp2
emp1
6
9
emp2
emp2
6
10
emp2
emp2
6
假设这整组行在一个会话中window
我使用滞后函数创建了 prevValue
列
lag("value", 1).over(sessionWindow).as("prevValue")
.
我想创建 table 中显示的指标列,但还没有成功。
当上一行的值与当前行的值不同时,值就会发生转变,每当发生转变时,指示器行就会递增 1,否则与前一行保持相同。
创建指标列的主要原因是稍后对指标进行分组。
这是我试图创建列的方法,但这不起作用。任何解决此问题的帮助将不胜感激。
.withColumn("indicator", when(col("prevValue").isNull, 1).otherwise(0))
.withColumn("indicator",
when(col("value") =!= col("prevValue"), lag("indicator", 1).over(sessionWindow) + 1)
.otherwise(lag("indicator", 1, 1).over(sessionWindow)))
val indWindow = Window.orderBy("id")
val sumWIndow = Window.orderBy("id").rowsBetween(Window.unboundedPreceding, Window.currentRow)
df.withColumn("changed", when(col("value") =!= lag(col("value"), 1).over(indWindow), 1).otherwise(0))
.withColumn("group", sum("changed").over(sumWIndow))
产生:
+---+-----+-------+-----+
| id|value|changed|group|
+---+-----+-------+-----+
| 1| a| 0| 0|
| 2| a| 0| 0|
| 3| b| 1| 1|
| 3| c| 1| 2|
| 4| c| 0| 2|
| 5| c| 0| 2|
+---+-----+-------+-----+
注意:这是一个性能非常低效的解决方案,因为 Spark 必须将所有行放在同一个分区中。您是否有任何列可以对数据进行分区?
id | value | prevValue | indicator |
---|---|---|---|
1 | emp1 | null | 1 |
2 | emp2 | emp1 | 2 |
3 | emp1 | emp2 | 3 |
4 | emp1 | emp1 | 3 |
5 | emp3 | emp1 | 4 |
6 | emp3 | emp3 | 4 |
7 | emp1 | emp3 | 5 |
8 | emp2 | emp1 | 6 |
9 | emp2 | emp2 | 6 |
10 | emp2 | emp2 | 6 |
假设这整组行在一个会话中window
我使用滞后函数创建了 prevValue
列
lag("value", 1).over(sessionWindow).as("prevValue")
.
我想创建 table 中显示的指标列,但还没有成功。
当上一行的值与当前行的值不同时,值就会发生转变,每当发生转变时,指示器行就会递增 1,否则与前一行保持相同。
创建指标列的主要原因是稍后对指标进行分组。
这是我试图创建列的方法,但这不起作用。任何解决此问题的帮助将不胜感激。
.withColumn("indicator", when(col("prevValue").isNull, 1).otherwise(0))
.withColumn("indicator",
when(col("value") =!= col("prevValue"), lag("indicator", 1).over(sessionWindow) + 1)
.otherwise(lag("indicator", 1, 1).over(sessionWindow)))
val indWindow = Window.orderBy("id")
val sumWIndow = Window.orderBy("id").rowsBetween(Window.unboundedPreceding, Window.currentRow)
df.withColumn("changed", when(col("value") =!= lag(col("value"), 1).over(indWindow), 1).otherwise(0))
.withColumn("group", sum("changed").over(sumWIndow))
产生:
+---+-----+-------+-----+
| id|value|changed|group|
+---+-----+-------+-----+
| 1| a| 0| 0|
| 2| a| 0| 0|
| 3| b| 1| 1|
| 3| c| 1| 2|
| 4| c| 0| 2|
| 5| c| 0| 2|
+---+-----+-------+-----+
注意:这是一个性能非常低效的解决方案,因为 Spark 必须将所有行放在同一个分区中。您是否有任何列可以对数据进行分区?