使用 spark 在 Delta table 中进行棘手的 upsert
Tricky upsert in Delta table using spark
我有一个案例class如下:
case class UniqueException(
ExceptionId:String,
LastUpdateTime:Timestamp,
IsDisplayed:Boolean,
Message:String,
ExceptionType:String,
ExceptionMessage:String,
FullException:String
)
这用于生成增量 table。
需要满足以下条件:
- 如果 UniqueException 的 ExceptionId 是增量新的,则需要插入一个新的 UniqueException table。
- 如果传入 UniqueException 的 ExceptionId 已经存在于增量中 table 并且传入 UniqueException 的 LastUpdateTime 大于 14 天,则需要更新现有的 UniqueException。
- 如果 LastUpdateTime 小于 14 天,如果传入的 UniqueException 的 ExceptionId 已经存在于增量中,则不应更新传入的 UniqueException table。
我写了下面的代码,但是不满足上面的情况
val dfUniqueException = DeltaTable.forPath(outputFolder)
dfUniqueException.as("existing")
.merge(dfNewExceptions.as("new"), "new.ExceptionId = existing.ExceptionId and new.LastUpdateTime > date_add(existing.LastUpdateTime, 14")
.whenMatched().updateAll()
.whenNotMatched().insertAll()
.execute()
知道如何使用单个合并语句满足上述条件吗?
其实你的规则可以重写如下:
- if存在异常
- 如果
LastUpdateTime
现有的和新的相差超过14天,更新现有的
- else什么也不做
- else 插入新异常
因此您可以更改您的代码,将“14 天规则”放在 whenMatched
子句中而不是 merge
子句中,如下所示:
import io.delta.tables.DeltaTable
val dfUniqueException = DeltaTable.forPath(outputFolder)
val dfNewExceptionLabeled = dfNewExceptions.as("new")
dfUniqueException.as("existing")
.merge(dfNewExceptionLabeled, "new.ExceptionId = existing.ExceptionId")
.whenMatched("new.LastUpdateTime > date_add(existing.LastUpdateTime, 14)")
.updateAll()
.whenNotMatched()
.insertAll()
.execute()
如果您应用此代码但存在以下现有例外情况:
+---------------+-------------------+--------+
|ExceptionId |LastUpdateTime |Message |
+---------------+-------------------+--------+
|exception_id_01|2021-03-10 00:00:00|value_01|
|exception_id_02|2021-03-10 00:00:00|value_02|
|exception_id_03|2021-03-10 00:00:00|value_03|
+---------------+-------------------+--------+
以及以下新的例外情况:
+---------------+-------------------+--------+
|ExceptionId |LastUpdateTime |Message |
+---------------+-------------------+--------+
|exception_id_02|2021-03-20 00:00:00|value_04|
|exception_id_03|2021-03-31 00:00:00|value_05|
|exception_id_04|2021-03-31 00:00:00|value_06|
+---------------+-------------------+--------+
你在 delta table 中的最终结果是:
+---------------+-------------------+--------+
|ExceptionId |LastUpdateTime |Message |
+---------------+-------------------+--------+
|exception_id_04|2021-03-31 00:00:00|value_06|
|exception_id_01|2021-03-10 00:00:00|value_01|
|exception_id_03|2021-03-31 00:00:00|value_05|
|exception_id_02|2021-03-10 00:00:00|value_02|
+---------------+-------------------+--------+
我有一个案例class如下:
case class UniqueException(
ExceptionId:String,
LastUpdateTime:Timestamp,
IsDisplayed:Boolean,
Message:String,
ExceptionType:String,
ExceptionMessage:String,
FullException:String
)
这用于生成增量 table。
需要满足以下条件:
- 如果 UniqueException 的 ExceptionId 是增量新的,则需要插入一个新的 UniqueException table。
- 如果传入 UniqueException 的 ExceptionId 已经存在于增量中 table 并且传入 UniqueException 的 LastUpdateTime 大于 14 天,则需要更新现有的 UniqueException。
- 如果 LastUpdateTime 小于 14 天,如果传入的 UniqueException 的 ExceptionId 已经存在于增量中,则不应更新传入的 UniqueException table。
我写了下面的代码,但是不满足上面的情况
val dfUniqueException = DeltaTable.forPath(outputFolder)
dfUniqueException.as("existing")
.merge(dfNewExceptions.as("new"), "new.ExceptionId = existing.ExceptionId and new.LastUpdateTime > date_add(existing.LastUpdateTime, 14")
.whenMatched().updateAll()
.whenNotMatched().insertAll()
.execute()
知道如何使用单个合并语句满足上述条件吗?
其实你的规则可以重写如下:
- if存在异常
- 如果
LastUpdateTime
现有的和新的相差超过14天,更新现有的 - else什么也不做
- 如果
- else 插入新异常
因此您可以更改您的代码,将“14 天规则”放在 whenMatched
子句中而不是 merge
子句中,如下所示:
import io.delta.tables.DeltaTable
val dfUniqueException = DeltaTable.forPath(outputFolder)
val dfNewExceptionLabeled = dfNewExceptions.as("new")
dfUniqueException.as("existing")
.merge(dfNewExceptionLabeled, "new.ExceptionId = existing.ExceptionId")
.whenMatched("new.LastUpdateTime > date_add(existing.LastUpdateTime, 14)")
.updateAll()
.whenNotMatched()
.insertAll()
.execute()
如果您应用此代码但存在以下现有例外情况:
+---------------+-------------------+--------+
|ExceptionId |LastUpdateTime |Message |
+---------------+-------------------+--------+
|exception_id_01|2021-03-10 00:00:00|value_01|
|exception_id_02|2021-03-10 00:00:00|value_02|
|exception_id_03|2021-03-10 00:00:00|value_03|
+---------------+-------------------+--------+
以及以下新的例外情况:
+---------------+-------------------+--------+
|ExceptionId |LastUpdateTime |Message |
+---------------+-------------------+--------+
|exception_id_02|2021-03-20 00:00:00|value_04|
|exception_id_03|2021-03-31 00:00:00|value_05|
|exception_id_04|2021-03-31 00:00:00|value_06|
+---------------+-------------------+--------+
你在 delta table 中的最终结果是:
+---------------+-------------------+--------+
|ExceptionId |LastUpdateTime |Message |
+---------------+-------------------+--------+
|exception_id_04|2021-03-31 00:00:00|value_06|
|exception_id_01|2021-03-10 00:00:00|value_01|
|exception_id_03|2021-03-31 00:00:00|value_05|
|exception_id_02|2021-03-10 00:00:00|value_02|
+---------------+-------------------+--------+