将 pandas 数据帧写入 AWS athena 数据库

Write pandas dataframe into AWS athena database

我有 运行 个使用 pyathena 的查询,并创建了一个 pandas 数据框。有没有办法直接将 pandas 数据框写入 AWS athena 数据库? 就像 data.to_sql 用于 MYSQL 数据库。

下面分享一个dataframe代码示例供参考,需要写入AWS athena数据库:

data=pd.DataFrame({'id':[1,2,3,4,5,6],'name':['a','b','c','d','e','f'],'score':[11,22,33,44,55,66]})

AWS Athena 的存储空间为 S3。它只从 S3 文件中读取数据。 之前不可能像任何其他数据库一样将数据直接写入Athena数据库。

It was missing support support 对于 insert into ....

作为 workaround,用户可以按照以下步骤使其工作。

1. You need to write the pandas output to a file, 
2. Save the file to S3 location, from where the AWS Athena is reading.

希望能给大家一些指点

更新于 05/01/2020。

2019 年 9 月 19 日,AWS 已宣布支持插入到 Athena,并在上述答案 incorrect 中做出了其中一项声明,尽管我提供的上述解决方案将仍然有效,但随着 AWS 公告添加了另一种可能的解决方案。

正如 AWS Documentation 所建议的那样,此功能将允许您发送 insert 语句,并且 Athena 会将数据写回 source table S3 location 中的新文件。所以从本质上讲,AWS 已经解决了将数据写入备份 S3 文件的头痛问题。

请注意,Athena 会将插入的数据写入单独的文件中。 documentation.

实现此目标的另一种现代(截至 2020 年 2 月)方法是使用 aws-data-wrangler 库。它正在自动执行数据处理中的许多例行(有时是烦人的)任务。

结合问题中的案例,代码如下所示:

import pandas as pd
import awswrangler as wr

data=pd.DataFrame({'id':[1,2,3,4,5,6],'name':['a','b','c','d','e','f'],'score':[11,22,33,44,55,66]})

# Typical Pandas, Numpy or Pyarrow transformation HERE!

wr.pandas.to_parquet(  # Storing the data and metadata to Data Lake
    dataframe=data,
    database="database",
    path="s3://your-s3-bucket/path/to/new/table",
    partition_cols=["name"],
)

这非常有用,因为 aws-data-wrangler 知道从路径中解析 table 名称(但您可以在参数中提供 table 名称)并在 Glue 目录中定义适当的类型根据数据框。

它也有助于直接使用 Athena 查询数据到 pandas 数据帧:

df = wr.pandas.read_table(database="dataase", table="table")

所有的过程都将是快速和方便的。

一种选择是使用:

pandas_df.to_parquet(file, engine="pyarrow) 

首先将其保存到 parquet 格式的临时文件中。为此,您需要安装 pyarrow 依赖项。将此文件保存到本地后,您可以使用 python.

的 aws sdk 将其推送到 S3

现在可以通过执行以下查询在 Athena 中创建新的 table:

    CREATE EXTERNAL TABLE IF NOT EXISTS 'your_new_table'
        (col1 type1, col2 type2)
    PARTITIONED BY (col_partitions_if_neccesary)
    ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
    LOCATION 's3 location of your parquet file'
    tblproperties ("parquet.compression"="snappy");

另一种选择是为此使用 pyathena。以他们的官方文档为例:

import pandas as pd
from urllib.parse import quote_plus
from sqlalchemy import create_engine

conn_str = "awsathena+rest://:@athena.{region_name}.amazonaws.com:443/"\
           "{schema_name}?s3_staging_dir={s3_staging_dir}&s3_dir={s3_dir}&compression=snappy"

engine = create_engine(conn_str.format(
    region_name="us-west-2",
    schema_name="YOUR_SCHEMA",
    s3_staging_dir=quote_plus("s3://YOUR_S3_BUCKET/path/to/"),
    s3_dir=quote_plus("s3://YOUR_S3_BUCKET/path/to/")))

df = pd.DataFrame({"a": [1, 2, 3, 4, 5]})
df.to_sql("YOUR_TABLE", engine, schema="YOUR_SCHEMA", index=False, if_exists="replace", method="multi")

在这种情况下,需要依赖sqlalchemy。