如何为 S3 上的 AWS Firehose 设置目标文件名?
How can I set the destination filename for AWS Firehose on S3?
我正在处理添加到 S3 的 XML 文件并将结果写入流水线,并将结果存储在同一个 S3 存储桶中,但目标文件名必须采用特定格式。我检查了文档,但看不到任何设置文件名格式的方法。
我能找到的最接近的是 firehose FAQ
Q: What is the naming pattern of the Amazon S3 objects delivered by Amazon Kinesis Data Firehose?
The Amazon S3 object name follows the pattern DeliveryStreamName-DeliveryStreamVersion-YYYY-MM-DD-HH-MM-SS-RandomString, where DeliveryStreamVersion begins with 1 and increases by 1 for every configuration change of the delivery stream. You can change delivery stream configurations (for example, the name of the S3 bucket, buffering hints, compression, and encryption) with the Firehose Console or the UpdateDestination operation.
如果您使用静态命名,您可以通过 Firehose 控制台或 UpdateDestination 操作指定它。
但是如果您正在寻找一些动态命名,不幸的是,目前这是不可能的。详细回答请参考这个问题——
Storing Firehose transfered files in S3 under custom directory names
我也对无法动态指定文件名感到不满意,因此我创建了一个 lambda 函数来重命名我的 Kinesis 流输出的文件。这些是我采取的步骤
- 我在我的 Kinesis 数据中包含了我想要的文件名。
- 我创建了一个新的 lambda 函数,每当 kinesis 输出文件时设置为 运行。
- 我的 lambda 函数:
- 打开我的文件
- 抓取新文件名
- 创建新文件
- 删除命名错误的旧文件。
import boto3
import json
def lambda_handler(event, context):
key = event["Records"][0]["s3"]["object"]["key"]
bucket=event["Records"][0]["s3"]["bucket"]["name"]
s3resource = boto3.resource('s3')
obj = s3resource.Object(bucket, key)
body = obj.get()['Body'].read()
dic = json.loads(body)
my_new_file_name= dic["my_new_file_name"]
s3resource.Object(bucket, str(my_new_file_name).copy_from(CopySource=f'{bucket}/{key}')
s3resource.Object(bucket, key).delete()
我正在处理添加到 S3 的 XML 文件并将结果写入流水线,并将结果存储在同一个 S3 存储桶中,但目标文件名必须采用特定格式。我检查了文档,但看不到任何设置文件名格式的方法。 我能找到的最接近的是 firehose FAQ
Q: What is the naming pattern of the Amazon S3 objects delivered by Amazon Kinesis Data Firehose?
The Amazon S3 object name follows the pattern DeliveryStreamName-DeliveryStreamVersion-YYYY-MM-DD-HH-MM-SS-RandomString, where DeliveryStreamVersion begins with 1 and increases by 1 for every configuration change of the delivery stream. You can change delivery stream configurations (for example, the name of the S3 bucket, buffering hints, compression, and encryption) with the Firehose Console or the UpdateDestination operation.
如果您使用静态命名,您可以通过 Firehose 控制台或 UpdateDestination 操作指定它。
但是如果您正在寻找一些动态命名,不幸的是,目前这是不可能的。详细回答请参考这个问题—— Storing Firehose transfered files in S3 under custom directory names
我也对无法动态指定文件名感到不满意,因此我创建了一个 lambda 函数来重命名我的 Kinesis 流输出的文件。这些是我采取的步骤
- 我在我的 Kinesis 数据中包含了我想要的文件名。
- 我创建了一个新的 lambda 函数,每当 kinesis 输出文件时设置为 运行。
- 我的 lambda 函数:
- 打开我的文件
- 抓取新文件名
- 创建新文件
- 删除命名错误的旧文件。
import boto3
import json
def lambda_handler(event, context):
key = event["Records"][0]["s3"]["object"]["key"]
bucket=event["Records"][0]["s3"]["bucket"]["name"]
s3resource = boto3.resource('s3')
obj = s3resource.Object(bucket, key)
body = obj.get()['Body'].read()
dic = json.loads(body)
my_new_file_name= dic["my_new_file_name"]
s3resource.Object(bucket, str(my_new_file_name).copy_from(CopySource=f'{bucket}/{key}')
s3resource.Object(bucket, key).delete()