Scala: Pass a window-partitioned dataset to a UDF

I have a data frame that looks like this:

Id1  Id2  Id3        TaskId      TaskName     index
1    11   bc123-234  dfr3ws-45d  randomName1  1
1    11   bc123-234  er98d3-lkj  randomName2  2
1    11   bc123-234  hu77d9-mnb  randomName3  3
1    11   bc123-234  xc33d5-rew  deployhere4  4
1    11   xre43-876  dfr3ws-45d  randomName1  1
1    11   xre43-876  er98d3-lkj  deployhere2  2
1    11   xre43-876  hu77d9-mnb  randomName3  3
1    11   xre43-876  xc33d5-rew  randomName4  4

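I partitioned the data by Id3 and Id2 and added a row_number.

I need to apply the following condition:

TaskId "hu77d9-mnb" should come before the task whose name contains deploy. As the table above suggests, the names are random, so I need to read every name in the partition and find the one that contains deploy.

If the index of the deploy TaskName is greater than the index of that TaskId, I mark the value as 1, otherwise 0.

I need to end up with a table like this: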

Id1  Id2  Id3        TaskId      TaskName     index  result
1    11   bc123-234  dfr3ws-45d  randomName1  1      1
1    11   bc123-234  er98d3-lkj  randomName2  2      1
1    11   bc123-234  hu77d9-mnb  randomName3  3      1
1    11   bc123-234  xc33d5-rew  deployhere4  4      1
1    11   xre43-876  dfr3ws-45d  randomName1  1      0
1    11   xre43-876  er98d3-lkj  deployhere2  2      0
1    11   xre43-876  hu77d9-mnb  randomName3  3      0
1    11   xre43-876  xc33d5-rew  randomName4  4      0

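I am stuck on how to pass the partitioned data to a UDF (or another function, such as a UDAF) to perform this task. Any suggestion would be helpful. Thank you for your time.

The index of the "deploy" row and the index of the specific row ("hu77d9-mnb") can be assigned to every row of the partition with the Window "first" function and then compared: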

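import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
// Assumes a SparkSession named `spark` is in scope (as in spark-shell); needed for toDF and $"..."
import spark.implicits._

val df = Seq(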
  (1, 11, "bc123-234", "dfr3ws-45d", "randomName1", 1),
  (1, 11, "bc123-234", "er98d3-lkj", "randomName2", 2),
  (1, 11, "bc123-234", "hu77d9-mnb", "randomName3", 3),
  (1, 11, "bc123-234", "xc33d5-rew", "deployhere4", 4),
  (1, 11, "xre43-876", "dfr3ws-45d", "randomName1", 1),
  (1, 11, "xre43-876", "er98d3-lkj", "deployhere2", 2),
  (1, 11, "xre43-876", "hu77d9-mnb", "randomName3", 3),
  (1, 11, "xre43-876", "xc33d5-rew", "randomName4", 4)
).toDF("Id1", "Id2", "Id3", "TaskID", "TaskName", "index")

val specificTaskId = "hu77d9-mnb"
val idsWindow = Window.partitionBy("Id1", "Id2", "Id3")

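// Index of the row whose TaskName contains "deploy", copied to every row of the partition
df.withColumn("deployIndex",
  first(
    when(instr($"TaskName", "deploy") > 0, $"index").otherwise(null),
    ignoreNulls = true)
    .over(idsWindow))
  // Index of the row with the specific TaskID, copied to every row of the partition
  .withColumn("specificTaskIdIndex",
    first(
      when($"TaskID" === lit(specificTaskId), $"index").otherwise(null),
      ignoreNulls = true)
      .over(idsWindow))
  // 0 when the specific task comes after the deploy task, otherwise 1
  .withColumn("result",
    when($"specificTaskIdIndex" > $"deployIndex", 0).otherwise(1)
  )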

Output (the helper columns "deployIndex" and "specificTaskIdIndex" still have to be dropped):

+---+---+---------+----------+-----------+-----+-----------+-------------------+------+
|Id1|Id2|Id3      |TaskID    |TaskName   |index|deployIndex|specificTaskIdIndex|result|
+---+---+---------+----------+-----------+-----+-----------+-------------------+------+
|1  |11 |bc123-234|dfr3ws-45d|randomName1|1    |4          |3                  |1     |
|1  |11 |bc123-234|er98d3-lkj|randomName2|2    |4          |3                  |1     |
|1  |11 |bc123-234|hu77d9-mnb|randomName3|3    |4          |3                  |1     |
|1  |11 |bc123-234|xc33d5-rew|deployhere4|4    |4          |3                  |1     |
|1  |11 |xre43-876|dfr3ws-45d|randomName1|1    |2          |3                  |0     |
|1  |11 |xre43-876|er98d3-lkj|deployhere2|2    |2          |3                  |0     |
|1  |11 |xre43-876|hu77d9-mnb|randomName3|3    |2          |3                  |0     |
|1  |11 |xre43-876|xc33d5-rew|randomName4|4    |2          |3                  |0     |
+---+---+---------+----------+-----------+-----+-----------+-------------------+------+
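
To sanity-check the rule on a small sample without a Spark session, the same per-partition comparison can be sketched with plain Scala collections. This is only an illustration under a few assumptions: the `Task` case class, the `DeployOrderCheck` object and the `flag` helper are hypothetical names that are not part of the answer above, and the sketch sorts each group by index before picking the first match, whereas the window above has no ordering. Groups that lack a deploy task or the specific task get 1 here, mirroring the `otherwise(1)` branch.

```scala
// Hypothetical row type for illustration; field names mirror the data frame columns.
final case class Task(id1: Int, id2: Int, id3: String, taskId: String, taskName: String, index: Int)

object DeployOrderCheck {
  val specificTaskId = "hu77d9-mnb"

  // Pairs every row with the result flag computed for its (id1, id2, id3) group.
  def flag(rows: Seq[Task]): Seq[(Task, Int)] =
    rows.groupBy(r => (r.id1, r.id2, r.id3)).values.toSeq.flatMap { group =>
      val sorted = group.sortBy(_.index)
      // Index of the first task whose name contains "deploy", if there is one.
      val deployIndex = sorted.find(_.taskName.contains("deploy")).map(_.index)
      // Index of the specific task, if it is present in the group.
      val specificIndex = sorted.find(_.taskId == specificTaskId).map(_.index)
      // 0 only when both exist and the specific task comes after the deploy task.
      val result = (specificIndex, deployIndex) match {
        case (Some(s), Some(d)) if s > d => 0
        case _                           => 1
      }
      sorted.map(r => (r, result))
    }

  // Same sample rows as in the question.
  val rows: Seq[Task] = Seq(
    Task(1, 11, "bc123-234", "dfr3ws-45d", "randomName1", 1),
    Task(1, 11, "bc123-234", "er98d3-lkj", "randomName2", 2),
    Task(1, 11, "bc123-234", "hu77d9-mnb", "randomName3", 3),
    Task(1, 11, "bc123-234", "xc33d5-rew", "deployhere4", 4),
    Task(1, 11, "xre43-876", "dfr3ws-45d", "randomName1", 1),
    Task(1, 11, "xre43-876", "er98d3-lkj", "deployhere2", 2),
    Task(1, 11, "xre43-876", "hu77d9-mnb", "randomName3", 3),
    Task(1, 11, "xre43-876", "xc33d5-rew", "randomName4", 4)
  )

  val flagged: Seq[(Task, Int)] = flag(rows)
}
```

The order of the groups in `flagged` is not guaranteed, because `groupBy` returns a map; look results up by `id3` rather than by position. If a partition could contain more than one deploy task, it may be worth adding an explicit ordering to the Spark window as well, since `first` over an unordered window does not promise which matching row it picks.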