Databricks 中的显式 table 分区如何影响写入性能?

How do explicit table partitions in Databricks affect write performance?

我们有以下场景:

因此,在创建新的 table 时,我们 运行 查询如下:

CREATE TABLE the_new_table
USING DELTA
PARTITIONED BY (entity_id, date)
AS SELECT
  entity_id,
  another_id,
  from_unixtime(timestamp) AS timestamp,
  CAST(from_unixtime(timestamp) AS DATE) AS date
FROM the_old_table

此查询有 运行 48 小时,并且还在增加。我们知道它正在取得进展,因为我们在相关的 S3 前缀中找到了大约 250k 个对应于第一个分区键的前缀,并且存在的前缀中肯定有一些大文件。

但是,我们在准确监控取得了多少进展以及预计需要多长时间方面遇到了一些困难。

在我们等待的同时,我们尝试了这样的查询:

CREATE TABLE a_test_table (
  entity_id STRING,
  another_id STRING,
  timestamp TIMESTAMP,
  date DATE
)
USING DELTA
PARTITIONED BY (date);

INSERT INTO a_test_table
SELECT
  entity_id,
  another_id,
  from_unixtime(timestamp) AS timestamp,
  CAST(from_unixtime(timestamp) AS DATE) AS date
FROM the_old_table
  WHERE CAST(from_unixtime(timestamp) AS DATE) = '2018-12-01'

请注意这里新的 table 模式的主要区别在于我们仅根据日期而不是实体 ID 进行分区。我们选择的日期几乎正好包含旧 table 数据的百分之四,我想指出这一点,因为它远远超过 1/31。当

使用相同数量的工作节点,创建此测试 table 花费了 16 分钟,因此我们预计(基于此)创建 table 25 倍大的 table 只会大约需要 7 个小时.

This answer appears to partially acknowledge that using too many partitions can cause the problem, but the underlying causes appear to have greatly changed in the last couple of years, so we seek to understand what the current issues might be; the Databricks docs 没有特别有启发性。

根据发布的 request rate guidelines for S3,似乎增加分区(键前缀)的数量应该 提高 性能。有害的分区似乎违反直觉。

总而言之:我们期望将数千条记录写入数千个分区中的每一个。似乎减少分区数量会显着减少写入 table 数据所需的时间。为什么这是真的?对于应该为特定大小的数据创建的分区数量,是否有任何一般准则?

我根本不是数据块专家,但希望这些要点可以提供帮助

分区数

无论如何,创建的分区和文件的数量都会影响您的作业性能,尤其是使用 s3 作为数据存储时,但是这个数量的文件应该可以由下降大小的集群轻松处理

动态分区

通过 2 个键而不是一个键动态分区之间存在巨大差异,让我尝试更详细地解决这个问题。

当您对数据进行动态分区时,根据任务数量和数据大小,可能会创建大量小文件每个分区,这可能(并且可能会)影响需要使用此数据的下一个作业的性能,特别是如果您的数据存储在 ORC、parquet 或任何其他柱状格式中。请注意,这将只需要一个 map only job

之前解释的问题以不同的方式解决,是最常见的文件合并。为此,数据被重新分区以创建更大的文件。因此,需要对数据进行混洗。

您的查询

对于您的第一个查询,分区数将为 350k*31(大约 11MM!),考虑到处理该作业所需的改组和任务量,这确实很大。

对于您的第二个查询(仅需 16 分钟),所需的任务数和改组所需的数量要少得多。

分区数 (shuffling/sorting/tasks scheduling/etc) 和您的作业执行时间没有线性关系,这就是为什么在这种情况下数学不相加的原因。

推荐

我想你已经明白了,你应该将你的 etl 作业分成 31 个不同的查询,这样可以优化执行时间

您应该按 date 对您的数据进行分区,因为这听起来像是您随着时间的推移不断添加数据。这是划分时间序列数据的普遍接受的方法。这意味着您将每天写入一个日期分区,并且您之前的日期分区不会再次更新(一件好事)。

如果您的用例从中受益,您当然可以使用辅助分区键(即 PARTITIONED BY (date, entity_id)

按日期分区将需要您始终按日期读取此数据,以获得最佳性能。如果这不是您的用例,那么您必须澄清您的问题。

多少分区?

没有人能告诉您应该使用多少个分区,因为每个数据集(和处理集群)都是不同的。您确实要避免的是 "data skew",其中一名工作人员必须处理大量数据,而其他工作人员则处于空闲状态。例如,在您的情况下,如果一个 clientid 占数据集的 20%,就会发生这种情况。按日期分区必须假设每天的数据量大致相同,因此每个工作人员都保持同样的忙碌。

我不具体了解 Databricks 如何写入磁盘,但在 Hadoop 上我希望看​​到每个工作节点写入它自己的文件部分,因此您的写入性能在这个级别是并行的。

在占用分区列的情况下我的建议是

  • 确定所有列的基数,select那些时间有限的列,因此排除标识符和日期列
  • 确定对 table 的主要搜索,可能是日期或某些分类字段
  • 生成具有有限基数的子列以加快搜索示例在日期的情况下可以将其分解为年月日等,或者在整数标识符的情况下分解将它们分成这些ID的整数除法% [1,2,3 ...]

正如我之前提到的,使用基数高的列进行分区会导致性能不佳,因为会生成大量文件,这是最糟糕的工作情况。

创建增量时建议使用不超过 1 GB 的文件 table 建议占用“合并 (1)”

如果需要进行更新或插入,指定最大分区列数,排除文件读取的inceserary情况,减少次数很有效