AWS Athena:跨账户写入 CTAS 查询结果
AWS Athena: cross account write of CTAS query result
我在 帐户 A 中有大量历史数据集。此数据集为 csv 格式,按 year/month/day/hour/
分区。我的目标是通过额外的规范化步骤和额外的分区级别将这些数据转换为镶木地板,例如year/month/day/hour/product/
,写回processed/
“目录”下的账号A同一个桶。所以“目录”树看起来像
S3_bucket_Account_A
dataset
|
├── raw
│ ├── year=2017
| │ ├── month=01
| | │ ├── day=01
| │ | | ├── hour=00
| │ | | └── hour=01
|
├── processed
│ ├── year=2017
| │ ├── month=01
| | │ ├── day=01
| | | │ ├── hour=00
| | │ | | ├── product=A
| | │ | | └── product=B
| | | │ ├── hour=01
| | │ | | ├── product=A
| | │ | | └── product=B
为此,我使用 boto3 API 向 Athena 发送 CTAS 查询语句。我知道 limitations of CTAS queries,例如在同一个查询中最多可以写入 100 个分区,CTAS 查询结果的位置必须是 empty/unique。因此,考虑到这些限制,我同时处理一个原始分区并即时生成 CTAS 查询的内容。
由于我使用账户 B 执行这些 CTAS 查询,但这些查询的结果应该写入账户 A拥有的 S3 存储桶[=80] =].我已获得以下权限,这些权限是在帐户 A 的存储桶策略级别指定的。
{
"Effect": "Allow",
"Principal": {
"AWS": "__ARN_OF_ACCOUNT_B__"
},
"Action": [
"s3:*"
],
"Resource": [
"arn:aws:s3:::dataset",
"arn:aws:s3:::dataset/*"
]
}
问题是账户 A(存储桶拥有者)无权访问因账户 B 的 Athena 执行的 CTAS 查询而写入的文件。
据我了解,有一个选项是账户A为我创建一个IAM角色,然后我会像账户A一样执行这个任务。但不幸的是,这个选项是不可能的。
我找到了如何转移 S3 object 的 ownership/change ACL 的方法。一种方法是将 CTAS 查询结果输出到账户 B 的 S3 存储桶中,然后将这些文件复制到账户 A 的存储桶中 (original source)
aws s3 cp s3://source_awsexamplebucket/ s3://destination_awsexamplebucket/ --acl bucket-owner-full-control --recursive
另一种方法是使用类似 ()
的方式递归更新 acl
aws s3 ls s3://bucket/path/ --recursive | awk '{cmd="aws s3api put-object-acl --acl bucket-owner-full-control --bucket bucket --key "; system(cmd)}'
但这两个选项需要向 S3 发出额外的 GET
和 PUT
请求,因此需要更多的钱来支付 AWS。但更重要的是,在 CTAS 查询成功后,我使用创建的 table 中的分区更新 帐户 A 的 AWS Glue table(目标 table)。这样,账户 A 中的 IAM 用户可以立即开始查询转换后的数据。这是我如何更新 destination_table
的大致思路
response = glue_client.get_partitions(
CatalogId="__ACCOUNT_B_ID__",
DatabaseName="some_database_in_account_B",
TableName="ctas_table"
)
for partition in response["Partitions"]:
for key in ["DatabaseName", "TableName", "CreationTime"]:
partition.pop(key)
glue_client.batch_create_partition(
CatalogId="__ACCOUNT_A_ID__",
DatabaseName="some_database_in_account_A",
TableName="destination_table",
PartitionInputList=response["Partitions"]
)
我用这种方式而不是MSCK REPAIR TABLE destination_table
,因为后者由于某种原因需要很长时间。如您所见,如果我选择使用 aws s3 cp
,我在复制有关分区的元信息时也需要考虑到这一点
所以我真正的问题是如何在另一个帐户执行的 CTAS 查询中向存储桶的所有者授予完全控制权?
2019-06-25更新:
刚找到 similar post,但他们似乎使用 IAM 角色,这不是我的选择
2019-06-27更新
我发现:1) 无法在 CTAS 查询中更改 ACL。相反,S3 object 可以通过新的所有权复制到自身(感谢 John Rotenstein and Theo 的评论)。
更新2019-06-30
只是回顾一下。我 运行 来自 account B
的 CTAS 查询,但结果保存在 account A
拥有的存储桶中。这是 CTAS 查询“header”的样子:
CREATE TABLE some_database_in_account_B.ctas_table
WITH (
format = 'PARQUET',
external_location = 's3://__destination_bucket_in_Account_A__/__CTAS_prefix__/',
partitioned_by = ARRAY['year', 'month', 'day', 'hour', 'product']
) AS (
...
...
)
由于我使用 boto3
提交 CTAS 查询并且我知道 __destination_bucket_in_Account_A__
和 __CTAS_prefix__
,所以我可以直接更改而不是使用 aws cp
自己复制文件成功执行 CTAS 查询后,他们在同一 python 脚本中的 ACL。
s3_resource = aws_session.resource('s3')
destination_bucket = s3_resource.Bucket(name="__destination_bucket_in_Account_A__")
for obj in destination_bucket.objects.filter(Prefix="__CTAS_prefix__"):
object_acl = s3_resource.ObjectAcl(destination_bucket.name, obj.key)
object_acl.put(
ACL='bucket-owner-full-control'
)
注意,因为我需要提交一些超出 AWS Athena 限制的 CTAS 查询,我已经实现了自动提交新查询并执行一些额外操作的逻辑,例如更新目标 Glue table 和日志记录。因此,包括这些代码行非常简单。
我建议您执行复制。
"additional GET and PUT requests" 会比较小:
- GET 费用为每 1000 个请求 0.0004 USD
- PUT 为每 1000 个请求 0.005 USD
或者,您 运行 从帐户 B 发出 aws s3 cp --recursive
命令,将文件复制到自己(是的!)并更改所有权(它还需要另一个更改,例如将元数据设置为接受为复制命令)。这类似于您在 put-object-acl
.
中提出的建议
目前,干净地执行此操作的唯一方法是在账户 A 中使用 IAM 角色,并使用允许账户 B 承担该角色的信任策略。您提到这对您的情况是不可能的,这很不幸。目前无法以任何其他方式实现的原因是 Athena 不会使用 "bucket-owner-full-control" 选项写入文件,因此账户 A 永远不会完全拥有由账户 B 中的角色发起的操作创建的任何文件。
由于在目标存储桶中授予的策略允许所有内容,因此您可以做的一件事是 运行 在 CTAS 操作完成后执行一项任务,列出创建的对象并将每个对象复制到自身(相同源键和目标键)与 "bucket-owner-full-control" ACL 选项。像这样复制对象是更改 S3 对象的存储和 ACL 属性的常用方法。正如您所说,这将产生额外费用,但与 CTAS 费用以及与未来数据查询相关的费用相比,这些费用微不足道。
真正的缺点是必须在 CTAS 操作后向 运行 写入一些内容,并进行协调。我建议查看 Step Functions 来执行此操作,您可以创建非常好的工作流程来自动化 Athena,而且 运行 的成本非常低。我有一些应用程序或多或少与您尝试做的完全相同,它们使用 Step Functions、Lambda 和 Athena 并且成本低廉(不过我使用 IAM 角色进行跨账户工作)。
我在 帐户 A 中有大量历史数据集。此数据集为 csv 格式,按 year/month/day/hour/
分区。我的目标是通过额外的规范化步骤和额外的分区级别将这些数据转换为镶木地板,例如year/month/day/hour/product/
,写回processed/
“目录”下的账号A同一个桶。所以“目录”树看起来像
S3_bucket_Account_A
dataset
|
├── raw
│ ├── year=2017
| │ ├── month=01
| | │ ├── day=01
| │ | | ├── hour=00
| │ | | └── hour=01
|
├── processed
│ ├── year=2017
| │ ├── month=01
| | │ ├── day=01
| | | │ ├── hour=00
| | │ | | ├── product=A
| | │ | | └── product=B
| | | │ ├── hour=01
| | │ | | ├── product=A
| | │ | | └── product=B
为此,我使用 boto3 API 向 Athena 发送 CTAS 查询语句。我知道 limitations of CTAS queries,例如在同一个查询中最多可以写入 100 个分区,CTAS 查询结果的位置必须是 empty/unique。因此,考虑到这些限制,我同时处理一个原始分区并即时生成 CTAS 查询的内容。
由于我使用账户 B 执行这些 CTAS 查询,但这些查询的结果应该写入账户 A拥有的 S3 存储桶[=80] =].我已获得以下权限,这些权限是在帐户 A 的存储桶策略级别指定的。
{
"Effect": "Allow",
"Principal": {
"AWS": "__ARN_OF_ACCOUNT_B__"
},
"Action": [
"s3:*"
],
"Resource": [
"arn:aws:s3:::dataset",
"arn:aws:s3:::dataset/*"
]
}
问题是账户 A(存储桶拥有者)无权访问因账户 B 的 Athena 执行的 CTAS 查询而写入的文件。
据我了解,有一个选项是账户A为我创建一个IAM角色,然后我会像账户A一样执行这个任务。但不幸的是,这个选项是不可能的。
我找到了如何转移 S3 object 的 ownership/change ACL 的方法。一种方法是将 CTAS 查询结果输出到账户 B 的 S3 存储桶中,然后将这些文件复制到账户 A 的存储桶中 (original source)
aws s3 cp s3://source_awsexamplebucket/ s3://destination_awsexamplebucket/ --acl bucket-owner-full-control --recursive
另一种方法是使用类似 (
aws s3 ls s3://bucket/path/ --recursive | awk '{cmd="aws s3api put-object-acl --acl bucket-owner-full-control --bucket bucket --key "; system(cmd)}'
但这两个选项需要向 S3 发出额外的 GET
和 PUT
请求,因此需要更多的钱来支付 AWS。但更重要的是,在 CTAS 查询成功后,我使用创建的 table 中的分区更新 帐户 A 的 AWS Glue table(目标 table)。这样,账户 A 中的 IAM 用户可以立即开始查询转换后的数据。这是我如何更新 destination_table
response = glue_client.get_partitions(
CatalogId="__ACCOUNT_B_ID__",
DatabaseName="some_database_in_account_B",
TableName="ctas_table"
)
for partition in response["Partitions"]:
for key in ["DatabaseName", "TableName", "CreationTime"]:
partition.pop(key)
glue_client.batch_create_partition(
CatalogId="__ACCOUNT_A_ID__",
DatabaseName="some_database_in_account_A",
TableName="destination_table",
PartitionInputList=response["Partitions"]
)
我用这种方式而不是MSCK REPAIR TABLE destination_table
,因为后者由于某种原因需要很长时间。如您所见,如果我选择使用 aws s3 cp
,我在复制有关分区的元信息时也需要考虑到这一点
所以我真正的问题是如何在另一个帐户执行的 CTAS 查询中向存储桶的所有者授予完全控制权?
2019-06-25更新:
刚找到 similar post,但他们似乎使用 IAM 角色,这不是我的选择
2019-06-27更新
我发现:1) 无法在 CTAS 查询中更改 ACL。相反,S3 object 可以通过新的所有权复制到自身(感谢 John Rotenstein and Theo 的评论)。
更新2019-06-30
只是回顾一下。我 运行 来自 account B
的 CTAS 查询,但结果保存在 account A
拥有的存储桶中。这是 CTAS 查询“header”的样子:
CREATE TABLE some_database_in_account_B.ctas_table
WITH (
format = 'PARQUET',
external_location = 's3://__destination_bucket_in_Account_A__/__CTAS_prefix__/',
partitioned_by = ARRAY['year', 'month', 'day', 'hour', 'product']
) AS (
...
...
)
由于我使用 boto3
提交 CTAS 查询并且我知道 __destination_bucket_in_Account_A__
和 __CTAS_prefix__
,所以我可以直接更改而不是使用 aws cp
自己复制文件成功执行 CTAS 查询后,他们在同一 python 脚本中的 ACL。
s3_resource = aws_session.resource('s3')
destination_bucket = s3_resource.Bucket(name="__destination_bucket_in_Account_A__")
for obj in destination_bucket.objects.filter(Prefix="__CTAS_prefix__"):
object_acl = s3_resource.ObjectAcl(destination_bucket.name, obj.key)
object_acl.put(
ACL='bucket-owner-full-control'
)
注意,因为我需要提交一些超出 AWS Athena 限制的 CTAS 查询,我已经实现了自动提交新查询并执行一些额外操作的逻辑,例如更新目标 Glue table 和日志记录。因此,包括这些代码行非常简单。
我建议您执行复制。
"additional GET and PUT requests" 会比较小:
- GET 费用为每 1000 个请求 0.0004 USD
- PUT 为每 1000 个请求 0.005 USD
或者,您 运行 从帐户 B 发出 aws s3 cp --recursive
命令,将文件复制到自己(是的!)并更改所有权(它还需要另一个更改,例如将元数据设置为接受为复制命令)。这类似于您在 put-object-acl
.
目前,干净地执行此操作的唯一方法是在账户 A 中使用 IAM 角色,并使用允许账户 B 承担该角色的信任策略。您提到这对您的情况是不可能的,这很不幸。目前无法以任何其他方式实现的原因是 Athena 不会使用 "bucket-owner-full-control" 选项写入文件,因此账户 A 永远不会完全拥有由账户 B 中的角色发起的操作创建的任何文件。
由于在目标存储桶中授予的策略允许所有内容,因此您可以做的一件事是 运行 在 CTAS 操作完成后执行一项任务,列出创建的对象并将每个对象复制到自身(相同源键和目标键)与 "bucket-owner-full-control" ACL 选项。像这样复制对象是更改 S3 对象的存储和 ACL 属性的常用方法。正如您所说,这将产生额外费用,但与 CTAS 费用以及与未来数据查询相关的费用相比,这些费用微不足道。
真正的缺点是必须在 CTAS 操作后向 运行 写入一些内容,并进行协调。我建议查看 Step Functions 来执行此操作,您可以创建非常好的工作流程来自动化 Athena,而且 运行 的成本非常低。我有一些应用程序或多或少与您尝试做的完全相同,它们使用 Step Functions、Lambda 和 Athena 并且成本低廉(不过我使用 IAM 角色进行跨账户工作)。