Scala: Pass a window-partitioned dataset to a UDF
I have a DataFrame like the one below:
Id1 | Id2 | Id3 | TaskId | TaskName | index |
---|---|---|---|---|---|
1 | 11 | bc123-234 | dfr3ws-45d | randomName1 | 1 |
1 | 11 | bc123-234 | er98d3-lkj | randomName2 | 2 |
1 | 11 | bc123-234 | hu77d9-mnb | randomName3 | 3 |
1 | 11 | bc123-234 | xc33d5-rew | deployhere4 | 4 |
1 | 11 | xre43-876 | dfr3ws-45d | randomName1 | 1 |
1 | 11 | xre43-876 | er98d3-lkj | deployhere2 | 2 |
1 | 11 | xre43-876 | hu77d9-mnb | randomName3 | 3 |
1 | 11 | xre43-876 | xc33d5-rew | randomName4 | 4 |
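I partitioned the data by Id3 and Id2 and added a row_number (the index column).
I need to check the following condition:
The task with TaskId "hu77d9-mnb" should come before the task whose name contains "deploy". As the table above shows, the task names are random, so I need to read every name in the partition and find the one that contains "deploy".
If the index of the deploy task is greater than the index of that TaskId, I flag the rows of the partition as 1; otherwise as 0.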
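In plain Scala, without Spark, the check for a single partition would look roughly like the sketch below. The `Task` case class and the `PartitionRule.flag` helper are hypothetical names used only for illustration, and flagging a partition as 0 when either row is missing is my own assumption.

```scala
// Hypothetical row type for illustration; fields mirror the TaskId, TaskName and index columns.
final case class Task(taskId: String, taskName: String, index: Int)

object PartitionRule {
  // Returns 1 when the "deploy" task has a greater index than the given TaskId
  // within one partition, otherwise 0.
  // Assumption: a partition that lacks either row is flagged 0.
  def flag(partition: Seq[Task], specificTaskId: String): Int = {
    val deployIndex = partition.find(_.taskName.contains("deploy")).map(_.index)
    val taskIndex   = partition.find(_.taskId == specificTaskId).map(_.index)
    (deployIndex, taskIndex) match {
      case (Some(d), Some(t)) if d > t => 1
      case _                           => 0
    }
  }
}
```

What I cannot work out is how to express the same per-partition logic on a Spark DataFrame.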
I need to get a final table like this:
Id1 | Id2 | Id3 | TaskId | TaskName | index | result |
---|---|---|---|---|---|---|
1 | 11 | bc123-234 | dfr3ws-45d | randomName1 | 1 | 1 |
1 | 11 | bc123-234 | er98d3-lkj | randomName2 | 2 | 1 |
1 | 11 | bc123-234 | hu77d9-mnb | randomName3 | 3 | 1 |
1 | 11 | bc123-234 | xc33d5-rew | deployhere4 | 4 | 1 |
1 | 11 | xre43-876 | dfr3ws-45d | randomName1 | 1 | 0 |
1 | 11 | xre43-876 | er98d3-lkj | deployhere2 | 2 | 0 |
1 | 11 | xre43-876 | hu77d9-mnb | randomName3 | 3 | 0 |
1 | 11 | xre43-876 | xc33d5-rew | randomName4 | 4 | 0 |
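I am stuck on how to pass the partitioned data to a UDF (or another function, such as a UDAF) to perform this task. Any suggestions would be helpful. Thank you for your time.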
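No UDF is needed here. The index of the "deploy" row and the index of the specific row ("hu77d9-mnb") can each be assigned to every row of the partition with the window function "first", and then compared: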
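    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions.{first, instr, lit, when}
    // Assumes a SparkSession named `spark` is in scope (as in spark-shell).
    import spark.implicits._

    val df = Seq(
      (1, 11, "bc123-234", "dfr3ws-45d", "randomName1", 1),
      (1, 11, "bc123-234", "er98d3-lkj", "randomName2", 2),
      (1, 11, "bc123-234", "hu77d9-mnb", "randomName3", 3),
      (1, 11, "bc123-234", "xc33d5-rew", "deployhere4", 4),
      (1, 11, "xre43-876", "dfr3ws-45d", "randomName1", 1),
      (1, 11, "xre43-876", "er98d3-lkj", "deployhere2", 2),
      (1, 11, "xre43-876", "hu77d9-mnb", "randomName3", 3),
      (1, 11, "xre43-876", "xc33d5-rew", "randomName4", 4)
    ).toDF("Id1", "Id2", "Id3", "TaskID", "TaskName", "index")

    val specificTaskId = "hu77d9-mnb"
    // No ordering is specified, so each window frame covers the whole partition.
    val idsWindow = Window.partitionBy("Id1", "Id2", "Id3")

    df
      // Index of the row whose TaskName contains "deploy", copied to every row of the partition.
      .withColumn("deployIndex",
        first(
          when(instr($"TaskName", "deploy") > 0, $"index").otherwise(null),
          true) // true = ignore nulls
          .over(idsWindow))
      // Index of the row with the specific TaskID, copied to every row of the partition.
      .withColumn("specificTaskIdIndex",
        first(
          when($"TaskID" === lit(specificTaskId), $"index").otherwise(null),
          true) // true = ignore nulls
          .over(idsWindow))
      // 0 when the specific task comes after the deploy task, 1 otherwise.
      .withColumn("result",
        when($"specificTaskIdIndex" > $"deployIndex", 0).otherwise(1)
      )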
Output (the helper columns "deployIndex" and "specificTaskIdIndex" still have to be dropped):
    +---+---+---------+----------+-----------+-----+-----------+-------------------+------+
    |Id1|Id2|Id3      |TaskID    |TaskName   |index|deployIndex|specificTaskIdIndex|result|
    +---+---+---------+----------+-----------+-----+-----------+-------------------+------+
    |1  |11 |bc123-234|dfr3ws-45d|randomName1|1    |4          |3                  |1     |
    |1  |11 |bc123-234|er98d3-lkj|randomName2|2    |4          |3                  |1     |
    |1  |11 |bc123-234|hu77d9-mnb|randomName3|3    |4          |3                  |1     |
    |1  |11 |bc123-234|xc33d5-rew|deployhere4|4    |4          |3                  |1     |
    |1  |11 |xre43-876|dfr3ws-45d|randomName1|1    |2          |3                  |0     |
    |1  |11 |xre43-876|er98d3-lkj|deployhere2|2    |2          |3                  |0     |
    |1  |11 |xre43-876|hu77d9-mnb|randomName3|3    |2          |3                  |0     |
    |1  |11 |xre43-876|xc33d5-rew|randomName4|4    |2          |3                  |0     |
    +---+---+---------+----------+-----------+-----+-----------+-------------------+------+
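As a possible variation, the sketch below computes the two helper columns with `min` instead of `first` and drops them in the same chain. `min` skips nulls and should stay deterministic even if a partition happens to contain more than one matching row. The object name `DeployOrderCheck`, the function `withResult`, and the local SparkSession settings are assumptions made for illustration. Note that, as in the code above, a partition without a "deploy" row or without the specific TaskID yields a null comparison and therefore falls through to `otherwise(1)`.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, instr, min, when}

object DeployOrderCheck {
  // Adds a "result" column: 0 when the specific task comes after the deploy task
  // within its (Id1, Id2, Id3) partition, 1 otherwise.
  def withResult(df: DataFrame, specificTaskId: String): DataFrame = {
    val idsWindow = Window.partitionBy("Id1", "Id2", "Id3")
    df
      // when(...) without otherwise yields null for non-matching rows; min ignores nulls.
      .withColumn("deployIndex",
        min(when(instr(col("TaskName"), "deploy") > 0, col("index"))).over(idsWindow))
      .withColumn("specificTaskIdIndex",
        min(when(col("TaskID") === specificTaskId, col("index"))).over(idsWindow))
      .withColumn("result",
        when(col("specificTaskIdIndex") > col("deployIndex"), 0).otherwise(1))
      // Remove the helper columns so that only the original columns and "result" remain.
      .drop("deployIndex", "specificTaskIdIndex")
  }

  def main(args: Array[String]): Unit = {
    // Local session for illustration only; master and app name are assumptions.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("deploy-order-check")
      .getOrCreate()
    import spark.implicits._

    val df = Seq(
      (1, 11, "bc123-234", "hu77d9-mnb", "randomName3", 3),
      (1, 11, "bc123-234", "xc33d5-rew", "deployhere4", 4),
      (1, 11, "xre43-876", "er98d3-lkj", "deployhere2", 2),
      (1, 11, "xre43-876", "hu77d9-mnb", "randomName3", 3)
    ).toDF("Id1", "Id2", "Id3", "TaskID", "TaskName", "index")

    withResult(df, "hu77d9-mnb").show(false)
    spark.stop()
  }
}
```

If rows without a deploy task should be flagged differently, the final `when` can be extended with an explicit null check on the two helper columns before they are dropped.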