处理大量分区的 upsert 不够快
Processing upserts on a large number of partitions is not fast enough
问题
我们在 ADLS Gen2 之上有一个 Delta Lake 设置,具有以下 tables:
bronze.DeviceData
:按到达日期划分 (Partition_Date
)
silver.DeviceData
:按事件日期和时间划分(Partition_Date
和 Partition_Hour
)
我们从事件中心提取大量数据(每天超过 6 亿条记录)到 bronze.DeviceData
(仅追加)。然后,我们以流方式处理新文件,并使用 delta MERGE 命令(见下文)将它们插入 silver.DeviceData
。
到达青铜 table 的数据可以包含来自 任何 白银分区的数据(例如,设备可以发送它在本地缓存的历史数据)。但是,任何一天到达的数据中 >90% 来自分区 Partition_Date IN (CURRENT_DATE(), CURRENT_DATE() - INTERVAL 1 DAYS, CURRENT_DATE() + INTERVAL 1 DAYS)
。因此,要更新数据,我们有以下两个 spark 作业:
- “快速”:处理上述三个日期分区的数据。延迟在这里很重要,所以我们优先考虑这些数据
- “慢”:处理其余部分(任何 但 这三个日期分区)。延迟并不重要,但它应该在“合理”的时间内(我会说不超过一周)
现在我们来解决这个问题:虽然“慢”作业的数据量少了很多,但它运行几天只是为了处理一天的慢铜数据,而且有一个大集群。原因很简单:它必须读取和更新许多银分区(有时> 1000 个日期分区),并且由于更新很小但日期分区可能是千兆字节,所以这些合并命令效率低下。
此外,随着时间的推移,这个缓慢的工作将变得越来越慢,因为它接触的银分区会增长。
问题
- 我们的分区方案和 fast/slow Spark 作业设置通常是解决此问题的好方法吗?
- 可以做些什么来改进这个设置?我们想降低慢作业的成本和延迟,并找到一种方法,使它随着任何一天到达的数据量的增长而增长,而不是随着白银的大小 table
附加信息
- 我们需要 MERGE 命令,因为某些上游服务可以重新处理历史数据,然后应该更新银 table
- 白银的架构table:
CREATE TABLE silver.DeviceData (
DeviceID LONG NOT NULL, -- the ID of the device that sent the data
DataType STRING NOT NULL, -- the type of data it sent
Timestamp TIMESTAMP NOT NULL, -- the timestamp of the data point
Value DOUBLE NOT NULL, -- the value that the device sent
UpdatedTimestamp TIMESTAMP NOT NULL, -- the timestamp when the value arrived in bronze
Partition_Date DATE NOT NULL, -- = TO_DATE(Timestamp)
Partition_Hour INT NOT NULL -- = HOUR(Timestamp)
)
USING DELTA
PARTITIONED BY (Partition_Date, Partition_Hour)
LOCATION '...'
- 我们的 MERGE 命令:
val silverTable = DeltaTable.forPath(spark, silverDeltaLakeDirectory)
val batch = ... // the streaming update batch
// the dates and hours that we want to upsert, for partition pruning
// collected from the streaming update batch
val dates = "..."
val hours = "..."
val mergeCondition = s"""
silver.Partition_Date IN ($dates)
AND silver.Partition_Hour IN ($hours)
AND silver.Partition_Date = batch.Partition_Date
AND silver.Partition_Hour = batch.Partition_Hour
AND silver.DeviceID = batch.DeviceID
AND silver.Timestamp = batch.Timestamp
AND silver.DataType = batch.DataType
"""
silverTable.alias("silver")
.merge(batch.alias("batch"), mergeCondition)
// only merge if the event is newer
.whenMatched("batch.UpdatedTimestamp > silver.UpdatedTimestamp").updateAll
.whenNotMatched.insertAll
.execute
在 Databricks 上,有几种方法可以优化 merge into
操作的性能:
- 对属于连接条件的列执行 ZOrder 优化。这可能取决于特定的 DBR 版本,因为旧版本(7.6 IIRC 之前)使用真正的 ZOrder 算法,该算法适用于较少数量的列,而 DBR 7.6+ 默认使用 Hilbert space-填充曲线
- 使用较小的文件大小 - 默认情况下,
OPTIMIZE
创建 1Gb 的文件,需要重写。您可以使用 spark.databricks.delta.optimize.maxFileSize
将文件大小设置为 32Mb-64Mb 范围,这样它会重写更少的数据
- 在 table 的分区上使用条件(您已经这样做了)
- 不要使用自动压缩,因为它不能执行 ZOrder,而是 运行 使用 ZOrder 进行显式优化。详情见documentation
- 调整 indexing of the columns,因此它将仅索引您的条件和查询所需的列。它与合并部分相关,但可以略微提高写入速度,因为不会为不用于查询的列收集统计信息。
这篇 presentation from Spark Summit 讨论了 merge into
的优化 - 要观察的指标等
我不能 100% 确定您需要条件 silver.Partition_Date IN ($dates) AND silver.Partition_Hour IN ($hours)
,因为如果传入数据中没有特定分区,您可能会读取比所需更多的数据,但需要查看执行计划。 knowledge base article 解释了如何确保 merge into
使用分区 p运行ing。
更新,2021 年 12 月:在较新的 DBR 版本 (DBR 9+) 中,有一个名为 Low Shuffle Merge 的新功能可以防止未修改数据的混洗,因此合并发生得更快。可以通过将 spark.databricks.delta.merge.enableLowShuffle
设置为 true
.
来启用它
问题
我们在 ADLS Gen2 之上有一个 Delta Lake 设置,具有以下 tables:
bronze.DeviceData
:按到达日期划分 (Partition_Date
)silver.DeviceData
:按事件日期和时间划分(Partition_Date
和Partition_Hour
)
我们从事件中心提取大量数据(每天超过 6 亿条记录)到 bronze.DeviceData
(仅追加)。然后,我们以流方式处理新文件,并使用 delta MERGE 命令(见下文)将它们插入 silver.DeviceData
。
到达青铜 table 的数据可以包含来自 任何 白银分区的数据(例如,设备可以发送它在本地缓存的历史数据)。但是,任何一天到达的数据中 >90% 来自分区 Partition_Date IN (CURRENT_DATE(), CURRENT_DATE() - INTERVAL 1 DAYS, CURRENT_DATE() + INTERVAL 1 DAYS)
。因此,要更新数据,我们有以下两个 spark 作业:
- “快速”:处理上述三个日期分区的数据。延迟在这里很重要,所以我们优先考虑这些数据
- “慢”:处理其余部分(任何 但 这三个日期分区)。延迟并不重要,但它应该在“合理”的时间内(我会说不超过一周)
现在我们来解决这个问题:虽然“慢”作业的数据量少了很多,但它运行几天只是为了处理一天的慢铜数据,而且有一个大集群。原因很简单:它必须读取和更新许多银分区(有时> 1000 个日期分区),并且由于更新很小但日期分区可能是千兆字节,所以这些合并命令效率低下。
此外,随着时间的推移,这个缓慢的工作将变得越来越慢,因为它接触的银分区会增长。
问题
- 我们的分区方案和 fast/slow Spark 作业设置通常是解决此问题的好方法吗?
- 可以做些什么来改进这个设置?我们想降低慢作业的成本和延迟,并找到一种方法,使它随着任何一天到达的数据量的增长而增长,而不是随着白银的大小 table
附加信息
- 我们需要 MERGE 命令,因为某些上游服务可以重新处理历史数据,然后应该更新银 table
- 白银的架构table:
CREATE TABLE silver.DeviceData (
DeviceID LONG NOT NULL, -- the ID of the device that sent the data
DataType STRING NOT NULL, -- the type of data it sent
Timestamp TIMESTAMP NOT NULL, -- the timestamp of the data point
Value DOUBLE NOT NULL, -- the value that the device sent
UpdatedTimestamp TIMESTAMP NOT NULL, -- the timestamp when the value arrived in bronze
Partition_Date DATE NOT NULL, -- = TO_DATE(Timestamp)
Partition_Hour INT NOT NULL -- = HOUR(Timestamp)
)
USING DELTA
PARTITIONED BY (Partition_Date, Partition_Hour)
LOCATION '...'
- 我们的 MERGE 命令:
val silverTable = DeltaTable.forPath(spark, silverDeltaLakeDirectory)
val batch = ... // the streaming update batch
// the dates and hours that we want to upsert, for partition pruning
// collected from the streaming update batch
val dates = "..."
val hours = "..."
val mergeCondition = s"""
silver.Partition_Date IN ($dates)
AND silver.Partition_Hour IN ($hours)
AND silver.Partition_Date = batch.Partition_Date
AND silver.Partition_Hour = batch.Partition_Hour
AND silver.DeviceID = batch.DeviceID
AND silver.Timestamp = batch.Timestamp
AND silver.DataType = batch.DataType
"""
silverTable.alias("silver")
.merge(batch.alias("batch"), mergeCondition)
// only merge if the event is newer
.whenMatched("batch.UpdatedTimestamp > silver.UpdatedTimestamp").updateAll
.whenNotMatched.insertAll
.execute
在 Databricks 上,有几种方法可以优化 merge into
操作的性能:
- 对属于连接条件的列执行 ZOrder 优化。这可能取决于特定的 DBR 版本,因为旧版本(7.6 IIRC 之前)使用真正的 ZOrder 算法,该算法适用于较少数量的列,而 DBR 7.6+ 默认使用 Hilbert space-填充曲线
- 使用较小的文件大小 - 默认情况下,
OPTIMIZE
创建 1Gb 的文件,需要重写。您可以使用spark.databricks.delta.optimize.maxFileSize
将文件大小设置为 32Mb-64Mb 范围,这样它会重写更少的数据 - 在 table 的分区上使用条件(您已经这样做了)
- 不要使用自动压缩,因为它不能执行 ZOrder,而是 运行 使用 ZOrder 进行显式优化。详情见documentation
- 调整 indexing of the columns,因此它将仅索引您的条件和查询所需的列。它与合并部分相关,但可以略微提高写入速度,因为不会为不用于查询的列收集统计信息。
这篇 presentation from Spark Summit 讨论了 merge into
的优化 - 要观察的指标等
我不能 100% 确定您需要条件 silver.Partition_Date IN ($dates) AND silver.Partition_Hour IN ($hours)
,因为如果传入数据中没有特定分区,您可能会读取比所需更多的数据,但需要查看执行计划。 knowledge base article 解释了如何确保 merge into
使用分区 p运行ing。
更新,2021 年 12 月:在较新的 DBR 版本 (DBR 9+) 中,有一个名为 Low Shuffle Merge 的新功能可以防止未修改数据的混洗,因此合并发生得更快。可以通过将 spark.databricks.delta.merge.enableLowShuffle
设置为 true
.