How to Create Dataframe from AWS Athena using Boto3 get_query_results method

I'm using AWS Athena to query raw data from S3. Since Athena writes the query output to an S3 output bucket, I used to do:

df = pd.read_csv(OutputLocation)

But this seems like an expensive way to do it. Recently I noticed the boto3 method get_query_results, which returns a complex dictionary of results:

client = boto3.client('athena')
response = client.get_query_results(
    QueryExecutionId=res['QueryExecutionId']
)
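
For reference, the response is shaped roughly like this (a trimmed sketch; the field names come from the boto3 Athena docs, the values are made up):

# Rough shape of a get_query_results response (values are illustrative):
response = {
    'ResultSet': {
        'Rows': [
            {'Data': [{'VarCharValue': 'id'}, {'VarCharValue': 'name'}]},  # header row
            {'Data': [{'VarCharValue': '1'}, {'VarCharValue': 'alice'}]},  # data rows
        ],
        'ResultSetMetadata': {
            'ColumnInfo': [
                {'Name': 'id', 'Label': 'id', 'Type': 'integer'},
                {'Name': 'name', 'Label': 'name', 'Type': 'varchar'},
            ]
        }
    },
    'NextToken': '...',  # present only when more pages remain
}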

I'm facing two main problems:

  1. How can I format the results of get_query_results into a pandas DataFrame?
  2. get_query_results only returns 1000 rows. How can I use it to get two million rows?

I have a solution for my first question, using the following function:

def results_to_df(results):
    # Column labels come from the result set metadata.
    columns = [
        col['Label']
        for col in results['ResultSet']['ResultSetMetadata']['ColumnInfo']
    ]

    listed_results = []
    # Skip the first row: it contains the column headers.
    for res in results['ResultSet']['Rows'][1:]:
        values = []
        for field in res['Data']:
            try:
                values.append(list(field.values())[0])
            except Exception:
                # Empty cells come back as {}; substitute a blank value.
                values.append(' ')

        listed_results.append(dict(zip(columns, values)))

    return listed_results

Then:

t = results_to_df(response)
pd.DataFrame(t)
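
One caveat: Athena returns every cell as a VarCharValue string, so numeric and date columns in the resulting frame are all object dtype. A minimal sketch for casting them afterwards, using the Type field from the same response's ColumnInfo (the type names listed are the common Athena ones, not an exhaustive mapping):

import pandas as pd

df = pd.DataFrame(t)

# Athena type name per column label, taken from the query metadata.
type_map = {col['Label']: col['Type']
            for col in response['ResultSet']['ResultSetMetadata']['ColumnInfo']}

for col, athena_type in type_map.items():
    if athena_type in ('tinyint', 'smallint', 'integer', 'bigint'):
        df[col] = pd.to_numeric(df[col], errors='coerce').astype('Int64')
    elif athena_type in ('float', 'double', 'decimal'):
        df[col] = pd.to_numeric(df[col], errors='coerce')
    elif athena_type in ('date', 'timestamp'):
        df[col] = pd.to_datetime(df[col], errors='coerce')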

As for my second question and @EricBellet's request, I'm also adding my approach to pagination, which I find inefficient and slower compared to loading the results from the Athena output in S3:

def run_query(query, database, s3_output):
    '''
    Execute an Athena query and return the start_query_execution response.
    '''
    client = boto3.client('athena')
    response = client.start_query_execution(
        QueryString=query,
        QueryExecutionContext={
            'Database': database
            },
        ResultConfiguration={
            'OutputLocation': s3_output,
            }
        )
    print('Execution ID: ' + response['QueryExecutionId'])
    return response



def format_result(results):
    '''
    Format a page of results into a list of row dicts ready to append.
    '''
    columns = [
        col['Label']
        for col in results['ResultSet']['ResultSetMetadata']['ColumnInfo']
    ]

    formatted_results = []
    # Keep every row here: only the first page repeats the header row.
    for result in results['ResultSet']['Rows']:
        values = []
        for field in result['Data']:
            try:
                values.append(list(field.values())[0])
            except Exception:
                values.append(' ')

        formatted_results.append(dict(zip(columns, values)))

    return formatted_results



res = run_query(query_2, database, s3_output)  # query Athena



import time

import boto3
import pandas as pd

marker = None
formatted_results = []
query_id = res['QueryExecutionId']
i = 0
start_time = time.time()

while True:
    paginator = client.get_paginator('get_query_results')
    response_iterator = paginator.paginate(
        QueryExecutionId=query_id,
        PaginationConfig={
            'MaxItems': 1000,
            'PageSize': 1000,
            'StartingToken': marker})

    for page in response_iterator:
        i = i + 1
        format_page = format_result(page)
        if i == 1:
            formatted_results = pd.DataFrame(format_page)
        elif i > 1:
            # DataFrame.append is deprecated in newer pandas; see the concat sketch below.
            formatted_results = formatted_results.append(pd.DataFrame(format_page))

    try:
        marker = page['NextToken']
    except KeyError:
        break

print("My program took", time.time() - start_time, "to run")

The formatting isn't great, but I think it gets the job done...
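
For what it's worth, the boto3 paginator already follows NextToken across pages on its own, so the manual StartingToken bookkeeping above isn't strictly needed. A shorter sketch of the same idea, reusing format_result from above and concatenating once at the end:

import boto3
import pandas as pd

client = boto3.client('athena')
paginator = client.get_paginator('get_query_results')

# Without a PaginationConfig, the paginator keeps fetching pages
# until Athena stops returning a NextToken.
pages = paginator.paginate(QueryExecutionId=res['QueryExecutionId'])
df = pd.concat(
    (pd.DataFrame(format_result(page)) for page in pages),
    ignore_index=True,
)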

Update 2021

Today I use a custom wrapper around aws-data-wrangler as the best solution to the original question I asked a few years ago:

import awswrangler as wr

def run_athena_query(query, database, s3_output, boto3_session=None, categories=None, chunksize=None, ctas_approach=None, profile=None, workgroup='myTeamName', region_name='us-east-1', keep_files=False, max_cache_seconds=0):
    """
    An end-to-end Athena query method, based on the AWS Wrangler package.
    The method will execute a query and return a pandas DataFrame as output.
    You can read more at https://aws-data-wrangler.readthedocs.io/en/stable/stubs/awswrangler.athena.read_sql_query.html

    Args:
        - query: SQL query.

        - database (str): AWS Glue/Athena database name - it is only the original database from which the query will be launched. You can still use and mix several databases by writing the full table name within the SQL (e.g. database.table).

        - ctas_approach (bool): Wraps the query using a CTAS and reads the resulting Parquet data on S3. If False, reads the regular CSV on S3.

        - categories (List[str], optional): List of column names that should be returned as pandas.Categorical. Recommended for memory-restricted environments.

        - chunksize (Union[int, bool], optional): If passed, splits the data into an iterable of DataFrames (memory friendly). If True, Wrangler iterates over the data file by file in the most efficient way, without guaranteeing the chunk size. If an integer is passed, Wrangler iterates over the data in chunks of that number of rows.

        - s3_output (str, optional): Amazon S3 path.

        - workgroup (str, optional): Athena workgroup. 

        - keep_files (bool): Should Wrangler delete or keep the staging files produced by Athena? Default is False.

        - profile (str, optional): AWS account profile. Ignored if boto3_session is provided.

        - boto3_session (boto3.Session(), optional): Boto3 session. The default boto3 session is used if boto3_session is None. If profile is provided, a session is created automatically.

        - max_cache_seconds (int): Wrangler can look up in Athena's history whether this query has been run before. If so, and its completion time is less than max_cache_seconds before now, Wrangler skips the query execution and returns the same results as last time. If reading the cached data fails for any reason, execution falls back to the usual query path. Default is 0.

    Returns:
        - Pandas DataFrame

    """
    # Create a session from the profile if no explicit session was passed in.
    if boto3_session is None and profile is not None:
        boto3_session = boto3.Session(profile_name=profile, region_name=region_name)

    print("Querying AWS Athena...")

    try:
        # Retrieving the data from Amazon Athena
        athena_results_df = wr.athena.read_sql_query(
            query,
            database=database,
            boto3_session=boto3_session,
            categories=categories,
            chunksize=chunksize,
            ctas_approach=ctas_approach,
            s3_output=s3_output,
            workgroup=workgroup,
            keep_files=keep_files,
            max_cache_seconds=max_cache_seconds
        )

        print("Query completed, data retrieved successfully!")
    except Exception as e:
        print(f"Something went wrong... the error is:{e}")
        raise Exception(e)

    return athena_results_df

You can read more here.

get_query_results only returns 1000 rows. How can I use it to get two million rows into a Pandas DataFrame?

If you try adding:

client.get_query_results(QueryExecutionId=res['QueryExecutionId'], MaxResults=2000)

you will get the following error:

An error occurred (InvalidRequestException) when calling the GetQueryResults operation: MaxResults is more than maximum allowed length 1000.

You can get millions of rows if you fetch the file directly from the S3 bucket (into a Pandas DataFrame in the next example):

def obtain_data_from_s3(self):
    self.resource = boto3.resource('s3', 
                          region_name = self.region_name, 
                          aws_access_key_id = self.aws_access_key_id,
                          aws_secret_access_key= self.aws_secret_access_key)

    response = self.resource \
    .Bucket(self.bucket) \
    .Object(key= self.folder + self.filename + '.csv') \
    .get()

    return pd.read_csv(io.BytesIO(response['Body'].read()), encoding='utf8')   

self.filename can be:

self.filename = response['QueryExecutionId'] + ".csv"

because Athena names the file after the QueryExecutionId. Here is all my code; it takes a query and returns a DataFrame with all rows and columns:

import time
import boto3
import pandas as pd
import io

class QueryAthena:

    def __init__(self, query, database):
        self.database = database
        self.folder = 'my_folder/'
        self.bucket = 'my_bucket'
        self.s3_input = 's3://' + self.bucket + '/my_folder_input'
        self.s3_output =  's3://' + self.bucket + '/' + self.folder
        self.region_name = 'us-east-1'
        self.aws_access_key_id = "my_aws_access_key_id"
        self.aws_secret_access_key = "my_aws_secret_access_key"
        self.query = query

    def load_conf(self, q):
        try:
            self.client = boto3.client('athena', 
                              region_name = self.region_name, 
                              aws_access_key_id = self.aws_access_key_id,
                              aws_secret_access_key= self.aws_secret_access_key)
            response = self.client.start_query_execution(
                QueryString=q,
                QueryExecutionContext={
                    'Database': self.database
                },
                ResultConfiguration={
                    'OutputLocation': self.s3_output,
                }
            )
            self.filename = response['QueryExecutionId']
            print('Execution ID: ' + response['QueryExecutionId'])

        except Exception as e:
            print(e)
        return response                

    def run_query(self):
        queries = [self.query]
        for q in queries:
            res = self.load_conf(q)
        try:              
            query_status = None
            while query_status == 'QUEUED' or query_status == 'RUNNING' or query_status is None:
                query_status = self.client.get_query_execution(QueryExecutionId=res["QueryExecutionId"])['QueryExecution']['Status']['State']
                print(query_status)
                if query_status == 'FAILED' or query_status == 'CANCELLED':
                    raise Exception('Athena query with the string "{}" failed or was cancelled'.format(self.query))
                time.sleep(10)
            print('Query "{}" finished.'.format(self.query))

            df = self.obtain_data()
            return df

        except Exception as e:
            print(e)      

    def obtain_data(self):
        try:
            self.resource = boto3.resource('s3', 
                                  region_name = self.region_name, 
                                  aws_access_key_id = self.aws_access_key_id,
                                  aws_secret_access_key= self.aws_secret_access_key)

            response = self.resource \
            .Bucket(self.bucket) \
            .Object(key= self.folder + self.filename + '.csv') \
            .get()

            return pd.read_csv(io.BytesIO(response['Body'].read()), encoding='utf8')   
        except Exception as e:
            print(e)  


if __name__ == "__main__":       
    query = "SELECT * FROM bucket.folder"
    qa = QueryAthena(query=query, database='myAthenaDb')
    dataframe = qa.run_query()
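
One possible refinement to this class: instead of reconstructing the S3 path from hardcoded bucket and folder names, you could read the exact output location back from Athena itself. A sketch, assuming the same client and start_query_execution response as in load_conf:

# Sketch: let Athena report where it wrote the results, instead of
# rebuilding the path by hand from bucket/folder/filename.
execution = self.client.get_query_execution(
    QueryExecutionId=response['QueryExecutionId'])
output_location = execution['QueryExecution']['ResultConfiguration']['OutputLocation']
# output_location looks like 's3://my_bucket/my_folder/<QueryExecutionId>.csv'
bucket, key = output_location.replace('s3://', '', 1).split('/', 1)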

A very simple solution is a list comprehension combined with the boto3 Athena paginator. The list comprehension can then simply be passed into pd.DataFrame() to create the DataFrame:

pd.DataFrame([[data.get('VarCharValue') for data in row['Data']] for row in
              results['ResultSet']['Rows']])

Boto3 Athena 到 Pandas DataFrame

import pandas as pd
import boto3

result = get_query_results( . . . ) # your code here

def cleanQueryResult(result) :
    '''
    This will take the dictionary of the raw Boto3 Athena results and turn it into a 
    2D array for further processing

    Parameters
    ----------
    result dict
        The dictionary from the boto3 Athena client function get_query_results

    Returns
    -------
    list(list())
        2D list which is essentially the table result. The first row is the column name.
    '''
    return [[data.get('VarCharValue') for data in row['Data']]
            for row in result['ResultSet']['Rows']]

# note that row 1 is the header
df = pd.DataFrame(cleanQueryResult(result))
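
Since the first row of the result is the header, you can promote it to the column names like this:

# Promote the header row to column names and drop it from the data.
rows = cleanQueryResult(result)
df = pd.DataFrame(rows[1:], columns=rows[0])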

Millions of Results

This requires the paginator object, https://boto3.amazonaws.com/v1/documentation/api/1.9.42/reference/services/athena.html#paginators

As a hint, you can append after each page as follows:

df = df.append(pd.DataFrame(cleanQueryResult(next_page)), ignore_index=True)
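
Note that DataFrame.append was deprecated in pandas 1.4 and removed in 2.0, so collecting one frame per page and concatenating once is the safer pattern today. A sketch with the paginator (the header row only appears as row 0 of the first page):

# One DataFrame per page, concatenated in a single pass at the end.
paginator = client.get_paginator('get_query_results')
frames = [pd.DataFrame(cleanQueryResult(page))
          for page in paginator.paginate(QueryExecutionId=query_id)]
df = pd.concat(frames, ignore_index=True)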

Maybe you can try using pandas read_sql with pyathena:

from pyathena import connect
import pandas as pd

conn = connect(s3_staging_dir='s3://bucket/folder',region_name='region')
df = pd.read_sql('select * from database.table', conn) #don't change the "database.table"
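
If you stay with PyAthena, its PandasCursor is usually much faster for large results, since it downloads the CSV that Athena wrote to S3 instead of paging through the API. A sketch, assuming a recent PyAthena version:

from pyathena import connect
from pyathena.pandas.cursor import PandasCursor

# PandasCursor reads the query's CSV output straight from S3.
cursor = connect(s3_staging_dir='s3://bucket/folder',
                 region_name='region',
                 cursor_class=PandasCursor).cursor()
df = cursor.execute('select * from database.table').as_pandas()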

Try this approach, using columnMetadata:

Convert response['records'] into a DataFrame:
def results_to_df(response):
    columns = [
        col['label']
        for col in response['columnMetadata']
    ]

    listed_results = [
        [list(col.values())[0] if list(col.values())[0] else '' for col in record]
        for record in response['records']
    ]
    df = pd.DataFrame(listed_results, columns=columns)
    return df

You can use AWS Data Wrangler to create a pandas DataFrame, querying directly through Athena:

import awswrangler as wr  
df = wr.athena.read_sql_query(sql="SELECT * FROM <table_name_in_Athena>", database="<database_name>")

You can find more information here.
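
If the result is too large for memory, the same call can stream it in chunks via the chunksize parameter, in which case read_sql_query returns an iterator of DataFrames rather than one frame:

import awswrangler as wr

# Iterate over the result as a sequence of smaller DataFrames.
for chunk_df in wr.athena.read_sql_query(
        sql="SELECT * FROM <table_name_in_Athena>",
        database="<database_name>",
        chunksize=100_000):
    handle(chunk_df)  # hypothetical per-chunk handler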

I used a while loop approach to solve this: if a NextToken exists, I extend the results:

# Receive query results.
# get_query_results() is limited to a maximum of 1000 rows, handled with a
# while loop that follows NextToken.
query_results = athena_client.get_query_results(
    QueryExecutionId=execution_response['QueryExecutionId'])
results = query_results['ResultSet']['Rows']
while 'NextToken' in query_results:
    query_results = athena_client.get_query_results(
        QueryExecutionId=execution_response['QueryExecutionId'],
        NextToken=query_results['NextToken'])
    results.extend(query_results['ResultSet']['Rows'])
return results
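
The accumulated rows still have the raw Data/VarCharValue shape, so one final conversion turns them into a DataFrame (the first row is the header):

import pandas as pd

# results is the list of raw row dicts accumulated above.
table = [[cell.get('VarCharValue') for cell in row['Data']] for row in results]
df = pd.DataFrame(table[1:], columns=table[0])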