比较spark中数据框中的行,以根据行的比较为列分配值
Compare rows in a dataframe in spark, to assign value to a column based on comparison of rows
我的数据如下所示
旅程Table
SERNR
TYPE
123
null
456
null
789
null
段Table
SERNR
Sgmnt
FROM-Station
TO-Station
123
01
A
B
123
02
B
C
123
03
C
B
123
04
B
A
456
01
A
B
456
02
B
C
456
03
C
D
456
04
D
A
789
04
A
B
我想加入这两个数据 frames/tables 并检查旅程站 FROM
和 TO
以确定旅程类型,即如果它的 return 旅程某种类型 A
如果它的镜像 return 某种类型 B
,如果它是单程旅行 某种类型 C
类型计算如下
假设行程 SERNR 123,行程详情为 A->B , B->C ,C->B,B->A,这是一个镜像旅程,因为它A-B-C然后C-B-A.
对于 789 它的 A->B 所以这是一个正常的旅程。
对于 456 它的 A-> B, B->C, C->D, D-A, 简而言之 A-B-C 然后 C-D-A ,这是一个 return 但不是镜像
我真的不知道如何根据SERNR
对Dataframe中的行进行比较,通过检查相同[=的FROM
和To
站来确定类型15=]
如果我能得到指导并继续实施相同的方法,我将不胜感激。
使用 from_ station 的 cllect_list 或 to_station 通过将其与 SERNR 分组并使用段排序
您可以将 FROM
TO
旅程的列表收集到每个 SERNR
的数组列中,然后加入数组元素以获得 journey_path
(A-B-C...
).
当你得到每个旅程的旅程路径时,你可以使用when
表达式来确定TYPE
:
- 如果 first FROM != last TO 那么它是
normal
- else : 如果 journey_path 的反转 == journey_path
mirror
否则它是 return
注意在分组收集FROM - TO
的列表时,需要用一个Window来保持段的顺序。
import org.apache.spark.sql.expressions.Window
val w = Window.partitionBy("SERNR").orderBy("Sgmnt").rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
val result = segment_df.select(
col("SERNR"),
array_join(
collect_list(concat_ws("-", col("FROM"), col("TO"))).over(w),
"-"
).alias("journey_path")
).dropDuplicates(Seq("SERNR")).withColumn(
"TYPE",
when(
substring(col("journey_path"), 0, 1) =!= substring(col("journey_path"), -1, 1),
"normal"
).otherwise(
when(
reverse(col("journey_path")) === col("journey_path"),
"mirror"
).otherwise("return")
)
)
.drop("journey_path")
result.show
//+-----+------+
//|SERNR| TYPE|
//+-----+------+
//| 789|normal|
//| 456|return|
//| 123|mirror|
//+-----+------+
我的数据如下所示
旅程Table
SERNR | TYPE |
---|---|
123 | null |
456 | null |
789 | null |
段Table
SERNR | Sgmnt | FROM-Station | TO-Station |
---|---|---|---|
123 | 01 | A | B |
123 | 02 | B | C |
123 | 03 | C | B |
123 | 04 | B | A |
456 | 01 | A | B |
456 | 02 | B | C |
456 | 03 | C | D |
456 | 04 | D | A |
789 | 04 | A | B |
我想加入这两个数据 frames/tables 并检查旅程站 FROM
和 TO
以确定旅程类型,即如果它的 return 旅程某种类型 A
如果它的镜像 return 某种类型 B
,如果它是单程旅行 某种类型 C
类型计算如下
假设行程 SERNR 123,行程详情为 A->B , B->C ,C->B,B->A,这是一个镜像旅程,因为它A-B-C然后C-B-A.
对于 789 它的 A->B 所以这是一个正常的旅程。
对于 456 它的 A-> B, B->C, C->D, D-A, 简而言之 A-B-C 然后 C-D-A ,这是一个 return 但不是镜像
我真的不知道如何根据SERNR
对Dataframe中的行进行比较,通过检查相同[=的FROM
和To
站来确定类型15=]
如果我能得到指导并继续实施相同的方法,我将不胜感激。
使用 from_ station 的 cllect_list 或 to_station 通过将其与 SERNR 分组并使用段排序
您可以将 FROM
TO
旅程的列表收集到每个 SERNR
的数组列中,然后加入数组元素以获得 journey_path
(A-B-C...
).
当你得到每个旅程的旅程路径时,你可以使用when
表达式来确定TYPE
:
- 如果 first FROM != last TO 那么它是
normal
- else : 如果 journey_path 的反转 == journey_path
mirror
否则它是return
注意在分组收集FROM - TO
的列表时,需要用一个Window来保持段的顺序。
import org.apache.spark.sql.expressions.Window
val w = Window.partitionBy("SERNR").orderBy("Sgmnt").rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
val result = segment_df.select(
col("SERNR"),
array_join(
collect_list(concat_ws("-", col("FROM"), col("TO"))).over(w),
"-"
).alias("journey_path")
).dropDuplicates(Seq("SERNR")).withColumn(
"TYPE",
when(
substring(col("journey_path"), 0, 1) =!= substring(col("journey_path"), -1, 1),
"normal"
).otherwise(
when(
reverse(col("journey_path")) === col("journey_path"),
"mirror"
).otherwise("return")
)
)
.drop("journey_path")
result.show
//+-----+------+
//|SERNR| TYPE|
//+-----+------+
//| 789|normal|
//| 456|return|
//| 123|mirror|
//+-----+------+