如何让 MSCK REPAIR TABLE 在 AWS Athena 中自动执行

How to make MSCK REPAIR TABLE execute automatically in AWS Athena

我有一个每小时执行一次的 Spark 批处理作业。每个 运行 生成新数据并将其存储在 S3 中,目录命名模式为 DATA/YEAR=?/MONTH=?/DATE=?/datafile.

将数据上传到 S3 后,我想使用 Athena 对其进行调查。另外,我想通过连接到 Athena 作为数据源,在 QuickSight 中可视化它们。

问题是在我的 Spark 批处理的每个 运行 之后,存储在 S3 中的新生成的数据将不会被 Athena 发现,除非我手动 运行 查询 MSCK REPAIR TABLE.

有没有办法让 Athena 自动更新数据,这样我就可以创建一个全自动的数据可视化管道?

有多种方法可以安排此任务。您如何安排工作流程?您使用 Airflow, Luigi, Azkaban, cron, or using an AWS Data pipeline 这样的系统吗?

从其中任何一个,您应该能够启动以下 CLI 命令。

$ aws athena start-query-execution --query-string "MSCK REPAIR TABLE some_database.some_table" --result-configuration "OutputLocation=s3://SOMEPLACE"

另一种选择是 AWS Lambda。您可以有一个调用 MSCK REPAIR TABLE some_database.some_table 的函数来响应到 S3 的新上传。

一个示例 Lambda 函数可以这样写:

import boto3

def lambda_handler(event, context):
    bucket_name = 'some_bucket'

    client = boto3.client('athena')

    config = {
        'OutputLocation': 's3://' + bucket_name + '/',
        'EncryptionConfiguration': {'EncryptionOption': 'SSE_S3'}

    }

    # Query Execution Parameters
    sql = 'MSCK REPAIR TABLE some_database.some_table'
    context = {'Database': 'some_database'}

    client.start_query_execution(QueryString = sql, 
                                 QueryExecutionContext = context,
                                 ResultConfiguration = config)

然后,当新数据添加到存储桶的 DATA/ 前缀下时,您将配置一个触发器来执行您的 Lambda 函数。

最终,在 运行 使用作业调度程序的 Spark 作业后显式重建分区具有自我记录的优势。另一方面,AWS Lambda 很适合像这样的工作。

你应该 运行 ADD PARTITION 而不是:

aws athena start-query-execution --query-string "ALTER TABLE ADD PARTITION..."

从您的 S3 位置添加新创建的分区 Athena 利用 Hive 对数据进行分区。 要创建带分区的 table,您必须在 CREATE TABLE 语句期间定义它。使用 PARTITIONED BY 定义分区数据的键。

有多种方法可以解决问题并更新 table:

  1. 致电MSCK REPAIR TABLE。这将扫描所有数据。这是昂贵的,因为每个文件都被完整读取(至少它由 AWS 完全收费)。而且它的速度非常慢。简而言之:不要这样做!

  2. 通过调用ALTER TABLE ADD PARTITION abc ...自行创建分区。从某种意义上说,这很好,无需扫描数据且成本低。查询也很快,所以这里没有问题。如果您的文件结构非常混乱且没有任何通用模式(这似乎不是您的情况,因为它是组织良好的 S3 密钥模式),这也是一个不错的选择。 这种方法也有缺点: A) 很难维护 B) 所有分区都将存储在 GLUE 目录中。当您有很多分区时,这可能会成为一个问题,因为它们需要被读出并传递给 Athena 和 EMRs Hadoop 基础设施。

  3. 使用分区投影。您可能想要评估两种不同的风格。这是在查询时为 Hadoop 创建分区的变体。这意味着没有 GLUE 目录条目通过网络发送,因此可以更快地处理大量分区。缺点是您可能 'hit' 一些可能不存在的分区。这些当然会被忽略,但在内部所有 COULD 匹配你的查询的分区都会被生成——不管它们是否在 S3 上(所以总是在你的查询中添加分区过滤器!) .如果操作正确,此选项是一种即发即弃方法,因为不需要更新。

CREATE EXTERNAL TABLE `mydb`.`mytable`
(
   ...
)
  PARTITIONED BY (
    `YEAR` int,
    `MONTH` int,
    `DATE` int)
  ...
  LOCATION
    's3://DATA/'
  TBLPROPERTIES(
      "projection.enabled" = "true",
      "projection.account.type" = "integer",
      "projection.account.range" = "1,50",
      "projection.YEAR.type" = "integer",
      "projection.YEAR.range" = "2020,2025",
      "projection.MONTH.type" = "integer",
      "projection.MONTH.range" = "1,12",
      "projection.DATE.type" = "integer",
      "projection.DATE.range" = "1,31",
      "storage.location.template" = "s3://DATA/YEAR=${YEAR}/MONTH=${MONTH}/DATE=${DATE}/"
  );

https://docs.aws.amazon.com/athena/latest/ug/partition-projection.html

  1. 仅列出所有选项:您也可以使用GLUE crawlers。但这似乎不是一个有利的方法,因为它不像宣传的那样灵活。

  2. 您可以直接使用 Glue Data Catalog API 对 GLUE 进行更多控制,如果您有很多自动化操作,这可能是方法 #2 的替代方法剧本 做准备工作来设置你的 table.

简而言之:

  • 如果您的应用程序以 SQL 为中心,您喜欢没有脚本的最精简方法,请使用分区投影
  • 如果你有很多分区,使用分区投影
  • 如果您有几个分区或分区没有通用模式,请使用方法 #2
  • 如果您的脚本很重,并且脚本完成了大部分工作并且对您来说更容易处理,请考虑方法 #5
  • 如果您感到困惑并且不知道从哪里开始 - 请先尝试分区投影!它应该适合 95% 的用例。