使用 spark 在 Delta table 中进行棘手的 upsert

Tricky upsert in Delta table using spark

我有一个案例class如下:

case class UniqueException(
                                   ExceptionId:String,
                                   LastUpdateTime:Timestamp,
                                   IsDisplayed:Boolean,
                                   Message:String,
                                   ExceptionType:String,
                                   ExceptionMessage:String,
                                   FullException:String
                                 )

这用于生成增量 table。

需要满足以下条件:

  1. 如果 UniqueException 的 ExceptionId 是增量新的,则需要插入一个新的 UniqueException table。
  2. 如果传入 UniqueException 的 ExceptionId 已经存在于增量中 table 并且传入 UniqueException 的 LastUpdateTime 大于 14 天,则需要更新现有的 UniqueException。
  3. 如果 LastUpdateTime 小于 14 天,如果传入的 UniqueException 的 ExceptionId 已经存在于增量中,则不应更新传入的 UniqueException table。

我写了下面的代码,但是不满足上面的情况

val dfUniqueException = DeltaTable.forPath(outputFolder)
dfUniqueException.as("existing")
  .merge(dfNewExceptions.as("new"), "new.ExceptionId = existing.ExceptionId and new.LastUpdateTime >  date_add(existing.LastUpdateTime, 14")
  .whenMatched().updateAll()
  .whenNotMatched().insertAll()
  .execute()

知道如何使用单个合并语句满足上述条件吗?

其实你的规则可以重写如下:

  • if存在异常
    • 如果 LastUpdateTime现有的和新的相差超过14天,更新现有的
    • else什么也不做
  • else 插入新异常

因此您可以更改您的代码,将“14 天规则”放在 whenMatched 子句中而不是 merge 子句中,如下所示:

import io.delta.tables.DeltaTable

val dfUniqueException = DeltaTable.forPath(outputFolder)
val dfNewExceptionLabeled = dfNewExceptions.as("new")

dfUniqueException.as("existing")
  .merge(dfNewExceptionLabeled, "new.ExceptionId = existing.ExceptionId")
  .whenMatched("new.LastUpdateTime > date_add(existing.LastUpdateTime, 14)")
  .updateAll()
  .whenNotMatched()
  .insertAll()
  .execute()

如果您应用此代码但存在以下现有例外情况:

+---------------+-------------------+--------+
|ExceptionId    |LastUpdateTime     |Message |
+---------------+-------------------+--------+
|exception_id_01|2021-03-10 00:00:00|value_01|
|exception_id_02|2021-03-10 00:00:00|value_02|
|exception_id_03|2021-03-10 00:00:00|value_03|
+---------------+-------------------+--------+

以及以下新的例外情况:

+---------------+-------------------+--------+
|ExceptionId    |LastUpdateTime     |Message |
+---------------+-------------------+--------+
|exception_id_02|2021-03-20 00:00:00|value_04|
|exception_id_03|2021-03-31 00:00:00|value_05|
|exception_id_04|2021-03-31 00:00:00|value_06|
+---------------+-------------------+--------+

你在 delta table 中的最终结果是:

+---------------+-------------------+--------+
|ExceptionId    |LastUpdateTime     |Message |
+---------------+-------------------+--------+
|exception_id_04|2021-03-31 00:00:00|value_06|
|exception_id_01|2021-03-10 00:00:00|value_01|
|exception_id_03|2021-03-31 00:00:00|value_05|
|exception_id_02|2021-03-10 00:00:00|value_02|
+---------------+-------------------+--------+