比较spark中数据框中的行,以根据行的比较为列分配值

Compare rows in a dataframe in spark, to assign value to a column based on comparison of rows

我的数据如下所示

旅程Table

SERNR TYPE
123 null
456 null
789 null

段Table

SERNR Sgmnt FROM-Station TO-Station
123 01 A B
123 02 B C
123 03 C B
123 04 B A
456 01 A B
456 02 B C
456 03 C D
456 04 D A
789 04 A B

我想加入这两个数据 frames/tables 并检查旅程站 FROMTO 以确定旅程类型,即如果它的 return 旅程某种类型 A 如果它的镜像 return 某种类型 B,如果它是单程旅行 某种类型 C

类型计算如下

假设行程 SERNR 123,行程详情为 A->B , B->C ,C->B,B->A,这是一个镜像旅程,因为它A-B-C然后C-B-A.

对于 789 它的 A->B 所以这是一个正常的旅程。

对于 456 它的 A-> B, B->C, C->D, D-A, 简而言之 A-B-C 然后 C-D-A ,这是一个 return 但不是镜像

我真的不知道如何根据SERNR对Dataframe中的行进行比较,通过检查相同[=的FROMTo站来确定类型15=]

如果我能得到指导并继续实施相同的方法,我将不胜感激。

使用 from_ station 的 cllect_list 或 to_station 通过将其与 SERNR 分组并使用段排序

您可以将 FROM TO 旅程的列表收集到每个 SERNR 的数组列中,然后加入数组元素以获得 journey_path (A-B-C...).

当你得到每个旅程的旅程路径时,你可以使用when表达式来确定TYPE

  • 如果 first FROM != last TO 那么它是 normal
  • else : 如果 journey_path 的反转 == journey_path mirror 否则它是 return

注意在分组收集FROM - TO的列表时,需要用一个Window来保持段的顺序。

import org.apache.spark.sql.expressions.Window

val w = Window.partitionBy("SERNR").orderBy("Sgmnt").rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

val result = segment_df.select(
    col("SERNR"),
    array_join(
      collect_list(concat_ws("-", col("FROM"), col("TO"))).over(w),
      "-"
    ).alias("journey_path")
  ).dropDuplicates(Seq("SERNR")).withColumn(
    "TYPE",
    when(
      substring(col("journey_path"), 0, 1) =!= substring(col("journey_path"), -1, 1),
      "normal"
    ).otherwise(
      when(
        reverse(col("journey_path")) === col("journey_path"),
        "mirror"
      ).otherwise("return")
    )
  )
  .drop("journey_path")

result.show
//+-----+------+
//|SERNR|  TYPE|
//+-----+------+
//|  789|normal|
//|  456|return|
//|  123|mirror|
//+-----+------+