通过 Athena and/or Glue 将 S3 数据 ETLing 为 CSV

ETLing S3 data into CSV via Athena and/or Glue

我有一个装满图像文件的 S3 存储桶 (com.example.myorg.images),它们都遵循相同的命名约定:

<PRODUCT_ID>_<NUMBER>.jpg

其中 <PRODUCT_ID> 是一个长数字(RDS table 中的主键),并且 <NUMBER> 始终是三个值之一:100、200 或 300。所以对于例如,存储桶可能包含:

我想编写一个 Athena 或 Glue ETL 进程来查询 S3 存储桶中的所有图像,并以某种方式提取 UNIQUE <PRODUCT_ID> 值转换为 table 或列表。

据我了解,Athena 随后会将此 table/list 备份到可下载的 CSV 文件中;如果为真,那么我将在命令行上按照我需要的方式单独处理无头 CSV。

因此,例如,如果上面的 6 张图片是存储桶中的 只有 张图片,那么此过程将:

  1. 查询S3得到一个table/list,由13942039491394203950
  2. 组成
  3. 创建如下所示的可下载 CSV:

可能是 S3 上的文件,甚至是内存中的文件:

1394203949,1394203950

之前没有使用过 Athena 或 Glue 的经验,我试图通过 Athena 查询来完成此操作,但我很难透过树木看到森林。

我在第一部分(S3 查询)的最佳尝试:

CREATE EXTERNAL TABLE IF NOT EXISTS products_with_thumbnails (
  product_id string
) 
ROW FORMAT DELIMITED
  FIELDS TERMINATED BY ','
  ESCAPED BY '\'
  LINES TERMINATED BY '\n'
LOCATION 's3://com.example.myorg.images/';

这将设置我的内存 table 我相信 包含 S3 存储桶中所有内容的文件名,但是:

我不偏爱 Athena 或 Glue,并且会很高兴 任何 解决方案都能满足我的需求。 最坏的情况 我可以编写一个 Lambda 来在应用程序层完成所有这些 ETL,但是如果存在类似 Hive 或面向 ETL 的 AWS 服务来执行此类操作无论如何,我宁愿利用它!

提前致谢!

Athena 查询 inside 文件,而不是文件列表,因此仅使用 Athena 是行不通的(有滥用它来实现它的方法,但它们会又贵又慢,不是你想要的)。

如果图像数量少于十万左右,我认为你最好的选择是编写一个脚本,其功能或多或少相当于 aws s3 ls --recursive s3://some-bucket/ | perl -ne '/(\d+)_d+\.jpg$/ && print "\n"' | uniq

如果超过这个数,我建议使用 S3 Inventory 或 Athena 进行处理。您可以在此处找到有关如何启用 S3 清单以及使用 Athena 查询清单的说明:https://docs.aws.amazon.com/AmazonS3/latest/dev/storage-inventory.html

设置 S3 清单后,您的查询可能如下所示:

SELECT DISTINCT regexp_extract(key, '(\d+)_\d+\.jpg', 1)
FROM the_inventory_table_name

不过,与设置 Athena 表相比,编写处理库存的脚本可能工作量更少。不过,我真的建议使用 S3 Inventory 而不是在有很多对象时直接列出 S3。

看起来您可以在 S3 中创建 S3 清单的分区文件,按日期分区:

CREATE EXTERNAL TABLE my_inventory(
  `bucket` string,
  key string,
  version_id string,
  is_latest boolean,
  is_delete_marker boolean,
  size bigint,
  last_modified_date timestamp,
  e_tag string,
  storage_class string,
  is_multipart_uploaded boolean,
  replication_status string,
  encryption_status string,
  object_lock_retain_until_date timestamp,
  object_lock_mode string,
  object_lock_legal_hold_status string
  )
  PARTITIONED BY (dt string)
  ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.orc.OrcSerde'
  STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat'
  OUTPUTFORMAT  'org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat'
  LOCATION 's3://com.example.myorg.mybucket/com.example.myorg.mybucket/com.example.myorg.mybucket-ORC/hive/';

然后,任何时候您想查询 my_inventory table,首先通过为当前日期创建一个新分区来修复分区文件:

MSCK REPAIR TABLE my_inventory;

最后你可以通过 PrestoDB 的 SQL-like 语法查询它:

SELECT key FROM my_inventory WHERE dt <= '<YYYY-MM-DD>-00-00';

其中 <YYYY-MM-DD>YYYY-MM-DD 格式的当前日期。

然后您可以将查询结果下载为 CSV 文件并根据需要进行处理。