Spark Dataframe:根据特定列值对行进行分组和排名
Spark Dataframe: Group and rank rows on a certain column value
当 "ID" 列编号从 1 开始到最大值然后从 1 重置时,我正在尝试对列进行排名。
因此,前三行在"ID"上连续编号;因此,这些应该与 group rank = 1 分组。第四行和第五行在另一组中,组等级 = 2.
行按 "rownum" 列排序。我知道 row_number window 函数,但我认为我不能申请这个用例,因为没有常量 window。我只能考虑遍历数据框中的每一行,但不确定当数字重置为 1 时如何更新列。
val df = Seq(
(1, 1 ),
(2, 2 ),
(3, 3 ),
(4, 1),
(5, 2),
(6, 1),
(7, 1),
(8, 2)
).toDF("rownum", "ID")
df.show()
预期结果如下:
您可以使用 2 个 window 函数来完成,第一个标记状态,第二个计算 运行 总和:
df
.withColumn("increase", $"ID" > lag($"ID",1).over(Window.orderBy($"rownum")))
.withColumn("group_rank_of_ID",sum(when($"increase",lit(0)).otherwise(lit(1))).over(Window.orderBy($"rownum")))
.drop($"increase")
.show()
给出:
+------+---+----------------+
|rownum| ID|group_rank_of_ID|
+------+---+----------------+
| 1| 1| 1|
| 2| 2| 1|
| 3| 3| 1|
| 4| 1| 2|
| 5| 2| 2|
| 6| 1| 3|
| 7| 1| 4|
| 8| 2| 4|
+------+---+----------------+
正如@Prithvi 指出的,我们可以在这里使用 lead
。
棘手的部分是为了使用window功能,例如lead
,我们至少需要提供顺序。
考虑
val nextID = lag('ID, 1, -1) over Window.orderBy('rownum)
val isNewGroup = 'ID <= nextID cast "integer"
val group_rank_of_ID = sum(isNewGroup) over Window.orderBy('rownum)
/* you can try
df.withColumn("intermediate", nextID).show
// ^^^^^^^-- can be `isNewGroup`, or other vals
*/
df.withColumn("group_rank_of_ID", group_rank_of_ID).show
/* returns
+------+---+----------------+
|rownum| ID|group_rank_of_ID|
+------+---+----------------+
| 1| 1| 0|
| 2| 2| 0|
| 3| 3| 0|
| 4| 1| 1|
| 5| 2| 1|
| 6| 1| 2|
| 7| 1| 3|
| 8| 2| 3|
+------+---+----------------+
*/
df.withColumn("group_rank_of_ID", group_rank_of_ID + 1).show
/* returns
+------+---+----------------+
|rownum| ID|group_rank_of_ID|
+------+---+----------------+
| 1| 1| 1|
| 2| 2| 1|
| 3| 3| 1|
| 4| 1| 2|
| 5| 2| 2|
| 6| 1| 3|
| 7| 1| 4|
| 8| 2| 4|
+------+---+----------------+
*/
当 "ID" 列编号从 1 开始到最大值然后从 1 重置时,我正在尝试对列进行排名。
因此,前三行在"ID"上连续编号;因此,这些应该与 group rank = 1 分组。第四行和第五行在另一组中,组等级 = 2.
行按 "rownum" 列排序。我知道 row_number window 函数,但我认为我不能申请这个用例,因为没有常量 window。我只能考虑遍历数据框中的每一行,但不确定当数字重置为 1 时如何更新列。
val df = Seq( (1, 1 ), (2, 2 ), (3, 3 ), (4, 1), (5, 2), (6, 1), (7, 1), (8, 2) ).toDF("rownum", "ID") df.show()
预期结果如下:
您可以使用 2 个 window 函数来完成,第一个标记状态,第二个计算 运行 总和:
df
.withColumn("increase", $"ID" > lag($"ID",1).over(Window.orderBy($"rownum")))
.withColumn("group_rank_of_ID",sum(when($"increase",lit(0)).otherwise(lit(1))).over(Window.orderBy($"rownum")))
.drop($"increase")
.show()
给出:
+------+---+----------------+
|rownum| ID|group_rank_of_ID|
+------+---+----------------+
| 1| 1| 1|
| 2| 2| 1|
| 3| 3| 1|
| 4| 1| 2|
| 5| 2| 2|
| 6| 1| 3|
| 7| 1| 4|
| 8| 2| 4|
+------+---+----------------+
正如@Prithvi 指出的,我们可以在这里使用 lead
。
棘手的部分是为了使用window功能,例如lead
,我们至少需要提供顺序。
考虑
val nextID = lag('ID, 1, -1) over Window.orderBy('rownum)
val isNewGroup = 'ID <= nextID cast "integer"
val group_rank_of_ID = sum(isNewGroup) over Window.orderBy('rownum)
/* you can try
df.withColumn("intermediate", nextID).show
// ^^^^^^^-- can be `isNewGroup`, or other vals
*/
df.withColumn("group_rank_of_ID", group_rank_of_ID).show
/* returns
+------+---+----------------+
|rownum| ID|group_rank_of_ID|
+------+---+----------------+
| 1| 1| 0|
| 2| 2| 0|
| 3| 3| 0|
| 4| 1| 1|
| 5| 2| 1|
| 6| 1| 2|
| 7| 1| 3|
| 8| 2| 3|
+------+---+----------------+
*/
df.withColumn("group_rank_of_ID", group_rank_of_ID + 1).show
/* returns
+------+---+----------------+
|rownum| ID|group_rank_of_ID|
+------+---+----------------+
| 1| 1| 1|
| 2| 2| 1|
| 3| 3| 1|
| 4| 1| 2|
| 5| 2| 2|
| 6| 1| 3|
| 7| 1| 4|
| 8| 2| 4|
+------+---+----------------+
*/