如何防止重复条目进入 Azure 存储的增量湖
How to Prevent Duplicate Entries to enter to delta lake of Azure Storage
我有一个以 delta 格式存储到 Adls 中的 Dataframe,现在当我尝试将新的更新行附加到该 delta 湖时,有什么方法可以删除 delta 中的旧现有记录并添加新更新的记录。
Delta 中存储的 DataFrame 架构有一个唯一列。通过它我们可以检查记录是更新的还是新的。
这是 Merge command 的任务 - 您定义合并条件(您的唯一列),然后定义操作。在 SQL 中,它可能如下所示(column
是您的唯一列,updates
可能是您注册为临时视图的数据框):
MERGE INTO destination
USING updates
ON destination.column = updates.column
WHEN MATCHED THEN
UPDATE SET *
WHEN NOT MATCHED
THEN INSERT *
在 Python 中可能如下所示:
from delta.tables import *
deltaTable = DeltaTable.forPath(spark, "/data/destination/")
deltaTable.alias("dest").merge(
updatesDF.alias("updates"),
"dest.column = updates.column") \
.whenMatchedUpdateAll() \
.whenNotMatchedInsertAll() \
.execute()
我有一个以 delta 格式存储到 Adls 中的 Dataframe,现在当我尝试将新的更新行附加到该 delta 湖时,有什么方法可以删除 delta 中的旧现有记录并添加新更新的记录。
Delta 中存储的 DataFrame 架构有一个唯一列。通过它我们可以检查记录是更新的还是新的。
这是 Merge command 的任务 - 您定义合并条件(您的唯一列),然后定义操作。在 SQL 中,它可能如下所示(column
是您的唯一列,updates
可能是您注册为临时视图的数据框):
MERGE INTO destination
USING updates
ON destination.column = updates.column
WHEN MATCHED THEN
UPDATE SET *
WHEN NOT MATCHED
THEN INSERT *
在 Python 中可能如下所示:
from delta.tables import *
deltaTable = DeltaTable.forPath(spark, "/data/destination/")
deltaTable.alias("dest").merge(
updatesDF.alias("updates"),
"dest.column = updates.column") \
.whenMatchedUpdateAll() \
.whenNotMatchedInsertAll() \
.execute()