Apache Spark 在 Dataframe 中找到第一个不同的前一行
Apache Spark find first different preceding row in Dataframe
我有以下格式的 Apache Spark Dataframe
| ID | groupId | phaseName |
|----|-----------|-----------|
| 10 | someHash1 | PhaseA |
| 11 | someHash1 | PhaseB |
| 12 | someHash1 | PhaseB |
| 13 | someHash2 | PhaseX |
| 14 | someHash2 | PhaseY |
每一行代表一个阶段,该阶段发生在由多个阶段组成的过程中。 ID
列表示阶段的顺序,groupId
列显示哪些阶段属于一起。
我想向数据框添加一个新列:previousPhaseName。此列应指明 同一程序的前一个不同阶段 。进程的第一阶段(具有最小 ID 的阶段)将具有 null
作为前一阶段。当一个阶段出现两次或更多次时,第二次(第三次...)出现将具有相同的先前阶段名称例如:
df =
| ID | groupId | phaseName | prevPhaseName |
|----|-----------|-----------|---------------|
| 10 | someHash1 | PhaseA | null |
| 11 | someHash1 | PhaseB | PhaseA |
| 12 | someHash1 | PhaseB | PhaseA |
| 13 | someHash2 | PhaseX | null |
| 14 | someHash2 | PhaseY | PhaseX |
我不确定如何实现它。我的第一个方法是:
- 创建第二个空数据框 df2
- 对于 df 中的每一行:
找到 groupId = row.groupId、ID < row.ID 和最大 id 的行
- 将此行添加到 df2
- 加入 df1 和 df2
使用 Window 函数的部分解决方案
我用Window Functions
聚合了前一阶段的名称、当前阶段在组中之前出现的次数(不一定在一行中)以及当前和前一阶段名称的信息相等:
WindowSpec windowSpecPrev = Window
.partitionBy(df.col("groupId"))
.orderBy(df.col("ID"));
WindowSpec windowSpecCount = Window
.partitionBy(df.col("groupId"), df.col("phaseName"))
.orderBy(df.col("ID"))
.rowsBetween(Long.MIN_VALUE, 0);
df
.withColumn("prevPhase", functions.lag("phaseName", 1).over(windowSpecPrev))
.withColumn("phaseCount", functions.count("phaseId").over(windowSpecCount))
.withColumn("prevSame", when(col("prevPhase").equalTo(col("phaseName")),1).otherwise(0))
df =
| ID | groupId | phaseName | prevPhase | phaseCount | prevSame |
|----|-----------|-----------|-------------|------------|----------|
| 10 | someHash1 | PhaseA | null | 1 | 0 |
| 11 | someHash1 | PhaseB | PhaseA | 1 | 0 |
| 12 | someHash1 | PhaseB | PhaseB | 2 | 1 |
| 13 | someHash2 | PhaseX | null | 1 | 0 |
| 14 | someHash2 | PhaseY | PhaseX | 1 | 0 |
这仍然不是我想要实现的,但现在已经足够好了
进一步的想法
为了获得前一个不同阶段的名称,我看到了三种我尚未彻底调查的可能性:
- 实现自己的
lag
函数,该函数不采用偏移量,而是递归检查前一行,直到找到与给定行不同的值。 (虽然我认为不可能在 Spark SQL 中使用自己的解析 window 函数)
- 想办法根据
phaseCount
的值动态设置lag
函数的偏移量。 (如果之前出现的相同 phaseName 未出现在单个序列中,则可能会失败)
- 在存储第一个给定输入的 ID 和 phaseName 的 window 上使用
UserDefinedAggregateFunction
,并寻找具有不同 phaseName 的最高 ID。
我猜你可以使用 Spark window(行框架)函数。检查 api 文档和以下 post.
https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html
我能够通过以下方式解决这个问题:
- 获取(普通)上一阶段。
- 引入一个新的 id 来对按顺序发生的阶段进行分组。 (借助这个 answer)。这需要两个步骤。首先检查当前和之前的阶段名称是否相等,并相应地分配一个 groupCount 值。其次计算该值的累计和。
- 将顺序组第一行的前一阶段分配给其所有成员。
实施
WindowSpec specGroup = Window.partitionBy(col("groupId"))
.orderBy(col("ID"));
WindowSpec specSeqGroupId = Window.partitionBy(col("groupId"))
.orderBy(col("ID"))
.rowsBetween(Long.MIN_VALUE, 0);
WindowSpec specPrevDiff = Window.partitionBy(col("groupId"), col("seqGroupId"))
.orderBy(col("ID"))
.rowsBetween(Long.MIN_VALUE, 0);
df.withColumn("prevPhase", coalesce(lag("phaseName", 1).over(specGroup), lit("NO_PREV")))
.withColumn("seqCount", when(col("prevPhase").equalTo(col("phaseName")).or(col("prevPhase").equalTo("NO_PREV")),0).otherwise(1))
.withColumn("seqGroupId", sum("seqCount").over(specSeqGroupId))
.withColumn("prevDiff", first("prevPhase").over(specPrevDiff));
结果
df =
| ID | groupId | phaseName | prevPhase | seqCount | seqGroupId | prevDiff |
|----|-----------|-----------|-----------|----------|------------|----------|
| 10 | someHash1 | PhaseA | NO_PREV | 0 | 0 | NO_PREV |
| 11 | someHash1 | PhaseB | PhaseA | 1 | 1 | PhaseA |
| 12 | someHash1 | PhaseB | PhaseA | 0 | 1 | PhaseA |
| 13 | someHash2 | PhaseX | NO_PREV | 0 | 0 | NO_PREV |
| 14 | someHash2 | PhaseY | PhaseX | 1 | 1 | PhaseX |
如有任何建议,特别是在这些操作的效率方面,我们将不胜感激。
我有以下格式的 Apache Spark Dataframe
| ID | groupId | phaseName |
|----|-----------|-----------|
| 10 | someHash1 | PhaseA |
| 11 | someHash1 | PhaseB |
| 12 | someHash1 | PhaseB |
| 13 | someHash2 | PhaseX |
| 14 | someHash2 | PhaseY |
每一行代表一个阶段,该阶段发生在由多个阶段组成的过程中。 ID
列表示阶段的顺序,groupId
列显示哪些阶段属于一起。
我想向数据框添加一个新列:previousPhaseName。此列应指明 同一程序的前一个不同阶段 。进程的第一阶段(具有最小 ID 的阶段)将具有 null
作为前一阶段。当一个阶段出现两次或更多次时,第二次(第三次...)出现将具有相同的先前阶段名称例如:
df =
| ID | groupId | phaseName | prevPhaseName |
|----|-----------|-----------|---------------|
| 10 | someHash1 | PhaseA | null |
| 11 | someHash1 | PhaseB | PhaseA |
| 12 | someHash1 | PhaseB | PhaseA |
| 13 | someHash2 | PhaseX | null |
| 14 | someHash2 | PhaseY | PhaseX |
我不确定如何实现它。我的第一个方法是:
- 创建第二个空数据框 df2
- 对于 df 中的每一行:
找到 groupId = row.groupId、ID < row.ID 和最大 id 的行
- 将此行添加到 df2
- 加入 df1 和 df2
使用 Window 函数的部分解决方案
我用Window Functions
聚合了前一阶段的名称、当前阶段在组中之前出现的次数(不一定在一行中)以及当前和前一阶段名称的信息相等:
WindowSpec windowSpecPrev = Window
.partitionBy(df.col("groupId"))
.orderBy(df.col("ID"));
WindowSpec windowSpecCount = Window
.partitionBy(df.col("groupId"), df.col("phaseName"))
.orderBy(df.col("ID"))
.rowsBetween(Long.MIN_VALUE, 0);
df
.withColumn("prevPhase", functions.lag("phaseName", 1).over(windowSpecPrev))
.withColumn("phaseCount", functions.count("phaseId").over(windowSpecCount))
.withColumn("prevSame", when(col("prevPhase").equalTo(col("phaseName")),1).otherwise(0))
df =
| ID | groupId | phaseName | prevPhase | phaseCount | prevSame |
|----|-----------|-----------|-------------|------------|----------|
| 10 | someHash1 | PhaseA | null | 1 | 0 |
| 11 | someHash1 | PhaseB | PhaseA | 1 | 0 |
| 12 | someHash1 | PhaseB | PhaseB | 2 | 1 |
| 13 | someHash2 | PhaseX | null | 1 | 0 |
| 14 | someHash2 | PhaseY | PhaseX | 1 | 0 |
这仍然不是我想要实现的,但现在已经足够好了
进一步的想法
为了获得前一个不同阶段的名称,我看到了三种我尚未彻底调查的可能性:
- 实现自己的
lag
函数,该函数不采用偏移量,而是递归检查前一行,直到找到与给定行不同的值。 (虽然我认为不可能在 Spark SQL 中使用自己的解析 window 函数) - 想办法根据
phaseCount
的值动态设置lag
函数的偏移量。 (如果之前出现的相同 phaseName 未出现在单个序列中,则可能会失败) - 在存储第一个给定输入的 ID 和 phaseName 的 window 上使用
UserDefinedAggregateFunction
,并寻找具有不同 phaseName 的最高 ID。
我猜你可以使用 Spark window(行框架)函数。检查 api 文档和以下 post.
https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html
我能够通过以下方式解决这个问题:
- 获取(普通)上一阶段。
- 引入一个新的 id 来对按顺序发生的阶段进行分组。 (借助这个 answer)。这需要两个步骤。首先检查当前和之前的阶段名称是否相等,并相应地分配一个 groupCount 值。其次计算该值的累计和。
- 将顺序组第一行的前一阶段分配给其所有成员。
实施
WindowSpec specGroup = Window.partitionBy(col("groupId"))
.orderBy(col("ID"));
WindowSpec specSeqGroupId = Window.partitionBy(col("groupId"))
.orderBy(col("ID"))
.rowsBetween(Long.MIN_VALUE, 0);
WindowSpec specPrevDiff = Window.partitionBy(col("groupId"), col("seqGroupId"))
.orderBy(col("ID"))
.rowsBetween(Long.MIN_VALUE, 0);
df.withColumn("prevPhase", coalesce(lag("phaseName", 1).over(specGroup), lit("NO_PREV")))
.withColumn("seqCount", when(col("prevPhase").equalTo(col("phaseName")).or(col("prevPhase").equalTo("NO_PREV")),0).otherwise(1))
.withColumn("seqGroupId", sum("seqCount").over(specSeqGroupId))
.withColumn("prevDiff", first("prevPhase").over(specPrevDiff));
结果
df =
| ID | groupId | phaseName | prevPhase | seqCount | seqGroupId | prevDiff |
|----|-----------|-----------|-----------|----------|------------|----------|
| 10 | someHash1 | PhaseA | NO_PREV | 0 | 0 | NO_PREV |
| 11 | someHash1 | PhaseB | PhaseA | 1 | 1 | PhaseA |
| 12 | someHash1 | PhaseB | PhaseA | 0 | 1 | PhaseA |
| 13 | someHash2 | PhaseX | NO_PREV | 0 | 0 | NO_PREV |
| 14 | someHash2 | PhaseY | PhaseX | 1 | 1 | PhaseX |
如有任何建议,特别是在这些操作的效率方面,我们将不胜感激。