三角洲湖回滚
Delta Lake rollback
需要一种优雅的方式将 Delta Lake 回滚到以前的版本。
我目前的做法如下:
import io.delta.tables._
val deltaTable = DeltaTable.forPath(spark, testFolder)
spark.read.format("delta")
.option("versionAsOf", 0)
.load(testFolder)
.write
.mode("overwrite")
.format("delta")
.save(testFolder)
虽然这很难看,因为整个数据集都需要重写。似乎一些元更新就足够了,不需要数据 I/O。有人知道更好的方法吗?
这是一个残酷的解决方案。这并不理想,但考虑到用分区覆盖大型数据集可能代价高昂,这个简单的解决方案可能会有所帮助。
如果您对回滚时间后的更新不是很敏感,只需删除 _delta_log 中晚于回滚时间的所有版本文件。以后可以使用 vacuum 释放未引用的文件。
另一种保留完整历史记录的解决方案是 1) deltaTable.delete
2) 将所有日志按顺序(随着版本号增加)复制到回滚到删除日志文件的末尾。这模拟了到回滚日期为止的三角洲湖的创建。不过肯定不好看
你应该使用时间旅行功能:https://databricks.com/blog/2019/02/04/introducing-delta-time-travel-for-large-scale-data-lakes.html
您在时间戳处读取数据:
val inputPath = "/path/to/my/table@20190101000000000"
然后用"rolled back"版本覆盖现有数据。
关于它的丑陋,我不确定我能帮上忙。您可以使用分区限制数据。或者您可以计算出哪些记录已更改并只覆盖它们。
如果您的目标是修复错误的数据并且您对更新不是很敏感,您可以更换一个时间间隔。
df.write
.format("delta")
.mode("overwrite")
.option("replaceWhere", "date >= '2017-01-01' AND date <= '2017-01-31'")
.save("/delta/events")
我在使用 Delta 时遇到过类似的问题,我一直在 1 个事务中调用多个 dml 操作。例如我有调用合并然后在 1 个事务中删除的要求。因此,在这种情况下,要么它们都必须一起成功,要么在其中任何一个失败时回滚状态。
为了解决这个问题,我在交易开始前备份了_delta_log(我们称之为稳定状态)目录。如果事务中的两个 DML 操作都成功,则丢弃先前的稳定状态并使用在 _delta_log 中提交的新状态,以防万一任何一个 dml 操作失败则只需将 _delta_log 目录替换为您在开始交易之前进行备份的稳定状态。一旦替换为稳定状态,则只需 运行 真空即可删除可能在事务期间创建的陈旧文件。
从 Delta Lake 0.7.0 开始,您可以使用 RESTORE command 回滚到 Delta Lake table 的早期版本。这是使用时间旅行回滚 table 的更简单方法。
Scala:
import io.delta.tables._
val deltaTable = DeltaTable.forPath(spark, "/path/to/delta-table")
deltaTable.restoreToVersion(0)
Python:
from delta.tables import *
deltaTable = DeltaTable.forPath(spark, "/path/to/delta-table")
deltaTable.restoreToVersion(0)
SQL:
RESTORE TABLE delta.`/path/to/delta-table` TO VERSION AS OF 0
如果您更喜欢那样做,也可以使用 restoreToTimestamp
命令。 Read the documentation了解更多详情。
需要一种优雅的方式将 Delta Lake 回滚到以前的版本。
我目前的做法如下:
import io.delta.tables._
val deltaTable = DeltaTable.forPath(spark, testFolder)
spark.read.format("delta")
.option("versionAsOf", 0)
.load(testFolder)
.write
.mode("overwrite")
.format("delta")
.save(testFolder)
虽然这很难看,因为整个数据集都需要重写。似乎一些元更新就足够了,不需要数据 I/O。有人知道更好的方法吗?
这是一个残酷的解决方案。这并不理想,但考虑到用分区覆盖大型数据集可能代价高昂,这个简单的解决方案可能会有所帮助。
如果您对回滚时间后的更新不是很敏感,只需删除 _delta_log 中晚于回滚时间的所有版本文件。以后可以使用 vacuum 释放未引用的文件。
另一种保留完整历史记录的解决方案是 1) deltaTable.delete
2) 将所有日志按顺序(随着版本号增加)复制到回滚到删除日志文件的末尾。这模拟了到回滚日期为止的三角洲湖的创建。不过肯定不好看
你应该使用时间旅行功能:https://databricks.com/blog/2019/02/04/introducing-delta-time-travel-for-large-scale-data-lakes.html
您在时间戳处读取数据:
val inputPath = "/path/to/my/table@20190101000000000"
然后用"rolled back"版本覆盖现有数据。
关于它的丑陋,我不确定我能帮上忙。您可以使用分区限制数据。或者您可以计算出哪些记录已更改并只覆盖它们。
如果您的目标是修复错误的数据并且您对更新不是很敏感,您可以更换一个时间间隔。
df.write
.format("delta")
.mode("overwrite")
.option("replaceWhere", "date >= '2017-01-01' AND date <= '2017-01-31'")
.save("/delta/events")
我在使用 Delta 时遇到过类似的问题,我一直在 1 个事务中调用多个 dml 操作。例如我有调用合并然后在 1 个事务中删除的要求。因此,在这种情况下,要么它们都必须一起成功,要么在其中任何一个失败时回滚状态。
为了解决这个问题,我在交易开始前备份了_delta_log(我们称之为稳定状态)目录。如果事务中的两个 DML 操作都成功,则丢弃先前的稳定状态并使用在 _delta_log 中提交的新状态,以防万一任何一个 dml 操作失败则只需将 _delta_log 目录替换为您在开始交易之前进行备份的稳定状态。一旦替换为稳定状态,则只需 运行 真空即可删除可能在事务期间创建的陈旧文件。
从 Delta Lake 0.7.0 开始,您可以使用 RESTORE command 回滚到 Delta Lake table 的早期版本。这是使用时间旅行回滚 table 的更简单方法。
Scala:
import io.delta.tables._
val deltaTable = DeltaTable.forPath(spark, "/path/to/delta-table")
deltaTable.restoreToVersion(0)
Python:
from delta.tables import *
deltaTable = DeltaTable.forPath(spark, "/path/to/delta-table")
deltaTable.restoreToVersion(0)
SQL:
RESTORE TABLE delta.`/path/to/delta-table` TO VERSION AS OF 0
如果您更喜欢那样做,也可以使用 restoreToTimestamp
命令。 Read the documentation了解更多详情。