Kinesis Firehose 将 JSON 个对象放入 S3 中,不带分隔符逗号
Kinesis Firehose putting JSON objects in S3 without seperator comma
在发送数据之前,我正在使用 JSON.stringify 数据,它看起来像这样
{"data": [{"key1": value1, "key2": value2}, {"key1": value1, "key2": value2}]}
但是一旦它通过 AWS API 网关和 Kinesis Firehose 将它放入 S3,它看起来像这样
{
"key1": value1,
"key2": value2
}{
"key1": value1,
"key2": value2
}
JSON 对象之间的分隔符逗号消失了,但我需要它来正确处理数据。
API 网关中的模板:
#set($root = $input.path('$'))
{
"DeliveryStreamName": "some-delivery-stream",
"Records": [
#foreach($r in $root.data)
#set($data = "{
""key1"": ""$r.value1"",
""key2"": ""$r.value2""
}")
{
"Data": "$util.base64Encode($data)"
}#if($foreach.hasNext),#end
#end
]
}
我最近遇到了同样的问题,我能找到的唯一答案基本上只是在每条 JSON 消息的末尾添加换行符 ("\n") 每当您 post将它们添加到 Kinesis 流中,或者使用某种原始 JSON 解码器方法,可以处理没有分隔符的串联 JSON 对象。
我 post 编写了一个 python 代码解决方案,可以在此处的相关 Stack Overflow post 上找到:
一旦 AWS Firehose 将 JSON 对象转储到 s3,就完全可以从文件中读取单个 JSON 对象。
使用 Python 您可以使用 json
包
中的 raw_decode
函数
from json import JSONDecoder, JSONDecodeError
import re
import json
import boto3
NOT_WHITESPACE = re.compile(r'[^\s]')
def decode_stacked(document, pos=0, decoder=JSONDecoder()):
while True:
match = NOT_WHITESPACE.search(document, pos)
if not match:
return
pos = match.start()
try:
obj, pos = decoder.raw_decode(document, pos)
except JSONDecodeError:
# do something sensible if there's some error
raise
yield obj
s3 = boto3.resource('s3')
obj = s3.Object("my-bukcet", "my-firehose-json-key.json")
file_content = obj.get()['Body'].read()
for obj in decode_stacked(file_content):
print(json.dumps(obj))
# { "key1":value1,"key2":value2}
# { "key1":value1,"key2":value2}
来源:
使用 Glue / Pyspark 你可以使用
import json
rdd = sc.textFile("s3a://my-bucket/my-firehose-file-containing-json-objects")
df = rdd.map(lambda x: json.loads(x)).toDF()
df.show()
来源:
您可以考虑的一种方法是通过添加 Lambda 函数作为其数据处理器来为您的 Kinesis Firehose 传输流配置数据处理,该函数将在最终将数据传输到 S3 存储桶之前执行。
DeliveryStream:
...
Type: AWS::KinesisFirehose::DeliveryStream
Properties:
DeliveryStreamType: DirectPut
ExtendedS3DestinationConfiguration:
...
BucketARN: !GetAtt MyDeliveryBucket.Arn
ProcessingConfiguration:
Enabled: true
Processors:
- Parameters:
- ParameterName: LambdaArn
ParameterValue: !GetAtt MyTransformDataLambdaFunction.Arn
Type: Lambda
...
并且在 Lambda 函数中,确保 '\n'
附加到记录的 JSON 字符串,请参阅下面的 Lambda 函数 myTransformData.ts
in Node.js:
import {
FirehoseTransformationEvent,
FirehoseTransformationEventRecord,
FirehoseTransformationHandler,
FirehoseTransformationResult,
FirehoseTransformationResultRecord,
} from 'aws-lambda';
const createDroppedRecord = (
recordId: string
): FirehoseTransformationResultRecord => {
return {
recordId,
result: 'Dropped',
data: Buffer.from('').toString('base64'),
};
};
const processData = (
payloadStr: string,
record: FirehoseTransformationEventRecord
) => {
let jsonRecord;
// ...
// Process the orginal payload,
// And create the record in JSON
return jsonRecord;
};
const transformRecord = (
record: FirehoseTransformationEventRecord
): FirehoseTransformationResultRecord => {
try {
const payloadStr = Buffer.from(record.data, 'base64').toString();
const jsonRecord = processData(payloadStr, record);
if (!jsonRecord) {
console.error('Error creating json record');
return createDroppedRecord(record.recordId);
}
return {
recordId: record.recordId,
result: 'Ok',
// Ensure that '\n' is appended to the record's JSON string.
data: Buffer.from(JSON.stringify(jsonRecord) + '\n').toString('base64'),
};
} catch (error) {
console.error('Error processing record ${record.recordId}: ', error);
return createDroppedRecord(record.recordId);
}
};
const transformRecords = (
event: FirehoseTransformationEvent
): FirehoseTransformationResult => {
let records: FirehoseTransformationResultRecord[] = [];
for (const record of event.records) {
const transformed = transformRecord(record);
records.push(transformed);
}
return { records };
};
export const handler: FirehoseTransformationHandler = async (
event,
_context
) => {
const transformed = transformRecords(event);
return transformed;
};
换行分隔符到位后,Athena 等 AWS 服务将能够正常使用 S3 存储桶中的 JSON 记录数据,而不是 just seeing the first JSON record only。
请使用此代码解决您的问题
__Author__ = "Soumil Nitin Shah"
import json
import boto3
import base64
class MyHasher(object):
def __init__(self, key):
self.key = key
def get(self):
keys = str(self.key).encode("UTF-8")
keys = base64.b64encode(keys)
keys = keys.decode("UTF-8")
return keys
def lambda_handler(event, context):
output = []
for record in event['records']:
payload = base64.b64decode(record['data'])
"""Get the payload from event bridge and just get data attr """""
serialize_payload = str(json.loads(payload)) + "\n"
hasherHelper = MyHasher(key=serialize_payload)
hash = hasherHelper.get()
output_record = {
'recordId': record['recordId'],
'result': 'Ok',
'data': hash
}
print("output_record", output_record)
output.append(output_record)
return {'records': output}
在发送数据之前,我正在使用 JSON.stringify 数据,它看起来像这样
{"data": [{"key1": value1, "key2": value2}, {"key1": value1, "key2": value2}]}
但是一旦它通过 AWS API 网关和 Kinesis Firehose 将它放入 S3,它看起来像这样
{
"key1": value1,
"key2": value2
}{
"key1": value1,
"key2": value2
}
JSON 对象之间的分隔符逗号消失了,但我需要它来正确处理数据。
API 网关中的模板:
#set($root = $input.path('$'))
{
"DeliveryStreamName": "some-delivery-stream",
"Records": [
#foreach($r in $root.data)
#set($data = "{
""key1"": ""$r.value1"",
""key2"": ""$r.value2""
}")
{
"Data": "$util.base64Encode($data)"
}#if($foreach.hasNext),#end
#end
]
}
我最近遇到了同样的问题,我能找到的唯一答案基本上只是在每条 JSON 消息的末尾添加换行符 ("\n") 每当您 post将它们添加到 Kinesis 流中,或者使用某种原始 JSON 解码器方法,可以处理没有分隔符的串联 JSON 对象。
我 post 编写了一个 python 代码解决方案,可以在此处的相关 Stack Overflow post 上找到:
一旦 AWS Firehose 将 JSON 对象转储到 s3,就完全可以从文件中读取单个 JSON 对象。
使用 Python 您可以使用 json
包
raw_decode
函数
from json import JSONDecoder, JSONDecodeError
import re
import json
import boto3
NOT_WHITESPACE = re.compile(r'[^\s]')
def decode_stacked(document, pos=0, decoder=JSONDecoder()):
while True:
match = NOT_WHITESPACE.search(document, pos)
if not match:
return
pos = match.start()
try:
obj, pos = decoder.raw_decode(document, pos)
except JSONDecodeError:
# do something sensible if there's some error
raise
yield obj
s3 = boto3.resource('s3')
obj = s3.Object("my-bukcet", "my-firehose-json-key.json")
file_content = obj.get()['Body'].read()
for obj in decode_stacked(file_content):
print(json.dumps(obj))
# { "key1":value1,"key2":value2}
# { "key1":value1,"key2":value2}
来源:
使用 Glue / Pyspark 你可以使用
import json
rdd = sc.textFile("s3a://my-bucket/my-firehose-file-containing-json-objects")
df = rdd.map(lambda x: json.loads(x)).toDF()
df.show()
来源:
您可以考虑的一种方法是通过添加 Lambda 函数作为其数据处理器来为您的 Kinesis Firehose 传输流配置数据处理,该函数将在最终将数据传输到 S3 存储桶之前执行。
DeliveryStream:
...
Type: AWS::KinesisFirehose::DeliveryStream
Properties:
DeliveryStreamType: DirectPut
ExtendedS3DestinationConfiguration:
...
BucketARN: !GetAtt MyDeliveryBucket.Arn
ProcessingConfiguration:
Enabled: true
Processors:
- Parameters:
- ParameterName: LambdaArn
ParameterValue: !GetAtt MyTransformDataLambdaFunction.Arn
Type: Lambda
...
并且在 Lambda 函数中,确保 '\n'
附加到记录的 JSON 字符串,请参阅下面的 Lambda 函数 myTransformData.ts
in Node.js:
import {
FirehoseTransformationEvent,
FirehoseTransformationEventRecord,
FirehoseTransformationHandler,
FirehoseTransformationResult,
FirehoseTransformationResultRecord,
} from 'aws-lambda';
const createDroppedRecord = (
recordId: string
): FirehoseTransformationResultRecord => {
return {
recordId,
result: 'Dropped',
data: Buffer.from('').toString('base64'),
};
};
const processData = (
payloadStr: string,
record: FirehoseTransformationEventRecord
) => {
let jsonRecord;
// ...
// Process the orginal payload,
// And create the record in JSON
return jsonRecord;
};
const transformRecord = (
record: FirehoseTransformationEventRecord
): FirehoseTransformationResultRecord => {
try {
const payloadStr = Buffer.from(record.data, 'base64').toString();
const jsonRecord = processData(payloadStr, record);
if (!jsonRecord) {
console.error('Error creating json record');
return createDroppedRecord(record.recordId);
}
return {
recordId: record.recordId,
result: 'Ok',
// Ensure that '\n' is appended to the record's JSON string.
data: Buffer.from(JSON.stringify(jsonRecord) + '\n').toString('base64'),
};
} catch (error) {
console.error('Error processing record ${record.recordId}: ', error);
return createDroppedRecord(record.recordId);
}
};
const transformRecords = (
event: FirehoseTransformationEvent
): FirehoseTransformationResult => {
let records: FirehoseTransformationResultRecord[] = [];
for (const record of event.records) {
const transformed = transformRecord(record);
records.push(transformed);
}
return { records };
};
export const handler: FirehoseTransformationHandler = async (
event,
_context
) => {
const transformed = transformRecords(event);
return transformed;
};
换行分隔符到位后,Athena 等 AWS 服务将能够正常使用 S3 存储桶中的 JSON 记录数据,而不是 just seeing the first JSON record only。
请使用此代码解决您的问题
__Author__ = "Soumil Nitin Shah"
import json
import boto3
import base64
class MyHasher(object):
def __init__(self, key):
self.key = key
def get(self):
keys = str(self.key).encode("UTF-8")
keys = base64.b64encode(keys)
keys = keys.decode("UTF-8")
return keys
def lambda_handler(event, context):
output = []
for record in event['records']:
payload = base64.b64decode(record['data'])
"""Get the payload from event bridge and just get data attr """""
serialize_payload = str(json.loads(payload)) + "\n"
hasherHelper = MyHasher(key=serialize_payload)
hash = hasherHelper.get()
output_record = {
'recordId': record['recordId'],
'result': 'Ok',
'data': hash
}
print("output_record", output_record)
output.append(output_record)
return {'records': output}