boto3 check if Athena database exists

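I'm writing a script that creates a database in AWS Athena and then creates tables for that database. Today the database creation took a long time, so the table-creation queries ended up referencing a database that did not exist yet. Is there a way to check with boto3 whether a database has already been created in Athena?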

This is the part that creates the database:

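import boto3

client = boto3.client('athena')
# `config` is defined elsewhere; it is a dict whose 'OutputLocation' key
# names the S3 location where Athena writes query results.
client.start_query_execution(
    QueryString='create database {}'.format('db_name'),
    ResultConfiguration=config
)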

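The waiter feature for Athena is not implemented yet: Athena Waiter

Until it is implemented in Boto3, see Support AWS Athena waiter feature for a possible workaround. This is how it is implemented in the AWS CLI: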

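import time

# Poll until the query reaches a terminal state.
# Assumes self.athena is a boto3 Athena client, which only accepts keyword arguments.
while True:
    stats = self.athena.get_query_execution(QueryExecutionId=execution_id)
    status = stats['QueryExecution']['Status']['State']
    if status in ['SUCCEEDED', 'FAILED', 'CANCELLED']:
        break
    time.sleep(0.2)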
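
The same polling idea can be packaged as a standalone function with a timeout, so the script cannot hang forever on a stuck query. This is a minimal sketch, not part of boto3: `wait_for_query`, `poll_interval`, and `timeout` are hypothetical names, and `client` is assumed to be a boto3 Athena client (or any object exposing `get_query_execution`).

```python
import time

TERMINAL_STATES = {"SUCCEEDED", "FAILED", "CANCELLED"}


def wait_for_query(client, execution_id, poll_interval=0.2, timeout=60.0):
    """Poll Athena until the query reaches a terminal state and return that state.

    Raises TimeoutError if the query is still queued or running once
    `timeout` seconds have passed.
    """
    deadline = time.monotonic() + timeout
    while True:
        response = client.get_query_execution(QueryExecutionId=execution_id)
        state = response["QueryExecution"]["Status"]["State"]
        if state in TERMINAL_STATES:
            return state
        if time.monotonic() >= deadline:
            raise TimeoutError(
                "query {} still {} after {}s".format(execution_id, state, timeout)
            )
        time.sleep(poll_interval)
```

With something like this, the table-creation query would only be started after `wait_for_query` returns `'SUCCEEDED'` for the `QueryExecutionId` that `start_query_execution` returned for the `create database` statement.
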
# -*- coding: utf-8 -*-
import logging
import os
from time import sleep

import boto3
import pandas as pd
from backports.tempfile import TemporaryDirectory

logger = logging.getLogger(__name__)


class AthenaQueryFailed(Exception):
    pass


class Athena(object):
    S3_TEMP_BUCKET = "please-replace-with-your-bucket"

    def __init__(self, bucket=S3_TEMP_BUCKET):
        self.bucket = bucket
        self.client = boto3.Session().client("athena")


    def execute_query_in_athena(self, query, output_s3_directory, database="csv_dumps"):
        """Run a query in Athena and wait until its result is stored under `output_s3_directory`.

        :param query: Query to be executed in Athena
        :param output_s3_directory: S3 path under which Athena should store the result
        :param database: Athena database to run the query against
        :return: S3 path of the result file
        """
        response = self.client.start_query_execution(
            QueryString=query,
            QueryExecutionContext={"Database": database},
            ResultConfiguration={"OutputLocation": output_s3_directory},
        )
        query_execution_id = response["QueryExecutionId"]
        filename = "{filename}.csv".format(filename=query_execution_id)
        # Join with "/" explicitly: os.path.join would insert "\" on Windows.
        s3_result_path = "{}/{}".format(output_s3_directory.rstrip("/"), filename)
        logger.info(
            "Query query_execution_id <<{query_execution_id}>>, result_s3path <<{s3path}>>".format(
                query_execution_id=query_execution_id, s3path=s3_result_path
            )
        )
        self.wait_for_query_to_complete(query_execution_id)
        return s3_result_path

    def wait_for_query_to_complete(self, query_execution_id):
        is_query_running = True
        backoff_time = 10
        while is_query_running:
            response = self.__get_query_status_response(query_execution_id)
            # Possible states: QUEUED | RUNNING | SUCCEEDED | FAILED | CANCELLED
            status = response["QueryExecution"]["Status"]["State"]
            if status == "SUCCEEDED":
                is_query_running = False
            elif status in ["CANCELLED", "FAILED"]:
                raise AthenaQueryFailed(status)
            elif status in ["QUEUED", "RUNNING"]:
                logger.info("Backing off for {} seconds.".format(backoff_time))
                sleep(backoff_time)
            else:
                raise AthenaQueryFailed(status)

    def __get_query_status_response(self, query_execution_id):
        response = self.client.get_query_execution(QueryExecutionId=query_execution_id)
        return response
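
The wait loop above sleeps a fixed 10 seconds between polls, which is slow for short statements such as `create database`. One common alternative is a capped exponential backoff. The sketch below is only an assumption about how that could look, not something the client above does; `backoff_delays` and its parameters are hypothetical names.

```python
import itertools


def backoff_delays(initial=0.5, factor=2.0, maximum=10.0):
    """Yield sleep intervals that grow geometrically and never exceed `maximum`."""
    delay = min(initial, maximum)
    while True:
        yield delay
        delay = min(delay * factor, maximum)


# First six intervals with the default settings.
first_six = list(itertools.islice(backoff_delays(), 6))
```

Inside `wait_for_query_to_complete`, the fixed `sleep(backoff_time)` could then become `sleep(next(delays))`, with `delays = backoff_delays()` created once before the loop.
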

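As the answer above points out, the Athena waiter is still not implemented.

I use the lightweight Athena client above to run queries; it returns the S3 path of the result once the query has completed.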
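
For the literal question of checking whether a database exists, more recent boto3 releases expose `list_databases` on the Athena client, so the catalog can be inspected directly rather than inferring existence from a query's state. A minimal sketch, assuming the installed boto3 is new enough to have this call and that the database lives in the default catalog `AwsDataCatalog`; `database_exists` is a hypothetical helper name.

```python
def database_exists(client, database_name, catalog_name="AwsDataCatalog"):
    """Return True if `database_name` is listed in the given Athena data catalog."""
    # Compare case-insensitively, since Athena treats database names that way.
    wanted = database_name.lower()
    kwargs = {"CatalogName": catalog_name}
    while True:
        page = client.list_databases(**kwargs)
        if any(db["Name"].lower() == wanted for db in page.get("DatabaseList", [])):
            return True
        # Follow pagination until there are no more pages.
        token = page.get("NextToken")
        if not token:
            return False
        kwargs["NextToken"] = token
```

It could be called as `database_exists(boto3.client('athena'), 'db_name')` before issuing the `create table` statements. Waiting for the `create database` query to reach `SUCCEEDED` is still the more direct fix for the race described in the question.
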