如何让 MSCK REPAIR TABLE 在 AWS Athena 中自动执行
How to make MSCK REPAIR TABLE execute automatically in AWS Athena
我有一个每小时执行一次的 Spark 批处理作业。每个 运行 生成新数据并将其存储在 S3
中,目录命名模式为 DATA/YEAR=?/MONTH=?/DATE=?/datafile
.
将数据上传到 S3
后,我想使用 Athena
对其进行调查。另外,我想通过连接到 Athena 作为数据源,在 QuickSight
中可视化它们。
问题是在我的 Spark 批处理的每个 运行 之后,存储在 S3
中的新生成的数据将不会被 Athena 发现,除非我手动 运行 查询 MSCK REPAIR TABLE
.
有没有办法让 Athena 自动更新数据,这样我就可以创建一个全自动的数据可视化管道?
有多种方法可以安排此任务。您如何安排工作流程?您使用 Airflow, Luigi, Azkaban, cron, or using an AWS Data pipeline 这样的系统吗?
从其中任何一个,您应该能够启动以下 CLI 命令。
$ aws athena start-query-execution --query-string "MSCK REPAIR TABLE some_database.some_table" --result-configuration "OutputLocation=s3://SOMEPLACE"
另一种选择是 AWS Lambda。您可以有一个调用 MSCK REPAIR TABLE some_database.some_table
的函数来响应到 S3 的新上传。
一个示例 Lambda 函数可以这样写:
import boto3
def lambda_handler(event, context):
bucket_name = 'some_bucket'
client = boto3.client('athena')
config = {
'OutputLocation': 's3://' + bucket_name + '/',
'EncryptionConfiguration': {'EncryptionOption': 'SSE_S3'}
}
# Query Execution Parameters
sql = 'MSCK REPAIR TABLE some_database.some_table'
context = {'Database': 'some_database'}
client.start_query_execution(QueryString = sql,
QueryExecutionContext = context,
ResultConfiguration = config)
然后,当新数据添加到存储桶的 DATA/
前缀下时,您将配置一个触发器来执行您的 Lambda 函数。
最终,在 运行 使用作业调度程序的 Spark 作业后显式重建分区具有自我记录的优势。另一方面,AWS Lambda 很适合像这样的工作。
你应该 运行 ADD PARTITION
而不是:
aws athena start-query-execution --query-string "ALTER TABLE ADD PARTITION..."
从您的 S3
位置添加新创建的分区
Athena 利用 Hive 对数据进行分区。
要创建带分区的 table,您必须在 CREATE TABLE
语句期间定义它。使用 PARTITIONED BY
定义分区数据的键。
有多种方法可以解决问题并更新 table:
致电MSCK REPAIR TABLE
。这将扫描所有数据。这是昂贵的,因为每个文件都被完整读取(至少它由 AWS 完全收费)。而且它的速度非常慢。简而言之:不要这样做!
通过调用ALTER TABLE ADD PARTITION abc ...
自行创建分区。从某种意义上说,这很好,无需扫描数据且成本低。查询也很快,所以这里没有问题。如果您的文件结构非常混乱且没有任何通用模式(这似乎不是您的情况,因为它是组织良好的 S3 密钥模式),这也是一个不错的选择。 这种方法也有缺点: A) 很难维护 B) 所有分区都将存储在 GLUE 目录中。当您有很多分区时,这可能会成为一个问题,因为它们需要被读出并传递给 Athena 和 EMRs Hadoop 基础设施。
使用分区投影。您可能想要评估两种不同的风格。这是在查询时为 Hadoop 创建分区的变体。这意味着没有 GLUE 目录条目通过网络发送,因此可以更快地处理大量分区。缺点是您可能 'hit' 一些可能不存在的分区。这些当然会被忽略,但在内部所有 COULD 匹配你的查询的分区都会被生成——不管它们是否在 S3 上(所以总是在你的查询中添加分区过滤器!) .如果操作正确,此选项是一种即发即弃方法,因为不需要更新。
CREATE EXTERNAL TABLE `mydb`.`mytable`
(
...
)
PARTITIONED BY (
`YEAR` int,
`MONTH` int,
`DATE` int)
...
LOCATION
's3://DATA/'
TBLPROPERTIES(
"projection.enabled" = "true",
"projection.account.type" = "integer",
"projection.account.range" = "1,50",
"projection.YEAR.type" = "integer",
"projection.YEAR.range" = "2020,2025",
"projection.MONTH.type" = "integer",
"projection.MONTH.range" = "1,12",
"projection.DATE.type" = "integer",
"projection.DATE.range" = "1,31",
"storage.location.template" = "s3://DATA/YEAR=${YEAR}/MONTH=${MONTH}/DATE=${DATE}/"
);
https://docs.aws.amazon.com/athena/latest/ug/partition-projection.html
仅列出所有选项:您也可以使用GLUE crawlers
。但这似乎不是一个有利的方法,因为它不像宣传的那样灵活。
您可以直接使用 Glue Data Catalog API
对 GLUE 进行更多控制,如果您有很多自动化操作,这可能是方法 #2 的替代方法剧本
做准备工作来设置你的 table.
简而言之:
- 如果您的应用程序以 SQL 为中心,您喜欢没有脚本的最精简方法,请使用分区投影
- 如果你有很多分区,使用分区投影
- 如果您有几个分区或分区没有通用模式,请使用方法 #2
- 如果您的脚本很重,并且脚本完成了大部分工作并且对您来说更容易处理,请考虑方法 #5
- 如果您感到困惑并且不知道从哪里开始 - 请先尝试分区投影!它应该适合 95% 的用例。
我有一个每小时执行一次的 Spark 批处理作业。每个 运行 生成新数据并将其存储在 S3
中,目录命名模式为 DATA/YEAR=?/MONTH=?/DATE=?/datafile
.
将数据上传到 S3
后,我想使用 Athena
对其进行调查。另外,我想通过连接到 Athena 作为数据源,在 QuickSight
中可视化它们。
问题是在我的 Spark 批处理的每个 运行 之后,存储在 S3
中的新生成的数据将不会被 Athena 发现,除非我手动 运行 查询 MSCK REPAIR TABLE
.
有没有办法让 Athena 自动更新数据,这样我就可以创建一个全自动的数据可视化管道?
有多种方法可以安排此任务。您如何安排工作流程?您使用 Airflow, Luigi, Azkaban, cron, or using an AWS Data pipeline 这样的系统吗?
从其中任何一个,您应该能够启动以下 CLI 命令。
$ aws athena start-query-execution --query-string "MSCK REPAIR TABLE some_database.some_table" --result-configuration "OutputLocation=s3://SOMEPLACE"
另一种选择是 AWS Lambda。您可以有一个调用 MSCK REPAIR TABLE some_database.some_table
的函数来响应到 S3 的新上传。
一个示例 Lambda 函数可以这样写:
import boto3
def lambda_handler(event, context):
bucket_name = 'some_bucket'
client = boto3.client('athena')
config = {
'OutputLocation': 's3://' + bucket_name + '/',
'EncryptionConfiguration': {'EncryptionOption': 'SSE_S3'}
}
# Query Execution Parameters
sql = 'MSCK REPAIR TABLE some_database.some_table'
context = {'Database': 'some_database'}
client.start_query_execution(QueryString = sql,
QueryExecutionContext = context,
ResultConfiguration = config)
然后,当新数据添加到存储桶的 DATA/
前缀下时,您将配置一个触发器来执行您的 Lambda 函数。
最终,在 运行 使用作业调度程序的 Spark 作业后显式重建分区具有自我记录的优势。另一方面,AWS Lambda 很适合像这样的工作。
你应该 运行 ADD PARTITION
而不是:
aws athena start-query-execution --query-string "ALTER TABLE ADD PARTITION..."
从您的 S3
位置添加新创建的分区
Athena 利用 Hive 对数据进行分区。
要创建带分区的 table,您必须在 CREATE TABLE
语句期间定义它。使用 PARTITIONED BY
定义分区数据的键。
有多种方法可以解决问题并更新 table:
致电
MSCK REPAIR TABLE
。这将扫描所有数据。这是昂贵的,因为每个文件都被完整读取(至少它由 AWS 完全收费)。而且它的速度非常慢。简而言之:不要这样做!通过调用
ALTER TABLE ADD PARTITION abc ...
自行创建分区。从某种意义上说,这很好,无需扫描数据且成本低。查询也很快,所以这里没有问题。如果您的文件结构非常混乱且没有任何通用模式(这似乎不是您的情况,因为它是组织良好的 S3 密钥模式),这也是一个不错的选择。 这种方法也有缺点: A) 很难维护 B) 所有分区都将存储在 GLUE 目录中。当您有很多分区时,这可能会成为一个问题,因为它们需要被读出并传递给 Athena 和 EMRs Hadoop 基础设施。使用分区投影。您可能想要评估两种不同的风格。这是在查询时为 Hadoop 创建分区的变体。这意味着没有 GLUE 目录条目通过网络发送,因此可以更快地处理大量分区。缺点是您可能 'hit' 一些可能不存在的分区。这些当然会被忽略,但在内部所有 COULD 匹配你的查询的分区都会被生成——不管它们是否在 S3 上(所以总是在你的查询中添加分区过滤器!) .如果操作正确,此选项是一种即发即弃方法,因为不需要更新。
CREATE EXTERNAL TABLE `mydb`.`mytable`
(
...
)
PARTITIONED BY (
`YEAR` int,
`MONTH` int,
`DATE` int)
...
LOCATION
's3://DATA/'
TBLPROPERTIES(
"projection.enabled" = "true",
"projection.account.type" = "integer",
"projection.account.range" = "1,50",
"projection.YEAR.type" = "integer",
"projection.YEAR.range" = "2020,2025",
"projection.MONTH.type" = "integer",
"projection.MONTH.range" = "1,12",
"projection.DATE.type" = "integer",
"projection.DATE.range" = "1,31",
"storage.location.template" = "s3://DATA/YEAR=${YEAR}/MONTH=${MONTH}/DATE=${DATE}/"
);
https://docs.aws.amazon.com/athena/latest/ug/partition-projection.html
仅列出所有选项:您也可以使用
GLUE crawlers
。但这似乎不是一个有利的方法,因为它不像宣传的那样灵活。您可以直接使用
Glue Data Catalog API
对 GLUE 进行更多控制,如果您有很多自动化操作,这可能是方法 #2 的替代方法剧本 做准备工作来设置你的 table.
简而言之:
- 如果您的应用程序以 SQL 为中心,您喜欢没有脚本的最精简方法,请使用分区投影
- 如果你有很多分区,使用分区投影
- 如果您有几个分区或分区没有通用模式,请使用方法 #2
- 如果您的脚本很重,并且脚本完成了大部分工作并且对您来说更容易处理,请考虑方法 #5
- 如果您感到困惑并且不知道从哪里开始 - 请先尝试分区投影!它应该适合 95% 的用例。