Unable to decode CloudWatch Logs from Kinesis stream

I'm trying to funnel my CloudWatch logs through Kinesis and then into Lambda for processing, but I can't find a way to decode/parse the incoming logs. So far I've tried this:

Method 1: using the CloudWatch "class"

func function(request events.KinesisEvent) error {
    for _, record := range request.Records {
        fmt.Println(record.EventName)
        fmt.Println(string(record.Kinesis.Data))

        rawData := events.CloudwatchLogsRawData{
            Data: string(record.Kinesis.Data),
        }

        parse, err := rawData.Parse()
        fmt.Println(parse)
        fmt.Println(err)
    }
    return nil
}

func main() {
    lambda.Start(function)
}

Method 2: decoding manually

var logData events.CloudwatchLogsData

func Base64Decode(message []byte) (b []byte, err error) {
    var l int
    b = make([]byte, base64.StdEncoding.DecodedLen(len(message)))
    l, err = base64.StdEncoding.Decode(b, message)
    if err != nil {
        return
    }
    return b[:l], nil
}

func Parse(rawData []byte, d events.CloudwatchLogsData) (err error) {
    data, err := Base64Decode(rawData)
    if err != nil {
        return
    }

    zr, err := gzip.NewReader(bytes.NewBuffer(data))
    if err != nil {
        return
    }
    defer zr.Close()
    fmt.Println(zr)
    dec := json.NewDecoder(zr)
    err = dec.Decode(&d)

    return
}

func function(request events.KinesisEvent) error {
    for _, record := range request.Records {
        fmt.Println(record.EventName)
        fmt.Println(string(record.Kinesis.Data))

        err := Parse(record.Kinesis.Data, logData)
        fmt.Println(err)
        fmt.Println(logData)
    }
    return nil
}

func main() {
    lambda.Start(function)
}

Both approaches give me the same error:

illegal base64 data at input byte 0

As I understand it, the logs arrive Base64-encoded and compressed, but I couldn't find anything online that is specific to Go.

Edit:

Added the log data type

// CloudwatchLogsData is an unmarshal'd, ungzip'd, cloudwatch logs event
type CloudwatchLogsData struct {
    Owner               string                   `json:"owner"`
    LogGroup            string                   `json:"logGroup"`
    LogStream           string                   `json:"logStream"`
    SubscriptionFilters []string                 `json:"subscriptionFilters"`
    MessageType         string                   `json:"messageType"`
    LogEvents           []CloudwatchLogsLogEvent `json:"logEvents"`
}

After Base64 decoding and decompression, the data is JSON with the following structure (according to AWS: https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/SubscriptionFilters.html):

{
    "owner": "111111111111",
    "logGroup": "logGroup_name",
    "logStream": "111111111111_logGroup_name_us-east-1",
    "subscriptionFilters": [
        "Destination"
    ],
    "messageType": "DATA_MESSAGE",
    "logEvents": [
        {
            "id": "31953106606966983378809025079804211143289615424298221568",
            "timestamp": 1432826855000,
            "message": "{\"eventVersion\":\"1.03\",\"userIdentity\":{\"type\":\"Root\"}"
        },
        {
            "id": "31953106606966983378809025079804211143289615424298221569",
            "timestamp": 1432826855000,
            "message": "{\"eventVersion\":\"1.03\",\"userIdentity\":{\"type\":\"Root\"}"
        },
        {
            "id": "31953106606966983378809025079804211143289615424298221570",
            "timestamp": 1432826855000,
            "message": "{\"eventVersion\":\"1.03\",\"userIdentity\":{\"type\":\"Root\"}"
        }
    ]
}
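
For reference, a minimal sketch of unmarshalling that JSON once it has been decompressed. The LogsData and LogEvent types and the parseLogsJSON helper are hypothetical local stand-ins that mirror the fields shown above, so the example runs with the standard library only; in a real handler events.CloudwatchLogsData would presumably take their place.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// LogEvent mirrors one entry of "logEvents" (hypothetical local type).
type LogEvent struct {
	ID        string `json:"id"`
	Timestamp int64  `json:"timestamp"` // taken from the sample payload as-is
	Message   string `json:"message"`
}

// LogsData mirrors the top-level payload (hypothetical local type).
type LogsData struct {
	Owner               string     `json:"owner"`
	LogGroup            string     `json:"logGroup"`
	LogStream           string     `json:"logStream"`
	SubscriptionFilters []string   `json:"subscriptionFilters"`
	MessageType         string     `json:"messageType"`
	LogEvents           []LogEvent `json:"logEvents"`
}

// parseLogsJSON unmarshals an already-decompressed payload.
func parseLogsJSON(payload []byte) (LogsData, error) {
	var d LogsData
	err := json.Unmarshal(payload, &d)
	return d, err
}

// sample is a shortened copy of the payload above with a single log event.
const sample = `{
    "owner": "111111111111",
    "logGroup": "logGroup_name",
    "logStream": "111111111111_logGroup_name_us-east-1",
    "subscriptionFilters": ["Destination"],
    "messageType": "DATA_MESSAGE",
    "logEvents": [
        {
            "id": "31953106606966983378809025079804211143289615424298221568",
            "timestamp": 1432826855000,
            "message": "{\"eventVersion\":\"1.03\",\"userIdentity\":{\"type\":\"Root\"}"
        }
    ]
}`

func main() {
	d, err := parseLogsJSON([]byte(sample))
	if err != nil {
		fmt.Println("parse failed:", err)
		return
	}
	fmt.Println(d.LogGroup, d.MessageType, len(d.LogEvents))
	for _, e := range d.LogEvents {
		fmt.Println(e.Timestamp, e.Message)
	}
}
```

Note that the "message" field is itself a string; if it holds JSON, it needs a second json.Unmarshal.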

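OK, it turns out I didn't have to decode from base64 at all; I only needed to decompress the data. This is presumably because events.KinesisRecord.Data is a []byte, and Go's encoding/json already base64-decodes []byte fields when the event is unmarshalled, so the handler receives the raw gzip bytes.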

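// Unzip gunzips a Kinesis record payload and prints the resulting JSON.
// It needs the "bytes", "compress/gzip", "fmt", and "io" packages.
func Unzip(data []byte) error {
    r, err := gzip.NewReader(bytes.NewReader(data))
    if err != nil {
        return err
    }
    defer r.Close()

    // io.ReadAll replaces the deprecated ioutil.ReadAll (Go 1.16+).
    uncompressedData, err := io.ReadAll(r)
    if err != nil {
        return err
    }
    fmt.Println(string(uncompressedData))
    return nil
}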

uncompressedData is the JSON string of the CloudWatch logs.