How to find the next occurring item from the current row in a DataFrame using Spark windowing?

I have the following DataFrame:

+------+----------+-------------+--------------------+---------+-----+----------+
|ID    |MEM_ID    | BFS         | SVC_DT             |TYP      |SEQ  |BFS_SEQ   |
+------+----------+-------------+--------------------+---------+-----+----------+
|105771|29378668  | BRIMONIDINE | 2019-02-04 00:00:00|PD       |1    |1         |
|105772|29378668  | BRIMONIDINE | 2019-04-04 00:00:00|PD       |2    |2         |
|105773|29378668  | BRIMONIDINE | 2019-04-17 00:00:00|RV       |3    |3         |
|105774|29378668  | TIMOLOL     | 2019-04-17 00:00:00|RV       |4    |1         |
|105775|29378668  | BRIMONIDINE | 2019-04-22 00:00:00|PD       |5    |4         |
|105776|29378668  | TIMOLOL     | 2019-04-22 00:00:00|PD       |6    |2         |
+------+----------+-------------+--------------------+---------+-----+----------+

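For each row, I need to find the next occurrence of TYP 'PD' after the current row within the same BFS, and populate its associated ID into a new column named 'NEXT_PD_TYP_ID'.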

My expected output is:

+------+---------+-------------+--------------------+----+-----+---------+---------------+
|ID    |MEM_ID   | BFS         | SVC_DT             |TYP |SEQ  |BFS_SEQ  |NEXT_PD_TYP_ID |
+------+---------+-------------+--------------------+----+-----+---------+---------------+
|105771|29378668 | BRIMONIDINE | 2019-02-04 00:00:00|PD  |1    |1        |105772         |
|105772|29378668 | BRIMONIDINE | 2019-04-04 00:00:00|PD  |2    |2        |105775         |
|105773|29378668 | BRIMONIDINE | 2019-04-17 00:00:00|RV  |3    |3        |105775         |
|105774|29378668 | TIMOLOL     | 2019-04-17 00:00:00|RV  |4    |1        |105776         |
|105775|29378668 | BRIMONIDINE | 2019-04-22 00:00:00|PD  |5    |4        |null           |
|105776|29378668 | TIMOLOL     | 2019-04-22 00:00:00|PD  |6    |2        |null           |
+------+---------+-------------+--------------------+----+-----+---------+---------------+

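Any help is appreciated.

I tried conditional aggregation with max(when), but because there are multiple 'PD' rows, max returns only a single value for all rows.

There is no error message.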

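Hope this helps. I created a new column that holds the ID only where TYP === PD, and called it TYPPDID. Then I used a Window with a frame running from the next row to unbounded following, and took the first non-null TYPPDID in that frame. The final orderBy("ID") is only there to display the records in order.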

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

val df = Seq(
("105771", "BRIMONIDINE", "PD"),
("105772", "BRIMONIDINE", "PD"),
("105773", "BRIMONIDINE","RV"),
("105774", "TIMOLOL", "RV"),
("105775", "BRIMONIDINE", "PD"),
("105776", "TIMOLOL", "PD")
).toDF("ID", "BFS", "TYP").withColumn("TYPPDID", when($"TYP" === "PD", $"ID"))
df: org.apache.spark.sql.DataFrame = [ID: string, BFS: string ... 2 more fields]

scala> df.show
+------+-----------+---+-------+
|    ID|        BFS|TYP|TYPPDID|
+------+-----------+---+-------+
|105771|BRIMONIDINE| PD| 105771|
|105772|BRIMONIDINE| PD| 105772|
|105773|BRIMONIDINE| RV|   null|
|105774|    TIMOLOL| RV|   null|
|105775|BRIMONIDINE| PD| 105775|
|105776|    TIMOLOL| PD| 105776|
+------+-----------+---+-------+


scala> val overColumns = Window.partitionBy("BFS").orderBy("ID").rowsBetween(1, Window.unboundedFollowing)
overColumns: org.apache.spark.sql.expressions.WindowSpec = org.apache.spark.sql.expressions.WindowSpec@eb923ef


scala> df.withColumn("NEXT_PD_TYP_ID",first("TYPPDID", true).over(overColumns)).orderBy("ID").show(false)
+------+-----------+---+-------+--------------+
|ID    |BFS        |TYP|TYPPDID|NEXT_PD_TYP_ID|
+------+-----------+---+-------+--------------+
|105771|BRIMONIDINE|PD |105771 |105772        |
|105772|BRIMONIDINE|PD |105772 |105775        |
|105773|BRIMONIDINE|RV |null   |105775        |
|105774|TIMOLOL    |RV |null   |105776        |
|105775|BRIMONIDINE|PD |105775 |null          |
|105776|TIMOLOL    |PD |105776 |null          |
+------+-----------+---+-------+--------------+
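
The example above uses a trimmed-down DataFrame. As a rough sketch of how the same idea might carry over to the full set of columns from the question, the version below partitions by both MEM_ID and BFS and orders by SEQ instead of ID. Those two choices are assumptions on my part (the question only shows one MEM_ID, and SEQ looks like the intended row order), as are the names `full`, `nextRows` and `result` and the local SparkSession. The helper column is folded into the `first(...)` call rather than materialized as TYPPDID.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, first, when}

// Assumption: a local session for illustration; in spark-shell `spark` already exists.
val spark = SparkSession.builder().master("local[*]").appName("next-pd-sketch").getOrCreate()
import spark.implicits._

// Sample rows copied from the question.
val full = Seq(
  (105771, 29378668, "BRIMONIDINE", "2019-02-04 00:00:00", "PD", 1, 1),
  (105772, 29378668, "BRIMONIDINE", "2019-04-04 00:00:00", "PD", 2, 2),
  (105773, 29378668, "BRIMONIDINE", "2019-04-17 00:00:00", "RV", 3, 3),
  (105774, 29378668, "TIMOLOL",     "2019-04-17 00:00:00", "RV", 4, 1),
  (105775, 29378668, "BRIMONIDINE", "2019-04-22 00:00:00", "PD", 5, 4),
  (105776, 29378668, "TIMOLOL",     "2019-04-22 00:00:00", "PD", 6, 2)
).toDF("ID", "MEM_ID", "BFS", "SVC_DT", "TYP", "SEQ", "BFS_SEQ")

// Frame: from the row after the current one to the end of the partition.
// Assumption: rows are grouped per member and BFS, and SEQ defines their order.
val nextRows = Window
  .partitionBy("MEM_ID", "BFS")
  .orderBy("SEQ")
  .rowsBetween(1, Window.unboundedFollowing)

// when(...) without otherwise yields null for non-PD rows;
// first(..., true) skips those nulls and picks the first following PD row's ID.
val result = full.withColumn(
  "NEXT_PD_TYP_ID",
  first(when(col("TYP") === "PD", col("ID")), true).over(nextRows)
)

result.orderBy("SEQ").show(false)
```

If SEQ is not unique within a MEM_ID and BFS group in the real data, the order of tied rows is not guaranteed, so a tie-breaking column (for example ID) would probably need to be added to the orderBy.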