使用聚合函数时减少 Athena 扫描的数据量

reduce the amount of data scanned by Athena when using aggregate functions

下面的查询扫描了 100 MB 的数据。

select * from table where column1 = 'val' and partition_id = '20190309';

但是下面的查询扫描了 15 GB 的数据(有超过 90 个分区)

select * from table where column1 = 'val' and partition_id in (select max(partition_id) from table);

如何优化第二个查询以扫描与第一个查询相同数量的数据?

这里有两个问题。上面的标量子查询的效率 select max(partition_id) from table,以及 @PiotrFindeisen 指出的关于动态过滤的效率。

第一个问题是对 Hive table 的分区键的查询比看起来要复杂得多。大多数人会认为,如果你想要分区键的最大值,你可以简单地对分区键执行查询,但这是行不通的,因为 Hive 允许分区为空(并且它还允许非空文件不包含任何行)。具体来说,select max(partition_id) from table 上面的标量子查询要求 Trino (formerly PrestoSQL) 找到包含至少一行的最大分区。理想的解决方案是在 Hive 中拥有完美的统计信息,但除此之外,引擎需要为 Hive 提供自定义逻辑,以打开分区文件,直到找到非空文件。

如果您确定您的仓库不包含空分区(或者如果您同意其含义),您可以将标量子查询替换为隐藏的 $partitions table"

select * 
from table 
where column1 = 'val' and 
    partition_id = (select max(partition_id) from "table$partitions");

第二个问题是@PiotrFindeisen 指出的,与计划和执行查询的方式有关。大多数人会看到上面的查询,看到引擎显然应该在计划期间计算出 select max(partition_id) from "table$partitions" 的值,将其内联到计划中,然后继续优化。不幸的是,一般来说这是一个相当复杂的决定,因此引擎只是将其建模为广播连接,其中执行的一部分计算出该值,并将该值广播给其他工作人员。问题是执行的其余部分无法将此新信息添加到现有处理中,因此它只是扫描所有数据,然后过滤掉您试图跳过的值。添加此 dynamic filtering 的项目正在进行中,但尚未完成。

这意味着您今天能做的最好的事情是 运行 两个单独的查询:一个获取最大值 partition_id,第二个获取内联值。

顺便说一句,隐藏的“$partitions”table 已添加到 Presto 0.199, and we fixed some minor bugs in 0.201. I'm not sure which version Athena is based on, but I believe it is is pretty far out of date (the current release at the time I'm writing this answer is 309

编辑:Presto 删除了 0.193 release 中的 __internal_partitions__ table 所以我建议不要使用 Slow aggregation queries for partition keys 以下部分在任何生产系统中,因为 Athena 'transparently' 更新了 presto 版本。我最终只使用了天真的 SELECT max(partition_date) ... 查询,但也使用了 Lack of Dynamic Filtering 部分中概述的相同回溯技巧。它比使用 __internal_partitions__ table 慢大约 3 倍,但至少当 Athena 决定更新他们的 presto 版本时它不会中断。

-----原创Post-----

因此,当您只需要回顾几个分区的数据以匹配然而,请注意,我不是 100% 确定 information_schema.__internal_partitions__ table 的使用有多脆弱。

正如@Dain 上面提到的,确实有两个问题。第一个是 max(partition_date) 查询的聚合有多慢,第二个是 Presto 缺乏对动态过滤的支持。

分区键聚合查询缓慢

为了解决第一个问题,我使用了 information_schema.__internal_partitions__ table,它允许我在 table 的分区上快速聚合,而无需扫描文件内的数据. (请注意,以下查询中的 partition_valuepartition_keypartition_number 都是 __internal_partitions__ table 的列名称,与您的 table 的列)

如果您的 table 只有一个分区键,您可以这样做:

SELECT max(partition_value) FROM information_schema.__internal_partitions__
WHERE table_schema = 'DATABASE_NAME' AND table_name = 'TABLE_NAME'

但是如果您有多个分区键,您将需要更多像这样的东西:

SELECT max(partition_date) as latest_partition_date from (
  SELECT max(case when partition_key = 'partition_date' then partition_value end) as partition_date, max(case when partition_key = 'another_partition_key' then partition_value end) as another_partition_key
  FROM information_schema.__internal_partitions__
  WHERE table_schema = 'DATABASE_NAME' AND table_name = 'TABLE_NAME'
  GROUP BY partition_number
)
WHERE
  -- ... Filter down by values for e.g. another_partition_key
)

这些查询应该 运行 相当快(我的 运行 在大约 1-2 秒内),而无需扫描文件中的实际数据,但同样,我不确定是否有使用这种方法的任何问题。

缺少动态过滤

我能够减轻第二个问题对我的特定 use-case 的最坏影响,因为我希望在从当前日期开始的有限时间内总是有一个分区(例如,我可以保证任何 data-production 或 partition-loading 问题将在 3 天内得到解决)。事实证明,Athena 在使用 presto 的 datetime functions 时确实做了一些 pre-processing,因此这与使用 sub-query.

的动态过滤没有相同类型的问题

因此,您可以更改查询以使用日期时间函数限制实际最大值的回溯范围,从而限制扫描的数据量。

SELECT * FROM "DATABASE_NAME"."TABLE_NAME"
WHERE partition_date >= cast(date '2019-06-25' - interval '3' day as varchar) -- Will only scan partitions from 3 days before '2019-06-25'
AND partition_date = (
  -- Insert the partition aggregation query from above here
)

我不知道它是否仍然相关,但刚刚发现:

而不是:

select * from table where column1 = 'val' and partition_id in (select max(partition_id) from table);

使用:

select a.* from table a 
inner join (select max(partition_id) max_id from table) b on a.partition_id=b.max_id
where column1 = 'val';

我认为这与使用分区的连接优化有关。