从 kinesis firehose 解析 json

parse json from kinesis firehose

您好,我正在尝试将 kinesis firehose 与 S3 结合使用。我试着阅读那些 s3 文件。我正在使用 GO 阅读它。

但是,我无法解析 JSON,因为这些值只是附加的,没有任何定界符。

这是文件示例(请注意,原始输入是相互附加的,出于格式化目的,我用换行符将它们分开):

{"ticker_symbol":"PLM","sector":"FINANCIAL","change":-0.16,"price":19.99}
{"ticker_symbol":"AZL","sector":"HEALTHCARE","change":-0.78,"price":16.51}
{"ticker_symbol":"IOP","sector":"TECHNOLOGY","change":-1.98,"price":121.88}
{"ticker_symbol":"VVY","sector":"HEALTHCARE","change":-0.56,"price":47.62}
{"ticker_symbol":"BFH","sector":"RETAIL","change":0.74,"price":16.61}
{"ticker_symbol":"WAS","sector":"RETAIL","change":-0.6,"price":16.72}

我的问题是,如何在 Go 中解析它?我能想到的一种解决方案是将它们拆分 }{ 并再次附加它们。但这很老套。

或者 kinesis firehose 是否提供定界符?

------更新-----

目前我已经实施了解决方案,将所有 }{ 替换为 },{,然后在开头添加 [,在结尾添加 ]。然后解析它。

但是我仍在寻找替代方案,因为此解决方案会限制 json 对象

内容中的任何 }{

创建一个简单的结构来解组 json 分批进来。所以每个批次 json 都被解组到一个 json 对象中。然后创建一个结构切片以将解析的 json 附加到切片中。这会将结果 json 全部附加到结构的切片中。

package main

import (
    "encoding/json"
    "fmt"
)

type Ticker struct {
    TickerSymbol string  `json:"ticker_symbol"`
    Sector       string  `json:"sector"`
    Change       float64 `json:"change"`
    Price        float64 `json:"price"`
}

var jsonBytes = []byte(`{"ticker_symbol":"PLM","sector":"FINANCIAL","change":-0.16,"price":19.99}`)

func main() {
    var singleResult Ticker
    var result []Ticker
    if err := json.Unmarshal(jsonBytes, &singleResult); err != nil {
        fmt.Println(err)
    }

    if len(result) == 0 {
        result = append(result, singleResult)
    }
    fmt.Printf("%+v", result)
}

已编辑:

如果数据是批量传入的,其中包含 json 个相互附加的对象,那么您可以使用正则表达式将 } 替换为 }, 然后 trim 最右边 , 使有效的 json 对象数组为:

package main

import (
    "fmt"
    "regexp"
    "strings"
)

type Ticker struct {
    TickerSymbol string  `json:"ticker_symbol"`
    Sector       string  `json:"sector"`
    Change       float64 `json:"change"`
    Price        float64 `json:"price"`
}

var str = `{"ticker_symbol":"PLM","sector":"FINANCIAL","change":-0.16,"price":19.99}
{"ticker_symbol":"AZL","sector":"HEALTHCARE","change":-0.78,"price":16.51}
{"ticker_symbol":"IOP","sector":"TECHNOLOGY","change":-1.98,"price":121.88}
{"ticker_symbol":"VVY","sector":"HEALTHCARE","change":-0.56,"price":47.62}
{"ticker_symbol":"BFH","sector":"RETAIL","change":0.74,"price":16.61}
{"ticker_symbol":"WAS","sector":"RETAIL","change":-0.6,"price":16.72}`

func main() {

    r := regexp.MustCompile("}")
    output := strings.TrimRight(r.ReplaceAllString(str, "},"), ",")
    output = fmt.Sprintf("[%s]", output)
    fmt.Println(output)
}

使用 r := regexp.MustCompile("}") 将帮助您不必担心 }{ 之间的空格会干扰替换字符串。所以只需将 } 替换为 }, 然后 trim 即可。

此外,我使用 MustCompile 的原因是:

When creating constants with regular expressions you can use the MustCompile variation of Compile. A plain Compile won’t work for constants because it has 2 return values.

json 在 Go playground

上解析的完整工作代码