获取两个版本的 delta lake 之间的区别 table
Get difference between two version of delta lake table
如何找到 Delta Table 最后两个版本之间的区别?
这是我使用数据框的情况:
val df1 = spark.read
.format("delta")
.option("versionAsOf", "0001")
.load("/path/to/my/table")
val df2 = spark.read
.format("delta")
.option("versionAsOf", "0002")
.load("/path/to/my/table")
// non idiomatic way to do it ...
df1.unionAll(df2).except(df1.intersect(df2))
有 Delta 的商业版本 by Databricks that provides a solution called CDF 但我正在寻找开源替代品
这个return一个数据框跟
比较
import uk.co.gresearch.spark.diff.DatasetDiff
df1.diff(df2)
使用@Zinking 的评论,我设法得到了一个 Dataframe,其中计算了两个版本之间的差异:
1) 获取最新版本:
val lastVersion = DeltaTable.forPath(spark, PATH_TO_DELTA_TABLE)
.history()
.select(col("version"))
.collect.toList
.headOption
.getOrElse(throw new Exception("Is this table empty ?"))
2) 获取特定版本中标记为“添加”或“删除”的镶木地板文件列表 0000NUMVERSION
:
val addPathList = spark
.read
.json(s"ROOT_PATH/_delta_log/0000NUMVERSION.json")
.where(s"add is not null")
.select(s"add.path")
.collect()
.map(path => formatPath(path.toString))
.toList
val removePathList = spark
.read
.json(s"ROOT_PATH/_delta_log/0000NUMVERSION.json")
.where(s"remove is not null")
.select(s"remove.path")
.collect()
.map(path => formatPath(path.toString))
.toList
3) 将它们加载到数据帧中
import org.apache.spark.sql.functions._
val addDF = spark
.read
.format("parquet")
.load(addPathList: _*)
.withColumn("add_remove", lit("add"))
val removeDF = spark
.read
.format("parquet")
.load(removePathList: _*)
.withColumn("add_remove", lit("remove"))
4) 两个数据帧的联合表示“差异”:
addDF.union(removeDF).show()
+----------+----------+
|updatedate|add_remove|
+----------+----------+
| null| add|
| null| add|
| null| add|
| null| add|
| null| add|
| null| add|
| null| add|
| null| add|
| null| add|
| null| add|
| null| add|
| null| add|
| null| add|
| null| add|
| null| add|
| null| add|
| null| add|
| null| add|
| null| add|
| null| add|
+----------+----------+
only showing top 20 rows
如何找到 Delta Table 最后两个版本之间的区别? 这是我使用数据框的情况:
val df1 = spark.read
.format("delta")
.option("versionAsOf", "0001")
.load("/path/to/my/table")
val df2 = spark.read
.format("delta")
.option("versionAsOf", "0002")
.load("/path/to/my/table")
// non idiomatic way to do it ...
df1.unionAll(df2).except(df1.intersect(df2))
有 Delta 的商业版本 by Databricks that provides a solution called CDF 但我正在寻找开源替代品
这个return一个数据框跟
比较import uk.co.gresearch.spark.diff.DatasetDiff
df1.diff(df2)
使用@Zinking 的评论,我设法得到了一个 Dataframe,其中计算了两个版本之间的差异:
1) 获取最新版本:
val lastVersion = DeltaTable.forPath(spark, PATH_TO_DELTA_TABLE)
.history()
.select(col("version"))
.collect.toList
.headOption
.getOrElse(throw new Exception("Is this table empty ?"))
2) 获取特定版本中标记为“添加”或“删除”的镶木地板文件列表 0000NUMVERSION
:
val addPathList = spark
.read
.json(s"ROOT_PATH/_delta_log/0000NUMVERSION.json")
.where(s"add is not null")
.select(s"add.path")
.collect()
.map(path => formatPath(path.toString))
.toList
val removePathList = spark
.read
.json(s"ROOT_PATH/_delta_log/0000NUMVERSION.json")
.where(s"remove is not null")
.select(s"remove.path")
.collect()
.map(path => formatPath(path.toString))
.toList
3) 将它们加载到数据帧中
import org.apache.spark.sql.functions._
val addDF = spark
.read
.format("parquet")
.load(addPathList: _*)
.withColumn("add_remove", lit("add"))
val removeDF = spark
.read
.format("parquet")
.load(removePathList: _*)
.withColumn("add_remove", lit("remove"))
4) 两个数据帧的联合表示“差异”:
addDF.union(removeDF).show()
+----------+----------+
|updatedate|add_remove|
+----------+----------+
| null| add|
| null| add|
| null| add|
| null| add|
| null| add|
| null| add|
| null| add|
| null| add|
| null| add|
| null| add|
| null| add|
| null| add|
| null| add|
| null| add|
| null| add|
| null| add|
| null| add|
| null| add|
| null| add|
| null| add|
+----------+----------+
only showing top 20 rows