如何在保留已删除行的同时合并增量数据集和快照数据集?
How can I merge an incremental dataset and a snapshot dataset while retaining deleted rows?
我有一个创建两个数据集的数据连接源:
- 数据集 X(快照)
- 数据集 Y(增量)
两个数据集来自同一来源。数据集 X
包含源 table 中所有行的当前状态。数据集 Y
提取自上次构建以来已更新的所有行。然后将这两个数据集向下游合并到数据集 Z
中,其中数据集 Z
是数据集 X
或数据集 Y
中每一行的最新版本。这使我们既可以进行低延迟更新,又可以保持良好的分区。
当源 table 中的行被删除时,这些行不再存在于数据集 X
中,但仍存在于数据集 Y
.
中
将这些 'deleted' 行保留在数据集 Z
中的最佳方法是什么?理想情况下,我还可以在不丢失任何 'deleted' 行的情况下拍摄数据集 Y
的快照。
好问题!据我了解,您希望数据集 Z
仅包含最新的行,包括最新的已删除行。 Y
中同时存在更新的行和删除的行。在这种情况下,我建议首先将 Y
和 X
联合在一起,以便所有行(包括已删除的行)都出现在联合数据集中。然后,在日期列上使用 window 函数以获得每一行的最新版本。这是我建议的 pyspark 代码的概要:
from pyspark.sql import Window
import pyspark.sql.functions as F
window = Window.partitionBy(primary_keys).orderBy(F.col(date_column).desc())
Z = X.unionByName(Y) # union to get all columns, including deleted
Z = Z.withColumn("row_num", F.row_number().over(window)) # rank by date created/updated
Z = Z.filter(F.col("row_num") == 1).drop("row_num") # keep only the latest version of each row
请注意,此解决方案无法解决 Y 快照会发生什么的问题。
我有一个创建两个数据集的数据连接源:
- 数据集 X(快照)
- 数据集 Y(增量)
两个数据集来自同一来源。数据集 X
包含源 table 中所有行的当前状态。数据集 Y
提取自上次构建以来已更新的所有行。然后将这两个数据集向下游合并到数据集 Z
中,其中数据集 Z
是数据集 X
或数据集 Y
中每一行的最新版本。这使我们既可以进行低延迟更新,又可以保持良好的分区。
当源 table 中的行被删除时,这些行不再存在于数据集 X
中,但仍存在于数据集 Y
.
将这些 'deleted' 行保留在数据集 Z
中的最佳方法是什么?理想情况下,我还可以在不丢失任何 'deleted' 行的情况下拍摄数据集 Y
的快照。
好问题!据我了解,您希望数据集 Z
仅包含最新的行,包括最新的已删除行。 Y
中同时存在更新的行和删除的行。在这种情况下,我建议首先将 Y
和 X
联合在一起,以便所有行(包括已删除的行)都出现在联合数据集中。然后,在日期列上使用 window 函数以获得每一行的最新版本。这是我建议的 pyspark 代码的概要:
from pyspark.sql import Window
import pyspark.sql.functions as F
window = Window.partitionBy(primary_keys).orderBy(F.col(date_column).desc())
Z = X.unionByName(Y) # union to get all columns, including deleted
Z = Z.withColumn("row_num", F.row_number().over(window)) # rank by date created/updated
Z = Z.filter(F.col("row_num") == 1).drop("row_num") # keep only the latest version of each row
请注意,此解决方案无法解决 Y 快照会发生什么的问题。