在 Athena CTAS 上创建超过 100 个分区的替代方法

Alternative to create more than 100 partitions on Athena CTAS

我目前正在根据存储在 Amazon S3 中的信息创建一些新的 table。第一次使用 AWS,今天我了解到 Amazon Athena 无法通过 CTAS 查询创建超过 100 个分区。

我正在使用 sql 进行转换,它工作得很好,但需要一种方法来一次存储 100 多个分区以使过程更加可靠。

我正在按日期设置分区,因此如果我需要重新创建 table 以通过 sql 加载大量数据(其中我有转换)。

知道如何实现吗?

假设您要使用 CTAS 查询处理 4 个月的数据,但您需要按天对其进行分区。如果您在单个 CTAS 查询中执行此操作,您最终将得到大约 4 x 30 = 120 个分区,因此,正如您提到的那样,由于 AWS limitations.

,查询将失败

相反,您可以一次处理每个月的数据,这样您就可以保证一次拥有少于 31 个分区。然而,每个 CTAS 查询的结果在 S3 上应该有一个唯一的外部位置,即如果你想在 s3://bukcet-name/data-root 下存储多个 CTAS 查询的结果,你需要为 external_location 中的每个查询扩展这个路径在 WITH 条款下。您的案例的明显选择是完整日期,例如:

s3://bukcet-name/data-root
├──2019-01            <-- external_location='s3://bukcet-name/data-root/2019-01'
|   └── month=01
|       ├── day=01
|       |   ...
|       └── day=31
├──2019-02            <-- external_location='s3://bukcet-name/data-root/2019-02'
|   └── month=02
|       ├── day=01
|       |   ...
|       └── day=28
...

但是,现在您得到了 4 个不同的 table。因此,您要么需要查询不同的 table,要么必须进行一些后处理。本质上,您将有两个选择

  1. 将所有新文件移动到 AWS CLI high-level commands which should be followed by MSCK REPAIR TABLE 的公共位置,因为输出 "directory" 结构遵循 HIVE 分区命名约定。例如来自

    s3://bukcet-name/data-staging-area
    ├──2019-01        <-- external_location='s3://bukcet-name/data-staging-area/2019-01'
    |   └── month=01
    |       ├── day=01
    |       |   ...
    

    你会复制到

    s3://bukcet-name/data-root
    ├── month=01
    |  ├── day=01
    |  |   ...
    |  └── day=31
    ├── month=02
    |   ├── day=01
    |   |   ...
    |   └── day=28
    
  2. 使用 AWS Glue 数据目录进行操作。这有点棘手,但主要思想是定义一个 root table,其位置指向 s3://bukcet-name/data-root。然后在执行 CTAS 查询后,您需要将有关分区的元信息从创建的 "staging" table 复制到根 table 中。此步骤将基于 Python 的 AWS Glue API via for example boto3 库。特别是,您将使用 get_partitions()batch_create_partition() 方法。

无论您选择哪种方法,您都需要使用某种作业调度软件,尤其是因为您的数据不仅仅是历史数据。我建议为此使用 Apache Airflow 。它可以看作是 Lambda 和 Step Functions 组合的替代品,它是完全免费的。有很多博客文章和文档可以帮助您入门。例如:

  • Medium post:自动执行 AWS Athena 查询并使用 Airflow 在 S3 周围移动结果。
  • Airflow 安装完整指南,link 1 and link 2

您甚至可以设置 integration with Slack 在查询以成功或失败状态终止时发送通知。

注意事项:

一般来说,由于 Athena 是一个分布式系统,您无法明确控制将创建多少文件作为 CTAS 查询的结果。另一方面,您不希望有很多小文件。所以可以尝试使用 "this workaround",它在 WITH 子句

中使用 bucketed_bybucket_count 字段
CREATE TABLE new_table
WITH (
    ...
    bucketed_by=ARRAY['some_column_from_select'],
    bucket_count=1
) AS (
    -- Here goes your normal query 
    SELECT 
        *
    FROM 
        old_table;
)

或者,减少分区数量,即在月级别停止。

最好的选择是为此任务编写 Glue ETL (spark) 作业并使用 spark sql 执行所需的转换。这样您仍然可以使用现有的 sql 查询。

然后你可以将处理后的输出写回某个S3路径。 Spark 允许您创建任意数量的分区。它还允许将新处理的数据附加到已处理的数据,从而允许您仅加载和转换新数据。

ETL 完成后,创建一个指向上面使用的 S3 路径和所需分区的外部 table。这将是一个时间步(创建外部 table)。您只需要在每次粘合作业后更新此外部 table 中的分区信息。

总而言之,您需要执行以下操作:

  • 创建要在 Glue ETL 上执行的 spark 脚本,它将读取每日源数据,应用所需的转换并将处理后的数据写入 S3 的新分区中。这个脚本可以很容易地接受日期作为输入,并且将是一次 activity.

  • 创建一个指向 S3 上已处理数据的外部 table。这也是一次activity.

  • 在每个 Glue ETL 作业更新新分区后,在上述外部 table 上执行 MSCK 修复命令。

参考资料:

AWS Glue ETL documentation

AWS Athena - Create external table

AWS Athena - Update partiotion