三角洲湖回滚

Delta Lake rollback

需要一种优雅的方式将 Delta Lake 回滚到以前的版本。

我目前的做法如下:

import io.delta.tables._

val deltaTable = DeltaTable.forPath(spark, testFolder)

spark.read.format("delta")
  .option("versionAsOf", 0)
  .load(testFolder)
  .write
  .mode("overwrite")
  .format("delta")
  .save(testFolder)

虽然这很难看,因为整个数据集都需要重写。似乎一些元更新就足够了,不需要数据 I/O。有人知道更好的方法吗?

这是一个残酷的解决方案。这并不理想,但考虑到用分区覆盖大型数据集可能代价高昂,这个简单的解决方案可能会有所帮助。

如果您对回滚时间后的更新不是很敏感,只需删除 _delta_log 中晚于回滚时间的所有版本文件。以后可以使用 vacuum 释放未引用的文件。

另一种保留完整历史记录的解决方案是 1) deltaTable.delete 2) 将所有日志按顺序(随着版本号增加)复制到回滚到删除日志文件的末尾。这模拟了到回滚日期为止的三角洲湖的创建。不过肯定不好看

你应该使用时间旅行功能:https://databricks.com/blog/2019/02/04/introducing-delta-time-travel-for-large-scale-data-lakes.html

您在时间戳处读取数据:

val inputPath = "/path/to/my/table@20190101000000000"

然后用"rolled back"版本覆盖现有数据。

关于它的丑陋,我不确定我能帮上忙。您可以使用分区限制数据。或者您可以计算出哪些记录已更改并只覆盖它们。

如果您的目标是修复错误的数据并且您对更新不是很敏感,您可以更换一个时间间隔。

 df.write
      .format("delta")
      .mode("overwrite")
      .option("replaceWhere", "date >= '2017-01-01' AND date <= '2017-01-31'")
      .save("/delta/events")

我在使用 Delta 时遇到过类似的问题,我一直在 1 个事务中调用多个 dml 操作。例如我有调用合并然后在 1 个事务中删除的要求。因此,在这种情况下,要么它们都必须一起成功,要么在其中任何一个失败时回滚状态。

为了解决这个问题,我在交易开始前备份了_delta_log(我们称之为稳定状态)目录。如果事务中的两个 DML 操作都成功,则丢弃先前的稳定状态并使用在 _delta_log 中提交的新状态,以防万一任何一个 dml 操作失败则只需将 _delta_log 目录替换为您在开始交易之前进行备份的稳定状态。一旦替换为稳定状态,则只需 运行 真空即可删除可能在事务期间创建的陈旧文件。

从 Delta Lake 0.7.0 开始,您可以使用 RESTORE command 回滚到 Delta Lake table 的早期版本。这是使用时间旅行回滚 table 的更简单方法。

Scala:

import io.delta.tables._

val deltaTable = DeltaTable.forPath(spark, "/path/to/delta-table")

deltaTable.restoreToVersion(0)

Python:

from delta.tables import *

deltaTable = DeltaTable.forPath(spark, "/path/to/delta-table")

deltaTable.restoreToVersion(0)

SQL:

RESTORE TABLE delta.`/path/to/delta-table` TO VERSION AS OF 0

如果您更喜欢那样做,也可以使用 restoreToTimestamp 命令。 Read the documentation了解更多详情。