AWS Athena: Delete partitions between date range

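I have an Athena table with a partition based on the date, like this:

20190218

I want to delete all the partitions that were created last year.

I tried the queries below, but they didn't work.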

ALTER TABLE tblname DROP PARTITION (partition1 < '20181231');

ALTER TABLE tblname DROP PARTITION (partition1 > '20181010'), Partition (partition1 < '20181231');

According to https://docs.aws.amazon.com/athena/latest/ug/alter-table-drop-partition.html, ALTER TABLE tblname DROP PARTITION takes a partition spec, so ranges are not allowed.
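Because DROP PARTITION accepts a comma-separated list of exact partition specs, one workaround that stays inside Athena is to generate one spec per day in the range. The following is only a minimal sketch: it assumes the partition column is the string column partition1 from the question with values formatted as YYYYMMDD, and the helper name build_drop_statement is hypothetical.

```python
from datetime import date, timedelta


def build_drop_statement(table: str, column: str, start: date, end: date) -> str:
    """Build an ALTER TABLE ... DROP PARTITION statement with one exact
    partition spec per day in the inclusive range [start, end]."""
    if end < start:
        raise ValueError("end must not be earlier than start")
    days = (end - start).days + 1
    specs = [
        # Each spec is an equality on the partition column, which DROP PARTITION accepts.
        f"PARTITION ({column} = '{(start + timedelta(days=i)):%Y%m%d}')"
        for i in range(days)
    ]
    return f"ALTER TABLE {table} DROP IF EXISTS " + ", ".join(specs)


statement = build_drop_statement("tblname", "partition1", date(2018, 10, 11), date(2018, 10, 13))
print(statement)
```

IF EXISTS keeps the statement from failing on days that have no partition. The generated statement only changes table metadata; the files in S3 stay where they are.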

In Presto you would do DELETE FROM tblname WHERE ..., but DELETE is not supported by Athena either.

For these reasons, you need to rely on some external solution.

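For example:

  1. List the files
  2. Delete the files and the containing directories
  3. Update the partition information (https://docs.aws.amazon.com/athena/latest/ug/msck-repair-table.html should be helpful, though note that in Athena MSCK REPAIR TABLE only adds partitions and does not remove stale ones, which still have to be dropped explicitly)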
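Steps 1 and 2 above can be sketched with the S3 API. This is an illustration under assumptions, not a definitive implementation: the S3 client is passed in (for example boto3.client("s3")), the function names are hypothetical, and the bucket and prefix layout of your table are up to you.

```python
from typing import Iterator, List


def iter_keys(s3_client, bucket: str, prefix: str) -> Iterator[str]:
    """Step 1: list every object key under a partition prefix."""
    paginator = s3_client.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        # Pages for an empty prefix carry no "Contents" key.
        for obj in page.get("Contents", []):
            yield obj["Key"]


def delete_prefix(s3_client, bucket: str, prefix: str) -> int:
    """Step 2: delete all objects under the prefix; returns the number deleted."""
    keys: List[str] = list(iter_keys(s3_client, bucket, prefix))
    # DeleteObjects accepts at most 1000 keys per request.
    for i in range(0, len(keys), 1000):
        batch = [{"Key": key} for key in keys[i:i + 1000]]
        s3_client.delete_objects(Bucket=bucket, Delete={"Objects": batch})
    return len(keys)
```

A call might look like delete_prefix(boto3.client("s3"), "my-bucket", "path/to/table/partition1=20181011/"), where the bucket name and prefix are placeholders. S3 has no real directories, so once every object under a prefix is gone the "directory" disappears as well. Step 3 still has to be done separately.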

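While Athena SQL may not support it at this time, the Glue API call GetPartitions (which Athena uses under the hood for queries) supports complex filter expressions similar to what you can write in a SQL WHERE expression.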
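As a rough sketch of such a filter, the snippet below builds a range expression for the question's string column partition1 and passes it to GetPartitions through a paginator. The Glue client is supplied by the caller (for example boto3.client("glue")), and the function names are hypothetical.

```python
from typing import Dict, List


def date_range_expression(column: str, after: str, before: str) -> str:
    """Build a Glue filter expression for values strictly between two bounds."""
    return f"{column} > '{after}' AND {column} < '{before}'"


def find_partitions(glue_client, database: str, table: str, expression: str) -> List[Dict]:
    """Return [{'Values': [...]}, ...] for the partitions matching the expression."""
    paginator = glue_client.get_paginator("get_partitions")
    pages = paginator.paginate(DatabaseName=database, TableName=table, Expression=expression)
    return [{"Values": p["Values"]} for page in pages for p in page["Partitions"]]


expression = date_range_expression("partition1", "20181010", "20181231")
print(expression)  # partition1 > '20181010' AND partition1 < '20181231'
```

The comparison is a string comparison, which orders YYYYMMDD values correctly. The returned list already has the shape that BatchDeletePartition expects for PartitionsToDelete, at up to 25 entries per call.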

Instead of deleting partitions through Athena, you can call GetPartitions followed by BatchDeletePartition using the Glue API.

Here is a script that does what Theo recommended.

import json
import logging

import awswrangler as wr
import boto3
from botocore.exceptions import ClientError

logging.basicConfig(level=logging.INFO, format=logging.BASIC_FORMAT)
logger = logging.getLogger()


def delete_partitions(database_name: str, table_name: str, expression: str = ''):
  # Removes partition metadata from the Glue Data Catalog; files in S3 are not touched.
  # `expression` is an optional Glue filter such as "partition1 < '20181231'";
  # when it is empty, every partition of the table is deleted.
  client = boto3.client('glue')
  paginator = client.get_paginator('get_partitions')
  kwargs = {'DatabaseName': database_name, 'TableName': table_name}
  if expression:
    kwargs['Expression'] = expression
  # Collect first, then delete, so that deletions cannot disturb pagination.
  partitions_to_delete = []
  for page in paginator.paginate(**kwargs):
    for partition in page['Partitions']:
      partitions_to_delete.append({'Values': partition['Values']})
      logger.info(f"Found partition {partition['Values']}")
  # BatchDeletePartition accepts at most 25 partitions per call.
  for i in range(0, len(partitions_to_delete), 25):
    response = client.batch_delete_partition(DatabaseName=database_name, TableName=table_name,
      PartitionsToDelete=partitions_to_delete[i:i + 25])
    logger.info(f'Deleted partitions with response: {response}')
  logger.info(f'Done with {len(partitions_to_delete)} partitions')


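def repair_table(database_name: str, table_name: str):
  # MSCK REPAIR TABLE only adds partitions it finds in S3. It will re-register any
  # deleted partition whose Hive-style directory still exists.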
  client = boto3.client('athena')
  try:
    response = client.start_query_execution(QueryString='MSCK REPAIR TABLE ' + table_name + ';',
      QueryExecutionContext={'Database': database_name}, )
  except ClientError as err:
    logger.info(err.response['Error']['Message'])
  else:
    res = wr.athena.wait_query(query_execution_id=response['QueryExecutionId'])
    logger.info(f"Query succeeded: {json.dumps(res, indent=2)}")


if __name__ == '__main__':
  table = 'table_name'
  database = 'database_name'
  delete_partitions(database_name=database, table_name=table)
  repair_table(database_name=database, table_name=table)

Posting the Glue API workaround for Java, to save some time for those who need it:

public void deleteMetadataTablePartition(String catalog,
                                         String db,
                                         String table,
                                         String expression) {

    GetPartitionsRequest getPartitionsRequest = new GetPartitionsRequest()
            .withCatalogId(catalog)
            .withDatabaseName(db)
            .withTableName(table)
            .withExpression(expression);

    List<PartitionValueList> partitionsToDelete = new ArrayList<>();

    do {
        GetPartitionsResult getPartitionsResult = this.glue.getPartitions(getPartitionsRequest);
        List<PartitionValueList> partitionsValues = getPartitionsResult.getPartitions()
                .parallelStream()
                .map(p -> new PartitionValueList().withValues(p.getValues()))
                .collect(Collectors.toList());

        partitionsToDelete.addAll(partitionsValues);

        getPartitionsRequest.setNextToken(getPartitionsResult.getNextToken());
    } while (getPartitionsRequest.getNextToken() != null);

    Lists.partition(partitionsToDelete, 25)
            .parallelStream()
            .forEach(partitionValueList -> {
                glue.batchDeletePartition(
                        new BatchDeletePartitionRequest()
                                .withCatalogId(catalog)
                                .withDatabaseName(db)
                                .withTableName(table)
                                .withPartitionsToDelete(partitionValueList));
            });
}