通过 Athena and/or Glue 将 S3 数据 ETLing 为 CSV
ETLing S3 data into CSV via Athena and/or Glue
我有一个装满图像文件的 S3 存储桶 (com.example.myorg.images
),它们都遵循相同的命名约定:
<PRODUCT_ID>_<NUMBER>.jpg
其中 <PRODUCT_ID>
是一个长数字(RDS table 中的主键),并且 <NUMBER>
始终是三个值之一:100、200 或 300。所以对于例如,存储桶可能包含:
- 1394203949_100.jpg
- 1394203949_200.jpg
- 1394203949_300.jpg
- 1394203950_100.jpg
- 1394203950_200.jpg
- 1394203950_300.jpg
- ...等等
我想编写一个 Athena 或 Glue ETL 进程来查询 S3 存储桶中的所有图像,并以某种方式提取 UNIQUE <PRODUCT_ID>
值转换为 table 或列表。
据我了解,Athena 随后会将此 table/list 备份到可下载的 CSV 文件中;如果为真,那么我将在命令行上按照我需要的方式单独处理无头 CSV。
因此,例如,如果上面的 6 张图片是存储桶中的 只有 张图片,那么此过程将:
- 查询S3得到一个table/list,由
1394203949
和1394203950
组成
- 创建如下所示的可下载 CSV:
可能是 S3 上的文件,甚至是内存中的文件:
1394203949,1394203950
之前没有使用过 Athena 或 Glue 的经验,我试图通过 Athena 查询来完成此操作,但我很难透过树木看到森林。
我在第一部分(S3 查询)的最佳尝试:
CREATE EXTERNAL TABLE IF NOT EXISTS products_with_thumbnails (
product_id string
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
ESCAPED BY '\'
LINES TERMINATED BY '\n'
LOCATION 's3://com.example.myorg.images/';
这将设置我的内存 table 我相信 包含 S3 存储桶中所有内容的文件名,但是:
- 如何让这个 table 只包含唯一的产品 ID(没有重复)?
- 如何只提取文件名的
<PRODUCT_ID>
段(1394203949
而不是 1394203949_100.jpg
)?
我不偏爱 Athena 或 Glue,并且会很高兴 任何 解决方案都能满足我的需求。 最坏的情况 我可以编写一个 Lambda 来在应用程序层完成所有这些 ETL,但是如果存在类似 Hive 或面向 ETL 的 AWS 服务来执行此类操作无论如何,我宁愿利用它!
提前致谢!
Athena 查询 inside 文件,而不是文件列表,因此仅使用 Athena 是行不通的(有滥用它来实现它的方法,但它们会又贵又慢,不是你想要的)。
如果图像数量少于十万左右,我认为你最好的选择是编写一个脚本,其功能或多或少相当于 aws s3 ls --recursive s3://some-bucket/ | perl -ne '/(\d+)_d+\.jpg$/ && print "\n"' | uniq
。
如果超过这个数,我建议使用 S3 Inventory 或 Athena 进行处理。您可以在此处找到有关如何启用 S3 清单以及使用 Athena 查询清单的说明:https://docs.aws.amazon.com/AmazonS3/latest/dev/storage-inventory.html
设置 S3 清单后,您的查询可能如下所示:
SELECT DISTINCT regexp_extract(key, '(\d+)_\d+\.jpg', 1)
FROM the_inventory_table_name
不过,与设置 Athena 表相比,编写处理库存的脚本可能工作量更少。不过,我真的建议使用 S3 Inventory 而不是在有很多对象时直接列出 S3。
看起来您可以在 S3 中创建 S3 清单的分区文件,按日期分区:
CREATE EXTERNAL TABLE my_inventory(
`bucket` string,
key string,
version_id string,
is_latest boolean,
is_delete_marker boolean,
size bigint,
last_modified_date timestamp,
e_tag string,
storage_class string,
is_multipart_uploaded boolean,
replication_status string,
encryption_status string,
object_lock_retain_until_date timestamp,
object_lock_mode string,
object_lock_legal_hold_status string
)
PARTITIONED BY (dt string)
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.orc.OrcSerde'
STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat'
LOCATION 's3://com.example.myorg.mybucket/com.example.myorg.mybucket/com.example.myorg.mybucket-ORC/hive/';
然后,任何时候您想查询 my_inventory
table,首先通过为当前日期创建一个新分区来修复分区文件:
MSCK REPAIR TABLE my_inventory;
最后你可以通过 PrestoDB 的 SQL-like 语法查询它:
SELECT key FROM my_inventory WHERE dt <= '<YYYY-MM-DD>-00-00';
其中 <YYYY-MM-DD>
是 YYYY-MM-DD
格式的当前日期。
然后您可以将查询结果下载为 CSV 文件并根据需要进行处理。
我有一个装满图像文件的 S3 存储桶 (com.example.myorg.images
),它们都遵循相同的命名约定:
<PRODUCT_ID>_<NUMBER>.jpg
其中 <PRODUCT_ID>
是一个长数字(RDS table 中的主键),并且 <NUMBER>
始终是三个值之一:100、200 或 300。所以对于例如,存储桶可能包含:
- 1394203949_100.jpg
- 1394203949_200.jpg
- 1394203949_300.jpg
- 1394203950_100.jpg
- 1394203950_200.jpg
- 1394203950_300.jpg
- ...等等
我想编写一个 Athena 或 Glue ETL 进程来查询 S3 存储桶中的所有图像,并以某种方式提取 UNIQUE <PRODUCT_ID>
值转换为 table 或列表。
据我了解,Athena 随后会将此 table/list 备份到可下载的 CSV 文件中;如果为真,那么我将在命令行上按照我需要的方式单独处理无头 CSV。
因此,例如,如果上面的 6 张图片是存储桶中的 只有 张图片,那么此过程将:
- 查询S3得到一个table/list,由
1394203949
和1394203950
组成
- 创建如下所示的可下载 CSV:
可能是 S3 上的文件,甚至是内存中的文件:
1394203949,1394203950
之前没有使用过 Athena 或 Glue 的经验,我试图通过 Athena 查询来完成此操作,但我很难透过树木看到森林。
我在第一部分(S3 查询)的最佳尝试:
CREATE EXTERNAL TABLE IF NOT EXISTS products_with_thumbnails (
product_id string
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
ESCAPED BY '\'
LINES TERMINATED BY '\n'
LOCATION 's3://com.example.myorg.images/';
这将设置我的内存 table 我相信 包含 S3 存储桶中所有内容的文件名,但是:
- 如何让这个 table 只包含唯一的产品 ID(没有重复)?
- 如何只提取文件名的
<PRODUCT_ID>
段(1394203949
而不是1394203949_100.jpg
)?
我不偏爱 Athena 或 Glue,并且会很高兴 任何 解决方案都能满足我的需求。 最坏的情况 我可以编写一个 Lambda 来在应用程序层完成所有这些 ETL,但是如果存在类似 Hive 或面向 ETL 的 AWS 服务来执行此类操作无论如何,我宁愿利用它!
提前致谢!
Athena 查询 inside 文件,而不是文件列表,因此仅使用 Athena 是行不通的(有滥用它来实现它的方法,但它们会又贵又慢,不是你想要的)。
如果图像数量少于十万左右,我认为你最好的选择是编写一个脚本,其功能或多或少相当于 aws s3 ls --recursive s3://some-bucket/ | perl -ne '/(\d+)_d+\.jpg$/ && print "\n"' | uniq
。
如果超过这个数,我建议使用 S3 Inventory 或 Athena 进行处理。您可以在此处找到有关如何启用 S3 清单以及使用 Athena 查询清单的说明:https://docs.aws.amazon.com/AmazonS3/latest/dev/storage-inventory.html
设置 S3 清单后,您的查询可能如下所示:
SELECT DISTINCT regexp_extract(key, '(\d+)_\d+\.jpg', 1)
FROM the_inventory_table_name
不过,与设置 Athena 表相比,编写处理库存的脚本可能工作量更少。不过,我真的建议使用 S3 Inventory 而不是在有很多对象时直接列出 S3。
看起来您可以在 S3 中创建 S3 清单的分区文件,按日期分区:
CREATE EXTERNAL TABLE my_inventory(
`bucket` string,
key string,
version_id string,
is_latest boolean,
is_delete_marker boolean,
size bigint,
last_modified_date timestamp,
e_tag string,
storage_class string,
is_multipart_uploaded boolean,
replication_status string,
encryption_status string,
object_lock_retain_until_date timestamp,
object_lock_mode string,
object_lock_legal_hold_status string
)
PARTITIONED BY (dt string)
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.orc.OrcSerde'
STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat'
LOCATION 's3://com.example.myorg.mybucket/com.example.myorg.mybucket/com.example.myorg.mybucket-ORC/hive/';
然后,任何时候您想查询 my_inventory
table,首先通过为当前日期创建一个新分区来修复分区文件:
MSCK REPAIR TABLE my_inventory;
最后你可以通过 PrestoDB 的 SQL-like 语法查询它:
SELECT key FROM my_inventory WHERE dt <= '<YYYY-MM-DD>-00-00';
其中 <YYYY-MM-DD>
是 YYYY-MM-DD
格式的当前日期。
然后您可以将查询结果下载为 CSV 文件并根据需要进行处理。