如何读取无效的 JSON 格式亚马逊 firehose
How to read invalid JSON format amazon firehose
我遇到了这个最可怕的场景,我想读取 kinesis firehose 在我们的 S3 上创建的文件。
Kinesis firehose 创建的文件并不是每个 json 对象都在一个新行上,而是一个 json 对象串联文件。
{"param1":"value1","param2":numericvalue2,"param3":"nested {bracket}"}{"param1":"value1","param2":numericvalue2,"param3":"nested {bracket}"}{"param1":"value1","param2":numericvalue2,"param3":"nested {bracket}"}
现在这是正常 JSON.parse 不支持的场景,我尝试使用以下正则表达式:.scan(/({((\".?\":.?)*?)})/)
但扫描似乎只适用于没有嵌套括号的场景。
有人知道working/better/more解决这个问题的优雅方法吗?
最初的答案中的那个是针对不带引号的 json 的,这种情况有时会发生。这个:
({((\?\".*?\?\")*?)})
适用于引用的 json 和未引用的 json
除此之外对其进行了一些改进,以使其更简单..因为您可以使用整数和普通值..由于双重捕获组,字符串文字中的任何内容都将被忽略。
将输入修改为一个大 JSON 数组,然后解析:
input = File.read("input.json")
json = "[#{input.rstrip.gsub(/\}\s*\{/, '},{')}]"
data = JSON.parse(json)
您可能希望合并前两个以节省一些内存:
json = "[#{File.read('input.json').rstrip.gsub(/\}\s*\{/, '},{')}]"
data = JSON.parse(json)
这假设 }
后跟一些空格后跟 {
永远不会出现在 JSON 编码数据的键或值中。
正如您在最近的评论中总结的那样,firehose 中的 put_records_batch 要求您在记录中手动放置分隔符,以便消费者轻松解析。您可以添加一个新行或一些仅用于解析的特殊字符,例如 %,它永远不应该在您的有效负载中使用。
其他选项是逐条发送记录。这仅在您的用例不需要高吞吐量时才可行。为此,您可以在每条记录上循环并加载为字符串化数据 blob。如果在 Python 中完成,我们将有一个包含所有 json 对象的字典 "records"。
import json
def send_to_firehose(records):
firehose_client = boto3.client('firehose')
for record in records:
data = json.dumps(record)
firehose_client.put_record(DeliveryStreamName=<your stream>,
Record={
'Data': data
}
)
默认情况下,Firehose 在将数据发送到您的存储桶之前会缓冲数据,它应该以这样的形式结束。这将很容易以您喜欢的数据结构解析和加载到内存中。
[
{
"metadata": {
"schema_id": "4096"
},
"payload": {
"zaza": 12,
"price": 20,
"message": "Testing sendnig the data in message attribute",
"source": "coming routing to firehose"
}
},
{
"metadata": {
"schema_id": "4096"
},
"payload": {
"zaza": 12,
"price": 20,
"message": "Testing sendnig the data in message attribute",
"source": "coming routing to firehose"
}
}
]
我遇到了这个最可怕的场景,我想读取 kinesis firehose 在我们的 S3 上创建的文件。
Kinesis firehose 创建的文件并不是每个 json 对象都在一个新行上,而是一个 json 对象串联文件。
{"param1":"value1","param2":numericvalue2,"param3":"nested {bracket}"}{"param1":"value1","param2":numericvalue2,"param3":"nested {bracket}"}{"param1":"value1","param2":numericvalue2,"param3":"nested {bracket}"}
现在这是正常 JSON.parse 不支持的场景,我尝试使用以下正则表达式:.scan(/({((\".?\":.?)*?)})/)
但扫描似乎只适用于没有嵌套括号的场景。
有人知道working/better/more解决这个问题的优雅方法吗?
最初的答案中的那个是针对不带引号的 json 的,这种情况有时会发生。这个:
({((\?\".*?\?\")*?)})
适用于引用的 json 和未引用的 json
除此之外对其进行了一些改进,以使其更简单..因为您可以使用整数和普通值..由于双重捕获组,字符串文字中的任何内容都将被忽略。
将输入修改为一个大 JSON 数组,然后解析:
input = File.read("input.json")
json = "[#{input.rstrip.gsub(/\}\s*\{/, '},{')}]"
data = JSON.parse(json)
您可能希望合并前两个以节省一些内存:
json = "[#{File.read('input.json').rstrip.gsub(/\}\s*\{/, '},{')}]"
data = JSON.parse(json)
这假设 }
后跟一些空格后跟 {
永远不会出现在 JSON 编码数据的键或值中。
正如您在最近的评论中总结的那样,firehose 中的 put_records_batch 要求您在记录中手动放置分隔符,以便消费者轻松解析。您可以添加一个新行或一些仅用于解析的特殊字符,例如 %,它永远不应该在您的有效负载中使用。
其他选项是逐条发送记录。这仅在您的用例不需要高吞吐量时才可行。为此,您可以在每条记录上循环并加载为字符串化数据 blob。如果在 Python 中完成,我们将有一个包含所有 json 对象的字典 "records"。
import json
def send_to_firehose(records):
firehose_client = boto3.client('firehose')
for record in records:
data = json.dumps(record)
firehose_client.put_record(DeliveryStreamName=<your stream>,
Record={
'Data': data
}
)
默认情况下,Firehose 在将数据发送到您的存储桶之前会缓冲数据,它应该以这样的形式结束。这将很容易以您喜欢的数据结构解析和加载到内存中。
[
{
"metadata": {
"schema_id": "4096"
},
"payload": {
"zaza": 12,
"price": 20,
"message": "Testing sendnig the data in message attribute",
"source": "coming routing to firehose"
}
},
{
"metadata": {
"schema_id": "4096"
},
"payload": {
"zaza": 12,
"price": 20,
"message": "Testing sendnig the data in message attribute",
"source": "coming routing to firehose"
}
}
]