Spark Join based on nearest Date

I have 2 Parquet tables. The simplified schema is as follows:

case class Product(SerialNumber:Integer,
                   UniqueKey:String,
                   ValidityDate1:Date
                   )
                   
case class ExceptionEvents(SerialNumber:Integer,
                      ExceptionId:String,
                      ValidityDate2:Date
                     )
                

The Product DataFrame can contain entries such as the following:

Product:

-----------------------------------------
SerialNumber    UniqueKey   ValidityDate1
-----------------------------------------
10001           Key_1       01/10/2021
10001           Key_2       05/10/2021
10001           Key_3       10/10/2021
10002           Key_4       02/10/2021
10003           Key_5       07/10/2021
-----------------------------------------

ExceptionEvents:

-----------------------------------------
SerialNumber    ExceptionId     ValidityDate2
-----------------------------------------
10001           ExcId_1         02/10/2021
10001           ExcId_2         05/10/2021
10001           ExcId_3         07/10/2021
10001           ExcId_4         11/10/2021
10001           ExcId_5         15/10/2021
-----------------------------------------

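I want to join the 2 DataFrames so that the SerialNumbers match and the validity dates are mapped such that the ValidityDate2 of an ExceptionEvent is greater than or equal to the ValidityDate1 of the Product, with the 2 dates as close together as possible. For example, the resulting DataFrame should look like this: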

---------------------------------------------------------------------
SerialNumber    ExceptionId     UniqueKey       ValidityDate2
---------------------------------------------------------------------
10001           ExcId_1         Key_1           02/10/2021
10001           ExcId_2         Key_2           05/10/2021
10001           ExcId_3         Key_2           07/10/2021
10001           ExcId_4         Key_3           11/10/2021
10001           ExcId_5         Key_3           15/10/2021
---------------------------------------------------------------------
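To pin down the matching rule independently of Spark, here is a minimal plain-Scala sketch over in-memory collections. The names `ProductRow`, `ExceptionRow` and `nearestProduct` are made up for illustration, and the sketch assumes that a product dated on the same day as the exception counts as a match (as the ExcId_2 row above suggests).

```scala
import java.time.LocalDate
import java.time.format.DateTimeFormatter

// Illustrative in-memory stand-ins for the two tables (hypothetical names).
final case class ProductRow(serialNumber: Int, uniqueKey: String, validityDate1: LocalDate)
final case class ExceptionRow(serialNumber: Int, exceptionId: String, validityDate2: LocalDate)

val fmt = DateTimeFormatter.ofPattern("dd/MM/yyyy")
def d(s: String): LocalDate = LocalDate.parse(s, fmt)

val products = Seq(
  ProductRow(10001, "Key_1", d("01/10/2021")),
  ProductRow(10001, "Key_2", d("05/10/2021")),
  ProductRow(10001, "Key_3", d("10/10/2021")),
  ProductRow(10002, "Key_4", d("02/10/2021")),
  ProductRow(10003, "Key_5", d("07/10/2021"))
)

val exceptions = Seq(
  ExceptionRow(10001, "ExcId_1", d("02/10/2021")),
  ExceptionRow(10001, "ExcId_2", d("05/10/2021")),
  ExceptionRow(10001, "ExcId_3", d("07/10/2021")),
  ExceptionRow(10001, "ExcId_4", d("11/10/2021")),
  ExceptionRow(10001, "ExcId_5", d("15/10/2021"))
)

// Among products with the same serial number dated on or before the exception,
// pick the one with the latest date (the closest one that does not come after it).
def nearestProduct(e: ExceptionRow, ps: Seq[ProductRow]): Option[ProductRow] =
  ps.filter(p => p.serialNumber == e.serialNumber && !p.validityDate1.isAfter(e.validityDate2))
    .sortBy(_.validityDate1.toEpochDay)
    .lastOption

// (ExceptionId, UniqueKey) pairs; exceptions without any earlier product are dropped.
val matched: Seq[(String, String)] =
  exceptions.flatMap(e => nearestProduct(e, products).map(p => (e.exceptionId, p.uniqueKey)))
```

This is only meant to make the expected pairing explicit; it scans all products for every exception, so it is not a substitute for a distributed join.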

Any idea how the query should be written using Scala and the Spark DataFrame API?

The following solution works for me:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._  // `spark` is the active SparkSession (predefined in spark-shell)

// Sample products; the date strings are parsed as dd/MM/yyyy.
val dfp1 = List(("1001", "Key1", "01/10/2021"), ("1001", "Key2", "05/10/2021"), ("1001", "Key3", "10/10/2021"), ("1002", "Key4", "02/10/2021")).toDF("SerialNumber", "UniqueKey", "Date1")

val dfProduct = dfp1.withColumn("Date1", to_date($"Date1", "dd/MM/yyyy"))

// Sample exception events.
val dfe1 = List(("1001", "ExcId1", "02/10/2021"), ("1001", "ExcId2", "05/10/2021"), ("1001", "ExcId3", "07/10/2021"), ("1001", "ExcId4", "11/10/2021"), ("1001", "ExcId5", "15/10/2021")).toDF("SerialNumber", "ExceptionId", "Date2")

val dfExceptions = dfe1.withColumn("Date2", to_date($"Date2", "dd/MM/yyyy"))

// Pair each exception with every product of the same serial number, keeping only
// products dated on or before the exception (DiffDate >= 0).
val exceptionStat2 = dfExceptions.as("fact")
  .join(dfProduct.as("dim"), Seq("SerialNumber"))
  .select($"fact.*", $"dim.UniqueKey", datediff($"fact.Date2", $"dim.Date1").as("DiffDate"))
  .where($"DiffDate" >= 0)

// Per exception, keep the product(s) with the smallest date difference.
val exceptionStat3 = exceptionStat2
  .withColumn("rank", rank().over(Window.partitionBy($"SerialNumber", $"ExceptionId").orderBy($"DiffDate")))
  .where($"rank" === 1)
  .select($"SerialNumber", $"ExceptionId", $"UniqueKey", $"Date2", $"DiffDate", $"rank")
  .orderBy($"SerialNumber", $"Date2")
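
As a possible variation on the window-based approach above, the same result can be sketched with a plain aggregation: `max` over a `struct` compares fields left to right, so the struct with the latest `Date1` wins and `UniqueKey` breaks ties. This may matter because `rank` can return more than one row per exception if two products with the same serial number share a date. The block below is a self-contained sketch that assumes a local SparkSession and the same sample data; the names `products`, `exceptions`, `candidates` and `nearest` are illustrative only.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

// Assumption: a local session for experimentation; in spark-shell `spark` already exists.
val spark = SparkSession.builder().master("local[*]").appName("nearest-date-join").getOrCreate()
import spark.implicits._

val products = Seq(
  ("1001", "Key1", "01/10/2021"),
  ("1001", "Key2", "05/10/2021"),
  ("1001", "Key3", "10/10/2021"),
  ("1002", "Key4", "02/10/2021")
).toDF("SerialNumber", "UniqueKey", "Date1")
  .withColumn("Date1", to_date($"Date1", "dd/MM/yyyy"))

val exceptions = Seq(
  ("1001", "ExcId1", "02/10/2021"),
  ("1001", "ExcId2", "05/10/2021"),
  ("1001", "ExcId3", "07/10/2021"),
  ("1001", "ExcId4", "11/10/2021"),
  ("1001", "ExcId5", "15/10/2021")
).toDF("SerialNumber", "ExceptionId", "Date2")
  .withColumn("Date2", to_date($"Date2", "dd/MM/yyyy"))

// Join on the serial number and keep only products dated on or before the exception.
val candidates = exceptions
  .join(products, Seq("SerialNumber"))
  .where($"Date1" <= $"Date2")

// One row per exception: the struct with the greatest Date1 is the nearest earlier product.
val nearest = candidates
  .groupBy($"SerialNumber", $"ExceptionId", $"Date2")
  .agg(max(struct($"Date1", $"UniqueKey")).as("best"))
  .select($"SerialNumber", $"ExceptionId", $"best.UniqueKey".as("UniqueKey"), $"Date2")
  .orderBy($"SerialNumber", $"Date2")
```

Calling `nearest.show()` should list one product per exception. Like the inner join in the original solution, this drops exceptions that have no product dated on or before them; a left join would be needed to keep those rows.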