将数据从 Athena 加载到 AWS 中 Cloud9/Lambda 中的 Pandas 数据帧以进行 ETL

Load data from Athena into Pandas dataframe in Cloud9/Lambda in AWS for ETL

我正在 AWS 中构建数据湖。源数据作为 CDC 导入到 S3 中。我需要找到一种方法来合并它们,以便 table 具有最新版本的信息。

本来想用Glue做ETL开发,但是编辑器好像比较笨拙。此外,数据量不是很大,因此需要 spark。 Pandas 也会工作,并且在组织中拥有更广泛的知识基础。

所以我使用 Glue 来抓取导入,现在有了 Athena tables,我想在 Cloud9 上开发我的聚合,以便稍后迁移到 Lambda 函数。

问题是我无法将 Athena 数据放入数据框中。

我已经在 boto3 中尝试了 start_query_execution 函数,但它没有 return 数据,只是将它写入我不想要的 S3。它还 return 作为 QueryExecutionId,我已将其传递给另一个名为 get_query_results 的 boto 函数。似乎有一个响应,但我在如何将数据传递到数据帧中苦苦挣扎(它是 JSON 还是字典?)。

#python 3.6
import pandas as pd
import numpy as np
import boto3
import time

#https://dev.classmethod.jp/cloud/run-amazon-athenas-query-with-aws-lambda/

#athena constant
DATABASE = 'myDatabase'
TABLE = 'myTable'

#output
S3_OUTPUT = 's3://myBucket/myPath/'

client = boto3.client('athena')

response = client.start_query_execution(
        QueryString='select * from myTable limit 100',
        QueryExecutionContext={
            'Database': DATABASE
        },
        ResultConfiguration={
            'OutputLocation': S3_OUTPUT,

        }
)

print(response["QueryExecutionId"])

time.sleep(50)

data = client.get_query_results(
    QueryExecutionId=response["QueryExecutionId"]
)

dataDf = pd.read_json(data["ResultSet"])
print(dataDf.head())

这对我有用。下载文件而不是使用 JSON 响应。

import os
import boto3

s3 = boto3.client('s3')
bucket = 'myBucket'
key = 'myPath'

data_file_name = f'{response["QueryExecutionId"]}.csv'
object = os.path.join(key, data_file_name)
s3.download_file(bucket, object, data_file_name)
df = pd.read_csv(data_file_name)