How to find the next occurring item from the current row in a data frame using Spark Windowing?
I have the following data frame:
+------+--------+-----------+-------------------+---+---+-------+
|ID    |MEM_ID  |BFS        |SVC_DT             |TYP|SEQ|BFS_SEQ|
+------+--------+-----------+-------------------+---+---+-------+
|105771|29378668|BRIMONIDINE|2019-02-04 00:00:00|PD |1  |1      |
|105772|29378668|BRIMONIDINE|2019-04-04 00:00:00|PD |2  |2      |
|105773|29378668|BRIMONIDINE|2019-04-17 00:00:00|RV |3  |3      |
|105774|29378668|TIMOLOL    |2019-04-17 00:00:00|RV |4  |1      |
|105775|29378668|BRIMONIDINE|2019-04-22 00:00:00|PD |5  |4      |
|105776|29378668|TIMOLOL    |2019-04-22 00:00:00|PD |6  |2      |
+------+--------+-----------+-------------------+---+---+-------+
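For each row, I have to find the next occurrence of a 'PD' TYP within the current row's BFS and populate its associated ID into a new column named 'NEXT_PD_TYP_ID'.
My expected output is: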
+------+--------+-----------+-------------------+---+---+-------+--------------+
|ID    |MEM_ID  |BFS        |SVC_DT             |TYP|SEQ|BFS_SEQ|NEXT_PD_TYP_ID|
+------+--------+-----------+-------------------+---+---+-------+--------------+
|105771|29378668|BRIMONIDINE|2019-02-04 00:00:00|PD |1  |1      |105772        |
|105772|29378668|BRIMONIDINE|2019-04-04 00:00:00|PD |2  |2      |105775        |
|105773|29378668|BRIMONIDINE|2019-04-17 00:00:00|RV |3  |3      |105775        |
|105774|29378668|TIMOLOL    |2019-04-17 00:00:00|RV |4  |1      |105776        |
|105775|29378668|BRIMONIDINE|2019-04-22 00:00:00|PD |5  |4      |null          |
|105776|29378668|TIMOLOL    |2019-04-22 00:00:00|PD |6  |2      |null          |
+------+--------+-----------+-------------------+---+---+-------+--------------+
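Any help is appreciated.
I tried conditional aggregation with max(when), but because there are multiple 'PD' rows, max returns only a single value for all rows.
There is no error message.
Hope this helps.
I created a new column that holds the ID when TYP === "PD" and is null otherwise. I called it TYPPDID.
Then I used a Window whose frame runs from the next row to unbounded following and took the first non-null TYPPDID.
The orderBy("ID") at the end is only there to display the records in order.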
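// Run in spark-shell, where spark.implicits._ (needed for toDF and $"...") is already imported.
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
// TYPPDID holds the ID for PD rows and null for every other row.
val df = Seq(
("105771", "BRIMONIDINE", "PD"),
("105772", "BRIMONIDINE", "PD"),
("105773", "BRIMONIDINE", "RV"),
("105774", "TIMOLOL", "RV"),
("105775", "BRIMONIDINE", "PD"),
("105776", "TIMOLOL", "PD")
).toDF("ID", "BFS", "TYP").withColumn("TYPPDID", when($"TYP" === "PD", $"ID"))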
df: org.apache.spark.sql.DataFrame = [ID: string, BFS: string ... 2 more fields]
scala> df.show
+------+-----------+---+-------+
|    ID|        BFS|TYP|TYPPDID|
+------+-----------+---+-------+
|105771|BRIMONIDINE| PD| 105771|
|105772|BRIMONIDINE| PD| 105772|
|105773|BRIMONIDINE| RV|   null|
|105774|    TIMOLOL| RV|   null|
|105775|BRIMONIDINE| PD| 105775|
|105776|    TIMOLOL| PD| 105776|
+------+-----------+---+-------+
scala> val overColumns = Window.partitionBy("BFS").orderBy("ID").rowsBetween(1, Window.unboundedFollowing)
overColumns: org.apache.spark.sql.expressions.WindowSpec = org.apache.spark.sql.expressions.WindowSpec@eb923ef
scala> df.withColumn("NEXT_PD_TYP_ID",first("TYPPDID", true).over(overColumns)).orderBy("ID").show(false)
+------+-----------+---+-------+--------------+
|ID    |BFS        |TYP|TYPPDID|NEXT_PD_TYP_ID|
+------+-----------+---+-------+--------------+
|105771|BRIMONIDINE|PD |105771 |105772        |
|105772|BRIMONIDINE|PD |105772 |105775        |
|105773|BRIMONIDINE|RV |null   |105775        |
|105774|TIMOLOL    |RV |null   |105776        |
|105775|BRIMONIDINE|PD |105775 |null          |
|105776|TIMOLOL    |PD |105776 |null          |
+------+-----------+---+-------+--------------+
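The answer above works on a trimmed-down frame with string IDs. As a rough sketch of how the same idea might be applied to the question's fuller schema, the version below also partitions by MEM_ID and orders by the numeric SEQ column instead of ID. Those two choices are assumptions, not something the question or the answer states, as are the object name NextPdExample and the local SparkSession. It also skips the helper column by passing the when(...) expression straight to first.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, first, when}

// Hypothetical standalone wrapper; in spark-shell only the body of main is needed.
object NextPdExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("next-pd-sketch")
      .getOrCreate()
    import spark.implicits._

    // Subset of the question's columns (SVC_DT and BFS_SEQ omitted for brevity).
    val df = Seq(
      (105771L, 29378668L, "BRIMONIDINE", "PD", 1),
      (105772L, 29378668L, "BRIMONIDINE", "PD", 2),
      (105773L, 29378668L, "BRIMONIDINE", "RV", 3),
      (105774L, 29378668L, "TIMOLOL",     "RV", 4),
      (105775L, 29378668L, "BRIMONIDINE", "PD", 5),
      (105776L, 29378668L, "TIMOLOL",     "PD", 6)
    ).toDF("ID", "MEM_ID", "BFS", "TYP", "SEQ")

    // Assumption: "next" means the next row by SEQ within the same member and BFS.
    // The frame starts at the row after the current one and runs to the partition end.
    val following = Window
      .partitionBy("MEM_ID", "BFS")
      .orderBy("SEQ")
      .rowsBetween(1, Window.unboundedFollowing)

    // when(...) without otherwise yields null for non-PD rows;
    // first(..., true) skips those nulls and returns the nearest following PD ID.
    val result = df.withColumn(
      "NEXT_PD_TYP_ID",
      first(when(col("TYP") === "PD", col("ID")), true).over(following)
    )

    result.orderBy("SEQ").show(false)
    spark.stop()
  }
}
```

On the six sample rows this should give the same NEXT_PD_TYP_ID values as the expected output in the question. Ordering by a numeric column avoids relying on lexicographic order of string IDs, which only matches numeric order while all IDs have the same number of digits.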