Delta Lake:upsert 在内部是如何工作的?
Delta Lake : How does upsert internally work?
在我们的数据管道中,我们从数据源提取 CDC 事件并将这些更改以 AVRO 格式写入 "incremental data" 文件夹。
然后,我们定期 运行 Spark 作业将此 "incremental data" 与我们当前版本的 "snapshot table"(ORC 格式)合并,以获得最新版本的上游快照。
在此合并逻辑期间:
1) 我们将 "incremental data" 作为 DataFrame df1
加载
2) 将当前 "snapshot table" 加载为 DataFrame df2
3) 合并 df1 和 df2 删除重复的 id 并获取最新版本的行(使用 update_timestamp 列)
此逻辑将 "incremental data" 和当前 "snapshot table" 的全部数据加载到 Spark 内存中,根据数据库的不同,该内存可能非常大。
我注意到在 Delta Lake 中,类似的操作是使用以下代码完成的:
import io.delta.tables._
import org.apache.spark.sql.functions._
val updatesDF = ... // define the updates DataFrame[date, eventId, data]
DeltaTable.forPath(spark, "/data/events/")
.as("events")
.merge(
updatesDF.as("updates"),
"events.eventId = updates.eventId")
.whenMatched
.updateExpr(
Map("data" -> "updates.data"))
.whenNotMatched
.insertExpr(
Map(
"date" -> "updates.date",
"eventId" -> "updates.eventId",
"data" -> "updates.data"))
.execute()
在这里,"updatesDF" 可以被认为是我们的 "incremental data",它来自 CDC 来源。
我的问题:
1) merge/upsert 内部是如何工作的?它是否将整个 "updatedDF" 和“/data/events/”加载到 Spark 内存中?
2) 如果不是,它是否应用类似于 Apache Hudi 的增量更改?
3) 在重复数据删除期间,此更新插入逻辑如何知道获取最新版本的记录?因为我没有看到任何设置来指定 "update timestamp" 列?
1) How does merge/upsert internally works? Does it load entire "updatedDF" and
"/data/events/" into Spark memory?
不,Spark 不需要将它需要更新的整个 Delta DF 加载到内存中。
否则它将无法扩展。
它采用的方法与 Spark 所做的其他工作非常相似——如果数据集足够大(或者您云创建显式分区),整个 table 会透明地分成多个分区。然后为每个分区分配一个任务,该任务构成您的 merge
作业。任务可以 运行 在不同的 Spark 执行器等
2) If not, does it apply the delta changes something similar to Apache Hudi ?
我听说过 Apache Hudi,但没有看过。
在内部,Delta
看起来像版本化的镶木地板文件。
对 table 的更改存储为有序的原子单元,称为提交。
当你保存一个 table - 看看它有什么文件 - 它会有文件
像 000000.json、000001.json 等,它们中的每一个都会引用一个
子目录中底层镶木地板文件的一组操作。例如,
000000.json 会说这个版本及时引用镶木地板文件 001
and 002, and 000001.json 会及时说这个版本不应该引用
那两个旧的镶木地板文件,只使用镶木地板文件 003。
3) During deduplication how this upsert logic knows to take the latest version of a record?
Because I don't see any setting to specify the "update timestamp" column?
默认情况下它引用最新的变更集。
时间戳是内部版本控制在 Delta 中的实现方式。
您可以通过 AS OF
语法引用旧快照 - 请参阅
https://docs.databricks.com/delta/delta-batch.html#syntax
在我们的数据管道中,我们从数据源提取 CDC 事件并将这些更改以 AVRO 格式写入 "incremental data" 文件夹。
然后,我们定期 运行 Spark 作业将此 "incremental data" 与我们当前版本的 "snapshot table"(ORC 格式)合并,以获得最新版本的上游快照。
在此合并逻辑期间:
1) 我们将 "incremental data" 作为 DataFrame df1
加载2) 将当前 "snapshot table" 加载为 DataFrame df2
3) 合并 df1 和 df2 删除重复的 id 并获取最新版本的行(使用 update_timestamp 列)
此逻辑将 "incremental data" 和当前 "snapshot table" 的全部数据加载到 Spark 内存中,根据数据库的不同,该内存可能非常大。
我注意到在 Delta Lake 中,类似的操作是使用以下代码完成的:
import io.delta.tables._
import org.apache.spark.sql.functions._
val updatesDF = ... // define the updates DataFrame[date, eventId, data]
DeltaTable.forPath(spark, "/data/events/")
.as("events")
.merge(
updatesDF.as("updates"),
"events.eventId = updates.eventId")
.whenMatched
.updateExpr(
Map("data" -> "updates.data"))
.whenNotMatched
.insertExpr(
Map(
"date" -> "updates.date",
"eventId" -> "updates.eventId",
"data" -> "updates.data"))
.execute()
在这里,"updatesDF" 可以被认为是我们的 "incremental data",它来自 CDC 来源。
我的问题:
1) merge/upsert 内部是如何工作的?它是否将整个 "updatedDF" 和“/data/events/”加载到 Spark 内存中?
2) 如果不是,它是否应用类似于 Apache Hudi 的增量更改?
3) 在重复数据删除期间,此更新插入逻辑如何知道获取最新版本的记录?因为我没有看到任何设置来指定 "update timestamp" 列?
1) How does merge/upsert internally works? Does it load entire "updatedDF" and
"/data/events/" into Spark memory?
不,Spark 不需要将它需要更新的整个 Delta DF 加载到内存中。
否则它将无法扩展。
它采用的方法与 Spark 所做的其他工作非常相似——如果数据集足够大(或者您云创建显式分区),整个 table 会透明地分成多个分区。然后为每个分区分配一个任务,该任务构成您的 merge
作业。任务可以 运行 在不同的 Spark 执行器等
2) If not, does it apply the delta changes something similar to Apache Hudi ?
我听说过 Apache Hudi,但没有看过。
在内部,Delta
看起来像版本化的镶木地板文件。
对 table 的更改存储为有序的原子单元,称为提交。
当你保存一个 table - 看看它有什么文件 - 它会有文件
像 000000.json、000001.json 等,它们中的每一个都会引用一个
子目录中底层镶木地板文件的一组操作。例如,
000000.json 会说这个版本及时引用镶木地板文件 001
and 002, and 000001.json 会及时说这个版本不应该引用
那两个旧的镶木地板文件,只使用镶木地板文件 003。
3) During deduplication how this upsert logic knows to take the latest version of a record?
Because I don't see any setting to specify the "update timestamp" column?
默认情况下它引用最新的变更集。
时间戳是内部版本控制在 Delta 中的实现方式。
您可以通过 AS OF
语法引用旧快照 - 请参阅
https://docs.databricks.com/delta/delta-batch.html#syntax