如何使用 AWS Glue 将许多 CSV 文件转换为 Parquet

How to Convert Many CSV files to Parquet using AWS Glue

我在以下设置中使用 AWS S3、Glue 和 Athena:

S3 --> 胶水 --> 雅典娜

我的原始数据作为 CSV 文件存储在 S3 上。我正在使用 Glue 进行 ETL,我正在使用 Athena 来查询数据。

由于我使用的是 Athena,因此我想将 CSV 文件转换为 Parquet。我现在正在使用 AWS Glue 来执行此操作。这是我正在使用的当前流程:

  1. 运行 用于读取 CSV 文件并填充数据目录的爬虫程序。
  2. 运行 从 Data Catalog 创建 Parquet 文件的 ETL 作业。
  3. 运行 使用 Parquet 文件填充数据目录的爬虫。

Glue 作业一次只能转换一个 table。如果我有很多 CSV 文件,这个过程很快就会变得难以管理。有没有更好的方法,也许是 "correct" 方法,使用 AWS Glue 或其他一些 AWS 服务将 许多 CSV 文件转换为 Parquet?

请参阅编辑以获取更新信息。

S3 --> Athena

为什么不在 Athena 中直接使用 CSV 格式?

https://docs.aws.amazon.com/athena/latest/ug/supported-format.html

CSV 是受支持的格式之一。另外为了提高效率,您可以压缩多个 CSV 文件以加快加载速度。

支持压缩,

https://docs.aws.amazon.com/athena/latest/ug/compression-formats.html

希望对您有所帮助。

编辑:

为什么 Parquet 格式比 CSV 更有用?

https://dzone.com/articles/how-to-be-a-hero-with-powerful-parquet-google-and

S3 --> Glue --> Athena

有关 CSV 到 Parquet 转换的更多详细信息,

https://aws.amazon.com/blogs/big-data/build-a-data-lake-foundation-with-aws-glue-and-amazon-s3/

听起来您在第 1 步中正在抓取单个 csv 文件(例如 some-bucket/container-path/file.csv),但如果您改为将抓取工具设置为查看路径级别而不是文件级别(例如 some-bucket/container-path/) 并且您所有的 csv 文件都是统一的,那么爬虫应该只创建一个外部 table 而不是每个文件一个外部 table 并且您将能够从所有文件中提取数据文件数量。

我遇到了完全相同的情况,我想高效地循环遍历由爬虫编目的目录 table,这些目录指向 csv 文件,然后将它们转换为镶木地板。不幸的是,网络上还没有太多可用的信息。这就是为什么我在 LinkedIn 中写了一篇博客来解释我是如何做到的。请阅读;特别要点#5。希望有所帮助。请让我知道您的反馈。

注意:根据 Antti 的反馈,我将我博客中的解决方案摘录粘贴如下:

  1. 遍历 catalog/database/tables

作业向导附带 运行 数据源上预定义脚本的选项。问题是您可以 select 的数据源是来自目录的单个 table。它不会让您选择 运行 整个数据库或一组 table 的作业。无论如何,您以后都可以修改脚本,但是在 glue 目录中遍历数据库 tables 的方法也很难找到。有目录 API,但缺少 suitable 示例。 github 示例 repo 可以丰富更多场景以帮助开发人员。

经过一番折腾,我想出了下面的脚本来完成这项工作。我已经使用 boto3 客户端循环遍历 table。如果有人需要帮助,我将其粘贴在这里。如果您有更好的建议,我也想听听您的意见

import sys
import boto3
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)


client = boto3.client('glue', region_name='ap-southeast-2')

databaseName = 'tpc-ds-csv'
print '\ndatabaseName: ' + databaseName

Tables = client.get_tables(DatabaseName=databaseName)

tableList = Tables['TableList']

for table in tableList:
    tableName = table['Name']
    print '\n-- tableName: ' + tableName

    datasource0 = glueContext.create_dynamic_frame.from_catalog(
        database="tpc-ds-csv", 
        table_name=tableName, 
        transformation_ctx="datasource0"
    )

    datasink4 = glueContext.write_dynamic_frame.from_options(
        frame=datasource0,
        connection_type="s3", 
        connection_options={
            "path": "s3://aws-glue-tpcds-parquet/"+ tableName + "/"
            },
        format="parquet",
        transformation_ctx="datasink4"
    )
job.commit()

我不是 Glue 的忠实粉丝,也不喜欢从数据创建模式

以下是在 Athena 中的操作方法,它比 Glue 快得多。

这是 CSV 文件:

create table foo (
  id int,
  name string,
  some date
)
row format delimited
  fields terminated by ','
location 's3://mybucket/path/to/csvs/'

这是镶木地板文件:

create table bar 
with (
  external_location = 's3://mybucket/path/to/parquet/',
  format = 'PARQUET'
)
as select * from foo 

您不需要为镶木地板创建该路径,即使您使用分区

您可以直接将 JSON 或 CSV 文件转换为 parquet,而无需先将其导入目录。

这是针对 JSON 文件的 - 下面的代码将转换 rawFiles 目录中托管的任何内容

import sys
from awsglue.job import Job 
from awsglue.transforms import *
from awsglue.context import GlueContext
from pyspark.context import SparkContext
from awsglue.utils import getResolvedOptions

## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sparkContext = SparkContext()
glueContext = GlueContext(sparkContext)
spark = glueContext.spark_session
job = Job(glueContext) job.init(args['JOB_NAME'], args)

s3_json_path = 's3://rawFiles/'  
s3_parquet_path = 's3://convertedFiles/'

output = spark.read.load(s3_json_path, format='json') 
output.write.parquet(s3_parquet_path)

job.commit()