AWS Athena 查询分区

AWS Athena Query Partitioning

我正在尝试使用 AWS Athena 为现有平台提供分析。当前流程如下所示:

  1. 数据作为 JSON 个事件被泵入 Kinesis Firehose。
  2. Firehose 使用 AWS Glue 中的 table 将数据转换为镶木地板,并每 15 分钟或当流达到 128 MB(最大支持值)时写入 S3。
  3. 当数据写入 S3 时,它使用路径 /year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/...
  4. 进行分区
  5. AWS Glue 爬虫每 24 小时使用最新的分区数据更新 table 并使其可用于查询。

基本流程有效。但是,这有几个问题...

首先(也是最重要的)是此数据是多租户应用程序的一部分。每个名为 account_id 的事件中都有一个 属性。将要发出的每个查询都将由特定帐户发出,我不想为每个查询扫描所有帐户数据。我需要找到一种仅查询相关数据的可扩展方式。我确实研究过尝试让 Kinesis 提取 account_id 并将其用作分区。但是,这目前不受支持,如果帐户超过 10,000 个,AWS 20k 分区限制很快就会成为问题。

第二个问题是文件大小! AWS 建议文件不要小于 128 MB,因为这会对查询时间产生不利影响,因为执行引擎可能会花费额外的时间来处理打开 Amazon S3 文件的开销。鉴于 Firehose 的性质,我只能达到每个文件 128 MB 的最大大小。

对于那么多帐户,您可能出于多种原因不想使用 account_id 作为分区键。我认为你在限制方面做得很好,the partition limit per table is 1M,但这并不意味着这是个好主意。

不过,您可以通过对部分帐户 ID 进行分区来显着减少扫描的数据量。如果您的账户 ID 是均匀分布的(如 AWS 账户 ID),您可以根据前缀进行分区。如果您的帐户 ID 是数字,则在第一个数字上进行分区会使每个查询扫描的数据量减少 90%,而使用两位数则减少 99%,同时仍将分区数量保持在非常合理的水平。

不幸的是,我也不知道如何使用 Glue 做到这一点。在进行 ETL 时,我发现 Glue 通常非常无用。根据我的经验,即使是简单的事情也很难。我使用 Athena 的 CTAS 功能结合一些简单的 S3 操作取得了更大的成功,将 CTAS 操作产生的数据添加为现有 table.

中的分区

如果您找到提取帐户 ID 的方法,您还可以对每个帐户单独 table 进行试验,you can have 100K tables in a database。它与 table 中的分区没有太大区别,但可能会更快,具体取决于 Athena 如何确定要查询的分区。

不要太担心 128 MB 文件大小的经验法则。拥有大量小文件绝对比拥有少量大文件更糟糕 - 但扫描大量数据以过滤掉一小部分数据对性能和成本来说也是非常糟糕的。即使查询数百个大小只有几 KB 的文件,Athena 也可以在一秒钟内提供结果。我会担心确保 Athena 首先读取正确的数据,然后才是理想的文件大小。

如果你告诉我更多关于每个帐户的数据量和帐户的预期生命周期,我可以就目标目标提供更详细的建议。


更新:鉴于 Firehose 不允许您更改输入数据的目录结构,并且 Glue 通常非常糟糕,并且您在评论,我会做这样的事情:

  • 创建一个 Athena table,其中包含数据中所有属性的列,并将日期作为分区键。这是您的输入 table,只有 ETL 查询会 运行 反对此 table。不用担心输入数据有单独的年、月、日目录,您只需要一个分区键。将它们作为单独的分区键只会使事情变得复杂,并且有一个意味着它可以是 DATE 类型,而不是三个单独的 STRING 列,您必须 assemble 进入一个日期每次你想做一个日期计算。

  • 创建另一个具有相同列的 Athena table,但按 account_id_prefix 和日期或月份进行分区。这将是您 运行 查询的 table。 account_id_prefix 将是您帐户 ID 中的一两个字符 – 您必须测试最有效的字符。您还必须决定是否按日期或更长的时间跨度进行分区。日期将使 ETL 变得更容易和更便宜,但更长的时间跨度将产生更少和更大的文件,这可以使查询更有效(但可能更昂贵)。

  • 创建执行以下操作的 Step Functions 状态机(在 Lambda 函数中):

    • 向输入添加新分区 table。如果您每天将状态机安排到 运行 一次,它可以只添加与当前日期相对应的分区。使用 Glue CreatePartition API 调用来创建分区(不幸的是,这需要大量信息才能工作,不过您可以 运行 一个 GetTable 调用来获取它。使用例如 ["2019-04-29"]Values"s3://some-bucket/firehose/year=2019/month=04/day=29"StorageDescriptor.Location。这相当于 运行ning ALTER TABLE some_table ADD PARTITION (date = '2019-04-29) LOCATION 's3://some-bucket/firehose/year=2019/month=04/day=29' – 但通过 Glue 执行速度更快比 Athena 中的 运行ning 查询和 Lambda 中的更多 suitable。
    • 在输入 table 上开始一个 CTAS query,过滤当前日期,按第一个字符或帐户 ID 和当前日期划分。使用位于查询 table 位置下方的 CTAS 输出位置。为 CTAS 操作创建的 table 生成一个随机名称,此 table 将在后面的步骤中删除。使用 Parquet 作为格式。
    • 查看 Poll for Job Status 示例状态机以获得有关如何等待 CTAS 操作完成的灵感。
    • 当 CTAS 操作完成后,列出在使用 Glue GetPartitions and create the same partitions in the query table with BatchCreatePartitions 创建的临时 table 中创建的分区。
    • 最后删除属于您删除的查询 table 分区的所有文件,并删除由 CTAS 操作创建的临时文件 table。

如果您决定对比日期更长的内容进行分区,您仍然可以使用上述过程,但您还需要删除查询中的分区 table 和 S3 上的相应数据,因为每次更新都会替换现有数据(例如按月分区,我建议您尝试,每天您都会为整个月创建新文件,这意味着需要删除旧文件)。如果您想每天 table 多次更新您的查询,那将是相同的。

这看起来很多,而且看起来像 Glue Crawlers 和 Glue ETL 所做的 - 但根据我的经验,它们并没有让事情变得如此简单。

在您的情况下,数据是使用 Hive 样式分区进行分区的,Glue Crawlers 可以理解,但在许多情况下,您不会获得 Hive 样式分区,而只是 Y/M/D(我实际上并不知道Firehose 可以通过这种方式传递数据,我认为它只能做到 Y/M/D)。 Glue Crawler 每次 运行s 也会做很多额外的工作,因为它不知道数据添加到哪里,但你知道从昨天开始添加的唯一分区是昨天的分区,因此抓取减少为一步交易。

Glue ETL 也让事情变得非常困难,与 Lambda 和 Step Functions 相比,它是一项昂贵的服务。您要做的就是将原始数据格式 JSON 转换为 Parquet 并重新分区。据我所知,使用比 Athena CTAS 查询更少的代码是不可能做到这一点的。即使您可以用更少的代码使用 Glue ETL 进行转换操作,您仍然需要编写大量代码来替换目标中的分区 table – 因为这是 Glue ETL 和 Spark 根本做不到的支持。

Athena CTAS 并不是真正为 ETL 而设计的,我认为我上面概述的方法比它应该的要复杂得多,但我相信它比尝试做同样的事情要简单得多事情(即根据另一个table中的数据不断更新并可能替换table中的分区,而不是每次都重建整个table)。

通过此 ETL 过程,您的摄取不必担心按时间进行分区,但您仍会获得针对查询优化的 tables。