如何读取无效的 JSON 格式亚马逊 firehose

How to read invalid JSON format amazon firehose

我遇到了这个最可怕的场景,我想读取 kinesis firehose 在我们的 S3 上创建的文件。

Kinesis firehose 创建的文件并不是每个 json 对象都在一个新行上,而是一个 json 对象串联文件。

{"param1":"value1","param2":numericvalue2,"param3":"nested {bracket}"}{"param1":"value1","param2":numericvalue2,"param3":"nested {bracket}"}{"param1":"value1","param2":numericvalue2,"param3":"nested {bracket}"}

现在这是正常 JSON.parse 不支持的场景,我尝试使用以下正则表达式:.scan(/({((\".?\":.?)*?)})/)

但扫描似乎只适用于没有嵌套括号的场景。

有人知道working/better/more解决这个问题的优雅方法吗?

最初的答案中的那个是针对不带引号的 json 的,这种情况有时会发生。这个:

({((\?\".*?\?\")*?)})

适用于引用的 json 和未引用的 json

除此之外对其进行了一些改进,以使其更简单..因为您可以使用整数和普通值..由于双重捕获组,字符串文字中的任何内容都将被忽略。

https://regex101.com/r/kPSc0i/1

将输入修改为一个大 JSON 数组,然后解析:

input = File.read("input.json")
json = "[#{input.rstrip.gsub(/\}\s*\{/, '},{')}]"
data = JSON.parse(json)

您可能希望合并前两个以节省一些内存:

json = "[#{File.read('input.json').rstrip.gsub(/\}\s*\{/, '},{')}]"
data = JSON.parse(json)

这假设 } 后跟一些空格后跟 { 永远不会出现在 JSON 编码数据的键或值中。

正如您在最近的评论中总结的那样,firehose 中的 put_records_batch 要求您在记录中手动放置分隔符,以便消费者轻松解析。您可以添加一个新行或一些仅用于解析的特殊字符,例如 %,它永远不应该在您的有效负载中使用。

其他选项是逐条发送记录。这仅在您的用例不需要高吞吐量时才可行。为此,您可以在每条记录上循环并加载为字符串化数据 blob。如果在 Python 中完成,我们将有一个包含所有 json 对象的字典 "records"。

import json
def send_to_firehose(records):
  firehose_client = boto3.client('firehose')
  for record in records:
    data = json.dumps(record)
    firehose_client.put_record(DeliveryStreamName=<your stream>,
                               Record={
                                       'Data': data
                                      }
                              )

默认情况下,Firehose 在将数据发送到您的存储桶之前会缓冲数据,它应该以这样的形式结束。这将很容易以您喜欢的数据结构解析和加载到内存中。

[
    {
        "metadata": {
            "schema_id": "4096"
        },
        "payload": {
            "zaza": 12,
            "price": 20,
            "message": "Testing sendnig the data in message attribute",
            "source": "coming routing to firehose"
        }
    },
    {
        "metadata": {
            "schema_id": "4096"
        },
        "payload": {
            "zaza": 12,
            "price": 20,
            "message": "Testing sendnig the data in message attribute",
            "source": "coming routing to firehose"
        }
    }
]