Spark Join based on nearest Date
I have two parquet tables. Their simplified schemas are as follows:
case class Product(SerialNumber: Integer,
                   UniqueKey: String,
                   ValidityDate1: Date)

case class ExceptionEvents(SerialNumber: Integer,
                           ExceptionId: String,
                           ValidityDate2: Date)
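For context, a minimal sketch of how the two parquet tables might be loaded into typed Datasets. The SparkSession setup and the paths are assumptions for illustration, not part of the original setup; Date in the case classes is java.sql.Date.

import java.sql.Date
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("NearestDateJoin").master("local[*]").getOrCreate()
import spark.implicits._

// Hypothetical parquet locations; substitute the real table paths.
val products   = spark.read.parquet("/data/product").as[Product]
val exceptions = spark.read.parquet("/data/exception_events").as[ExceptionEvents]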
The Product dataframe could, for example, contain the following entries:
Product:
-----------------------------------------
SerialNumber UniqueKey ValidityDate1
-----------------------------------------
10001 Key_1 01/10/2021
10001 Key_2 05/10/2021
10001 Key_3 10/10/2021
10002 Key_4 02/10/2021
10003 Key_5 07/10/2021
-----------------------------------------
ExceptionEvents:
-----------------------------------------
SerialNumber ExceptionId ValidityDate2
-----------------------------------------
10001 ExcId_1 02/10/2021
10001 ExcId_2 05/10/2021
10001 ExcId_3 07/10/2021
10001 ExcId_4 11/10/2021
10001 ExcId_5 15/10/2021
-----------------------------------------
I want to join the two DFs so that the SerialNumbers match and each ExceptionEvent is mapped to the Product whose ValidityDate1 falls on or before its ValidityDate2, with the two dates as close to each other as possible.
For example, the resulting DF should look like this:
---------------------------------------------------------------------
SerialNumber ExceptionId UniqueKey ValidityDate2
---------------------------------------------------------------------
10001 ExcId_1 Key_1 02/10/2021
10001 ExcId_2 Key_2 05/10/2021
10001 ExcId_3 Key_2 07/10/2021
10001 ExcId_4 Key_3 11/10/2021
10001 ExcId_5 Key_3 15/10/2021
---------------------------------------------------------------------
Any idea how this query should be done using Scala and the Spark DataFrame API?
The following solution works for me:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._

// Build the sample frames; the dates arrive as dd/MM/yyyy strings.
val dfp1 = List(("1001", "Key1", "01/10/2021"), ("1001", "Key2", "05/10/2021"), ("1001", "Key3", "10/10/2021"), ("1002", "Key4", "02/10/2021")).toDF("SerialNumber", "UniqueKey", "Date1")
val dfProduct = dfp1.withColumn("Date1", to_date($"Date1", "dd/MM/yyyy"))

val dfe1 = List(("1001", "ExcId1", "02/10/2021"), ("1001", "ExcId2", "05/10/2021"), ("1001", "ExcId3", "07/10/2021"), ("1001", "ExcId4", "11/10/2021"), ("1001", "ExcId5", "15/10/2021")).toDF("SerialNumber", "ExceptionId", "Date2")
val dfExceptions = dfe1.withColumn("Date2", to_date($"Date2", "dd/MM/yyyy"))

// Join on SerialNumber and keep only the product rows whose Date1 falls on
// or before the exception's Date2; DiffDate is the gap in days.
val exceptionStat2 = dfExceptions.as("fact")
  .join(dfProduct.as("dim"), Seq("SerialNumber"))
  .select($"fact.*", $"dim.UniqueKey", datediff($"fact.Date2", $"dim.Date1").as("DiffDate"))
  .where($"DiffDate" >= 0)

// For each exception, rank the candidate products by the gap and keep the closest one.
val exceptionStat3 = exceptionStat2
  .withColumn("rank", rank().over(Window.partitionBy($"SerialNumber", $"ExceptionId").orderBy($"DiffDate")))
  .where($"rank" === 1)
  .select($"SerialNumber", $"ExceptionId", $"UniqueKey", $"Date2", $"DiffDate", $"rank")
  .orderBy($"SerialNumber", $"Date2")
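One caveat worth noting: rank keeps ties, so if two products have the same DiffDate for one exception, both rows survive the rank === 1 filter. A minimal variant using row_number over the same window (with UniqueKey as an assumed tie-breaker) guarantees exactly one product per exception, and drops the helper columns to match the expected output:

val w = Window.partitionBy($"SerialNumber", $"ExceptionId").orderBy($"DiffDate", $"UniqueKey")
val nearest = exceptionStat2
  .withColumn("rn", row_number().over(w))  // exactly one row per exception, even on ties
  .where($"rn" === 1)
  .select($"SerialNumber", $"ExceptionId", $"UniqueKey", $"Date2")
  .orderBy($"SerialNumber", $"Date2")
nearest.show()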