Kinesis Firehose Lambda Transformation and Dynamic Partitioning

The data shown below is generated with the Faker library. I am trying to learn and implement dynamic partitioning in Kinesis Firehose.

Sample payload input

{
   "name":"Dr. Nancy Mcmillan",
   "phone_numbers":"8XXXXX",
   "city":"Priscillaport",
   "address":"908 Mitchell Views SXXXXXXXX 42564",
   "date":"1980-07-11",
   "customer_id":"3"
}

Sample producer code

import json
import random

import boto3
from faker import Faker


def main():
    AWS_ACCESS_KEY = "XXXXX"
    AWS_SECRET_KEY = "XXX"
    AWS_REGION_NAME = "us-east-1"

    # Create the Kinesis client once, outside the loop
    client = boto3.client(
        "kinesis",
        aws_access_key_id=AWS_ACCESS_KEY,
        aws_secret_access_key=AWS_SECRET_KEY,
        region_name=AWS_REGION_NAME,
    )

    faker = Faker()

    for i in range(1, 13):
        # Generate a fake customer record; customer_id is the intended partition key
        json_data = {
            "name": faker.name(),
            "phone_numbers": faker.phone_number(),
            "city": faker.city(),
            "address": faker.address(),
            "date": str(faker.date()),
            "customer_id": str(random.randint(1, 5))
        }
        print(json_data)

        # Send the record to the Kinesis data stream that feeds Firehose;
        # this PartitionKey only controls shard placement, not S3 partitioning
        response = client.put_record(
            StreamName='XXX',
            Data=json.dumps(json_data),
            PartitionKey='test',
        )
        print(response)


if __name__ == "__main__":
    main()
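For context, when Firehose invokes the transformation Lambda, each record produced above arrives base64-encoded inside the event. A minimal sketch of that event shape is below; the recordId, timestamp, ARN, and data values are illustrative placeholders, not real values:

{
    "invocationId": "invocation-id-placeholder",
    "deliveryStreamArn": "arn:aws:firehose:us-east-1:<account-id>:deliverystream/XXX",
    "region": "us-east-1",
    "records": [
        {
            "recordId": "49546986683135544286507457936321625675700192471156785154",
            "approximateArrivalTimestamp": 1628607935000,
            "data": "<base64 of the JSON payload shown earlier>"
        }
    ]
}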

Here is the Lambda code, which works fine

import base64
import json


class MyHasher(object):
    """Returns the base64-encoded UTF-8 string form of the given key."""

    def __init__(self, key):
        self.key = key

    def get(self):
        keys = str(self.key).encode("UTF-8")
        keys = base64.b64encode(keys)
        keys = keys.decode("UTF-8")
        return keys


def lambda_handler(event, context):
    print("Event")
    print(event)

    output = []

    for record in event["records"]:
        # Firehose delivers each record base64-encoded
        dat = base64.b64decode(record["data"])
        serialize_payload = json.loads(dat)
        print("serialize_payload", serialize_payload)

        # Re-serialize as JSON and append a newline so the objects written
        # to S3 contain one JSON document per line
        json_new_line = json.dumps(serialize_payload) + "\n"

        # Firehose expects the transformed data to be base64-encoded again
        hasher_helper = MyHasher(key=json_new_line)
        encoded_data = hasher_helper.get()

        # Partition key surfaced to Firehose via the partitionKeyFromLambda namespace
        partition_keys = {"customer_id": serialize_payload.get("customer_id")}

        transformed_record = {
            "recordId": record["recordId"],
            "result": "Ok",
            "data": encoded_data,
            "metadata": {
                "partitionKeys": partition_keys
            }
        }

        print(transformed_record)
        output.append(transformed_record)

    print("*****************")
    print(output)
    return {"records": output}





Sample screenshot showing the transformation working fine

Here are the Firehose settings for dynamic partitioning

For some reason, I see an error folder in AWS S3, and all of my messages end up in that folder.

I have successfully implemented the Lambda transformation and made a video about it, which can be found below. I am currently stuck on dynamic partitioning. I have tried reading several articles, but they did not help.

https://www.youtube.com/watch?v=6wot9Z93vAY&t=231s

Thanks again, and I look forward to hearing back from you.

References

https://docs.aws.amazon.com/firehose/latest/dev/dynamic-partitioning.html

https://www.youtube.com/watch?v=HcOVAFn-KhM

https://www.youtube.com/watch?v=PoaKgHdJgCE

https://medium.com/@bv_subhash/kinesis-firehose-performs-partitioning-based-on-timestamps-and-creates-files-in-s3-but-they-would-13efd51f6d39

https://www.amazonaws.cn/en/new/2021/s3-analytics-dynamic-partitioning-kinesis-data-firehose/

Dynamic partitioning offers two prefix options: 1) partitionKeyFromQuery and 2) partitionKeyFromLambda. Use the first option if you want Firehose to parse the record and extract the partition key itself (inline parsing). Use the second option if you want to supply the partition key after performing a Lambda transformation. Based on your Firehose configuration, you are providing the partition key from the Lambda (the second option), but your S3 prefix references the first option. To fix this, either disable inline parsing and change the Firehose prefix to use the second option, i.e. !{partitionKeyFromLambda:customer_id}/, or remove the Lambda transformation and keep inline parsing.
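As a minimal sketch of what that configuration could look like when creating the delivery stream with boto3 (the ARNs, names, and buffering values are placeholder assumptions, not taken from the question):

import boto3

firehose = boto3.client("firehose", region_name="us-east-1")

firehose.create_delivery_stream(
    DeliveryStreamName="XXX",
    DeliveryStreamType="KinesisStreamAsSource",
    KinesisStreamSourceConfiguration={
        "KinesisStreamARN": "arn:aws:kinesis:us-east-1:<account-id>:stream/XXX",
        "RoleARN": "arn:aws:iam::<account-id>:role/<firehose-role>",
    },
    ExtendedS3DestinationConfiguration={
        "RoleARN": "arn:aws:iam::<account-id>:role/<firehose-role>",
        "BucketARN": "arn:aws:s3:::<bucket-name>",
        # The prefix uses the partitionKeyFromLambda namespace because the key
        # comes from the transformation Lambda's metadata.partitionKeys
        "Prefix": "customer_id=!{partitionKeyFromLambda:customer_id}/",
        "ErrorOutputPrefix": "errors/!{firehose:error-output-type}/",
        # Dynamic partitioning requires a buffer size of at least 64 MB
        "BufferingHints": {"SizeInMBs": 64, "IntervalInSeconds": 60},
        "DynamicPartitioningConfiguration": {"Enabled": True},
        "ProcessingConfiguration": {
            "Enabled": True,
            "Processors": [
                {
                    "Type": "Lambda",
                    "Parameters": [
                        {
                            "ParameterName": "LambdaArn",
                            "ParameterValue": "arn:aws:lambda:us-east-1:<account-id>:function:<transform-fn>",
                        }
                    ],
                }
            ],
        },
    },
)

In the console, this corresponds to enabling dynamic partitioning, leaving inline parsing disabled, and setting the S3 bucket prefix to customer_id=!{partitionKeyFromLambda:customer_id}/.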