Kinesis Firehose Lambda Transformation and Dynamic Partitioning
The data shown below comes from the faker library. I am trying to learn and implement
dynamic partitioning in Kinesis Firehose.
Sample payload input
{
"name":"Dr. Nancy Mcmillan",
"phone_numbers":"8XXXXX",
"city":"Priscillaport",
"address":"908 Mitchell Views SXXXXXXXX 42564",
"date":"1980-07-11",
"customer_id":"3"
}
Sample producer code
from faker import Faker
import boto3
import json
import random


def main():
    AWS_ACCESS_KEY = "XXXXX"
    AWS_SECRET_KEY = "XXX"
    AWS_REGION_NAME = "us-east-1"

    # Create the client and Faker instance once, outside the loop
    client = boto3.client(
        "kinesis",
        aws_access_key_id=AWS_ACCESS_KEY,
        aws_secret_access_key=AWS_SECRET_KEY,
        region_name=AWS_REGION_NAME,
    )
    faker = Faker()

    for i in range(1, 13):
        json_data = {
            "name": faker.name(),
            "phone_numbers": faker.phone_number(),
            "city": faker.city(),
            "address": faker.address(),
            "date": str(faker.date()),
            "customer_id": str(random.randint(1, 5)),
        }
        print(json_data)

        response = client.put_record(
            StreamName='XXX',
            Data=json.dumps(json_data),
            PartitionKey='test',
        )
        print(response)
Here is the Lambda code, which works fine
import base64
import json


class MyHasher(object):
    """Base64-encodes its key so the record can be returned to Firehose."""

    def __init__(self, key):
        self.key = key

    def get(self):
        keys = str(self.key).encode("utf-8")
        keys = base64.b64encode(keys)
        return keys.decode("utf-8")


def lambda_handler(event, context):
    print("Event")
    print(event)
    output = []
    for record in event["records"]:
        dat = base64.b64decode(record["data"])
        serialize_payload = json.loads(dat)
        print("serialize_payload", serialize_payload)

        # Re-serialize with json.dumps so the object written to S3 stays valid
        # JSON; str() would produce a single-quoted Python dict repr instead
        json_new_line = json.dumps(serialize_payload) + "\n"
        hasherHelper = MyHasher(key=json_new_line)
        hash = hasherHelper.get()

        partition_keys = {"customer_id": serialize_payload.get("customer_id")}
        transformed = {
            "recordId": record["recordId"],
            "result": "Ok",
            "data": hash,
            "metadata": {"partitionKeys": partition_keys},
        }
        print(transformed)
        output.append(transformed)

    print("*****************")
    print(output)
    return {"records": output}
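Before deploying, the handler's output shape can be checked locally with a hand-built fake Firehose event. The sketch below repeats a minimal version of the handler logic above so it runs standalone; the record id and payload values are made up for illustration.

```python
import base64
import json


def lambda_handler(event, context):
    # Minimal restatement of the transformation above: decode each record,
    # re-encode it as newline-terminated JSON, and attach partitionKeys metadata
    output = []
    for record in event["records"]:
        payload = json.loads(base64.b64decode(record["data"]))
        data = base64.b64encode(
            (json.dumps(payload) + "\n").encode("utf-8")
        ).decode("utf-8")
        output.append({
            "recordId": record["recordId"],
            "result": "Ok",
            "data": data,
            "metadata": {"partitionKeys": {"customer_id": payload.get("customer_id")}},
        })
    return {"records": output}


# Build a fake event the way Firehose would: base64-encoded JSON per record
sample = {"name": "Dr. Nancy Mcmillan", "customer_id": "3"}
event = {"records": [{
    "recordId": "rec-1",
    "data": base64.b64encode(json.dumps(sample).encode("utf-8")).decode("utf-8"),
}]}

result = lambda_handler(event, None)
print(result["records"][0]["metadata"])  # partitionKeys should carry customer_id
```

If the printed metadata does not contain the expected `customer_id`, the error folder behaviour described below is the likely outcome, since Firehose cannot resolve the prefix expression.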
A sample screenshot shows it working fine.
Here are the Firehose settings for dynamic partitioning.
For some reason I see an error folder on AWS S3, and all my messages end up in it.
I have successfully implemented the Lambda transformation and made a video, which can be found below. I am currently stuck on dynamic partitioning; I tried reading several articles, but that did not help.
https://www.youtube.com/watch?v=6wot9Z93vAY&t=231s
Thanks again, and I look forward to hearing from you.
References
https://docs.aws.amazon.com/firehose/latest/dev/dynamic-partitioning.html
https://www.youtube.com/watch?v=HcOVAFn-KhM
https://www.youtube.com/watch?v=PoaKgHdJgCE
https://www.amazonaws.cn/en/new/2021/s3-analytics-dynamic-partitioning-kinesis-data-firehose/
Dynamic partitioning has two prefix options: 1) partitionKeyFromQuery
2) partitionKeyFromLambda
Use the first option if you want Firehose to parse the record and extract the partition keys itself (inline parsing). Use the second option if you want to supply the partition keys after performing a Lambda transformation.
According to your Firehose configuration, you are providing the partition keys from Lambda (the second option) but have configured a prefix for the first option. To fix this, disable inline parsing and use the second option in the Firehose prefix: !{partitionKeyFromLambda:customer_id}/
Alternatively, remove the Lambda transformation and keep inline parsing.
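For reference, the two setups look roughly like this in the Firehose extended S3 destination configuration (field names as in the AWS API; the prefix values here are illustrative). With a Lambda transformation, the prefix must use partitionKeyFromLambda:

```json
{
  "Prefix": "customer_id=!{partitionKeyFromLambda:customer_id}/",
  "ErrorOutputPrefix": "errors/",
  "DynamicPartitioningConfiguration": { "Enabled": true }
}
```

With inline parsing instead (no Lambda), the keys are extracted by a JQ expression and the prefix uses partitionKeyFromQuery:

```json
{
  "Prefix": "customer_id=!{partitionKeyFromQuery:customer_id}/",
  "ProcessingConfiguration": {
    "Enabled": true,
    "Processors": [{
      "Type": "MetadataExtraction",
      "Parameters": [
        { "ParameterName": "MetadataExtractionQuery", "ParameterValue": "{customer_id:.customer_id}" },
        { "ParameterName": "JsonParsingEngine", "ParameterValue": "JQ-1.6" }
      ]
    }]
  }
}
```

Mixing the two (Lambda-provided keys with a partitionKeyFromQuery prefix, or vice versa) leaves the prefix expression unresolvable, which is what routes records to the error output prefix.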