无法从 Kinesis 流解码 CloudWatch Logs
Unable to decode CloudWatch Logs from Kinesis stream
我想把 CloudWatch 日志通过 Kinesis 汇集起来,再交给 Lambda 处理,但找不到解码/解析传入日志的方法。
到目前为止我试过这个:
方法一:使用 events 包中的 CloudwatchLogsRawData 类型
// function is the Lambda handler for approach 1. For every Kinesis record it
// prints the event name and the raw payload, wraps the payload in an
// events.CloudwatchLogsRawData, and prints the result and error of Parse.
// Errors are only printed; the handler always returns nil.
func function(request events.KinesisEvent) error {
for _, record := range request.Records {
fmt.Println(record.EventName)
fmt.Println(string(record.Kinesis.Data))
// NOTE(review): the payload bytes are handed to Parse unchanged, as a string.
// This is the path reported to fail with "illegal base64 data at input byte 0";
// presumably Parse expects base64 text while Kinesis.Data already holds raw
// gzip bytes — confirm against the aws-lambda-go events package.
rawData := events.CloudwatchLogsRawData{
Data: string(record.Kinesis.Data),
}
parse, err := rawData.Parse()
fmt.Println(parse)
fmt.Println(err)
}
return nil
}
// main hands function to lambda.Start as the Lambda handler.
func main() {
lambda.Start(function)
}
方法二:手动解码
// logData is the package-level value that the handler passes to Parse and
// prints afterwards.
var logData events.CloudwatchLogsData
// Base64Decode decodes standard (padded) base64 text held in message.
//
// On success it returns exactly the decoded bytes and a nil error. On failure
// it returns the untrimmed scratch buffer together with the decoding error, so
// callers must check err before using b.
func Base64Decode(message []byte) (b []byte, err error) {
	enc := base64.StdEncoding
	// DecodedLen is an upper bound; the buffer is trimmed after a successful decode.
	b = make([]byte, enc.DecodedLen(len(message)))
	n, err := enc.Decode(b, message)
	if err == nil {
		b = b[:n]
	}
	return b, err
}
// Parse base64-decodes rawData, opens the result as a gzip stream, and
// JSON-decodes the decompressed stream into d. It returns the first error
// hit along the way.
//
// NOTE(review): d is received by value, so the decode at the end fills this
// function's local copy; the caller's variable is not modified.
func Parse(rawData []byte, d events.CloudwatchLogsData) (err error) {
// NOTE(review): this is the only base64 step, so it is presumably where the
// reported "illegal base64 data at input byte 0" error comes from.
data, err := Base64Decode(rawData)
if err != nil {
return
}
zr, err := gzip.NewReader(bytes.NewBuffer(data))
if err != nil {
return
}
defer zr.Close()
// Debug output: prints the gzip reader value itself, not the decompressed data.
fmt.Println(zr)
dec := json.NewDecoder(zr)
err = dec.Decode(&d)
return
}
// function is the Lambda handler for approach 2. For every Kinesis record it
// prints the event name and raw payload, runs Parse on the payload, and prints
// the returned error and the package-level logData. It always returns nil.
func function(request events.KinesisEvent) error {
for _, record := range request.Records {
fmt.Println(record.EventName)
fmt.Println(string(record.Kinesis.Data))
// NOTE(review): err is assigned with "=" but no declaration of err is visible
// in this snippet — presumably it is declared elsewhere; otherwise ":=" is needed.
// logData is passed by value, so Parse cannot populate it.
err = Parse(record.Kinesis.Data, logData)
fmt.Println(err)
fmt.Println(logData)
}
return nil
}
// main hands function to lambda.Start as the Lambda handler.
func main() {
lambda.Start(function)
}
这两种方法都返回同样的错误:
illegal base64 data at input byte 0
据我理解,收到的日志是经过 gzip 压缩并以 Base64 编码的,但我在网上找不到任何专门针对 Go 的资料。
编辑:
添加了日志数据类型
// CloudwatchLogsData is the decompressed, JSON-decoded form of a CloudWatch
// Logs event. Each field is populated from the JSON key named in its struct
// tag.
type CloudwatchLogsData struct {
Owner string `json:"owner"`
LogGroup string `json:"logGroup"`
LogStream string `json:"logStream"`
SubscriptionFilters []string `json:"subscriptionFilters"`
MessageType string `json:"messageType"`
// LogEvents holds the entries of the "logEvents" array.
LogEvents []CloudwatchLogsLogEvent `json:"logEvents"`
}
根据 AWS 文档(https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/SubscriptionFilters.html),数据经 Base64 解码并解压后为 JSON,结构如下:
{
"owner": "111111111111",
"logGroup": "logGroup_name",
"logStream": "111111111111_logGroup_name_us-east-1",
"subscriptionFilters": [
"Destination"
],
"messageType": "DATA_MESSAGE",
"logEvents": [
{
"id": "31953106606966983378809025079804211143289615424298221568",
"timestamp": 1432826855000,
"message": "{\"eventVersion\":\"1.03\",\"userIdentity\":{\"type\":\"Root\"}"
},
{
"id": "31953106606966983378809025079804211143289615424298221569",
"timestamp": 1432826855000,
"message": "{\"eventVersion\":\"1.03\",\"userIdentity\":{\"type\":\"Root\"}"
},
{
"id": "31953106606966983378809025079804211143289615424298221570",
"timestamp": 1432826855000,
"message": "{\"eventVersion\":\"1.03\",\"userIdentity\":{\"type\":\"Root\"}"
}
]
}
好的,事实证明不需要先做 Base64 解码,只需直接对数据进行 gzip 解压即可:
// Unzip gunzips data and prints the decompressed bytes to standard output as
// a string. It takes the gzip-compressed bytes directly; no base64 decoding
// is performed.
//
// It returns the underlying gzip or read error, unwrapped, if data does not
// begin with a valid gzip header or if the stream is truncated or corrupt.
// Nothing is printed in that case.
func Unzip(data []byte) error {
	r, err := gzip.NewReader(bytes.NewReader(data))
	if err != nil {
		return err
	}
	// Close the gzip reader when done; the original never closed it.
	defer r.Close()

	uncompressedData, err := ioutil.ReadAll(r)
	if err != nil {
		return err
	}
	fmt.Println(string(uncompressedData))
	return nil
}
uncompressedData 就是 CloudWatch 日志的 JSON 字符串。
我想把 CloudWatch 日志通过 Kinesis 汇集起来,再交给 Lambda 处理,但找不到解码/解析传入日志的方法。到目前为止我试过以下方法:
方法一:使用 events 包中的 CloudwatchLogsRawData 类型
// function is the Lambda handler for approach 1. For every Kinesis record it
// prints the event name and the raw payload, wraps the payload in an
// events.CloudwatchLogsRawData, and prints the result and error of Parse.
// Errors are only printed; the handler always returns nil.
func function(request events.KinesisEvent) error {
for _, record := range request.Records {
fmt.Println(record.EventName)
fmt.Println(string(record.Kinesis.Data))
// NOTE(review): the payload bytes are handed to Parse unchanged, as a string.
// This is the path reported to fail with "illegal base64 data at input byte 0";
// presumably Parse expects base64 text while Kinesis.Data already holds raw
// gzip bytes — confirm against the aws-lambda-go events package.
rawData := events.CloudwatchLogsRawData{
Data: string(record.Kinesis.Data),
}
parse, err := rawData.Parse()
fmt.Println(parse)
fmt.Println(err)
}
return nil
}
// main hands function to lambda.Start as the Lambda handler.
func main() {
lambda.Start(function)
}
方法二:手动解码
// logData is the package-level value that the handler passes to Parse and
// prints afterwards.
var logData events.CloudwatchLogsData
// Base64Decode decodes standard (padded) base64 text held in message.
//
// On success it returns exactly the decoded bytes and a nil error. On failure
// it returns the untrimmed scratch buffer together with the decoding error, so
// callers must check err before using b.
func Base64Decode(message []byte) (b []byte, err error) {
	enc := base64.StdEncoding
	// DecodedLen is an upper bound; the buffer is trimmed after a successful decode.
	b = make([]byte, enc.DecodedLen(len(message)))
	n, err := enc.Decode(b, message)
	if err == nil {
		b = b[:n]
	}
	return b, err
}
// Parse base64-decodes rawData, opens the result as a gzip stream, and
// JSON-decodes the decompressed stream into d. It returns the first error
// hit along the way.
//
// NOTE(review): d is received by value, so the decode at the end fills this
// function's local copy; the caller's variable is not modified.
func Parse(rawData []byte, d events.CloudwatchLogsData) (err error) {
// NOTE(review): this is the only base64 step, so it is presumably where the
// reported "illegal base64 data at input byte 0" error comes from.
data, err := Base64Decode(rawData)
if err != nil {
return
}
zr, err := gzip.NewReader(bytes.NewBuffer(data))
if err != nil {
return
}
defer zr.Close()
// Debug output: prints the gzip reader value itself, not the decompressed data.
fmt.Println(zr)
dec := json.NewDecoder(zr)
err = dec.Decode(&d)
return
}
// function is the Lambda handler for approach 2. For every Kinesis record it
// prints the event name and raw payload, runs Parse on the payload, and prints
// the returned error and the package-level logData. It always returns nil.
func function(request events.KinesisEvent) error {
for _, record := range request.Records {
fmt.Println(record.EventName)
fmt.Println(string(record.Kinesis.Data))
// NOTE(review): err is assigned with "=" but no declaration of err is visible
// in this snippet — presumably it is declared elsewhere; otherwise ":=" is needed.
// logData is passed by value, so Parse cannot populate it.
err = Parse(record.Kinesis.Data, logData)
fmt.Println(err)
fmt.Println(logData)
}
return nil
}
// main hands function to lambda.Start as the Lambda handler.
func main() {
lambda.Start(function)
}
这两种方法都返回同样的错误:
illegal base64 data at input byte 0
据我理解,收到的日志是经过 gzip 压缩并以 Base64 编码的,但我在网上找不到任何专门针对 Go 的资料。
编辑:
添加了日志数据类型
// CloudwatchLogsData is the decompressed, JSON-decoded form of a CloudWatch
// Logs event. Each field is populated from the JSON key named in its struct
// tag.
type CloudwatchLogsData struct {
Owner string `json:"owner"`
LogGroup string `json:"logGroup"`
LogStream string `json:"logStream"`
SubscriptionFilters []string `json:"subscriptionFilters"`
MessageType string `json:"messageType"`
// LogEvents holds the entries of the "logEvents" array.
LogEvents []CloudwatchLogsLogEvent `json:"logEvents"`
}
根据 AWS 文档(https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/SubscriptionFilters.html),数据经 Base64 解码并解压后为 JSON,结构如下:
{
"owner": "111111111111",
"logGroup": "logGroup_name",
"logStream": "111111111111_logGroup_name_us-east-1",
"subscriptionFilters": [
"Destination"
],
"messageType": "DATA_MESSAGE",
"logEvents": [
{
"id": "31953106606966983378809025079804211143289615424298221568",
"timestamp": 1432826855000,
"message": "{\"eventVersion\":\"1.03\",\"userIdentity\":{\"type\":\"Root\"}"
},
{
"id": "31953106606966983378809025079804211143289615424298221569",
"timestamp": 1432826855000,
"message": "{\"eventVersion\":\"1.03\",\"userIdentity\":{\"type\":\"Root\"}"
},
{
"id": "31953106606966983378809025079804211143289615424298221570",
"timestamp": 1432826855000,
"message": "{\"eventVersion\":\"1.03\",\"userIdentity\":{\"type\":\"Root\"}"
}
]
}
好的,事实证明不需要先做 Base64 解码,只需直接对数据进行 gzip 解压即可:
// Unzip gunzips data and prints the decompressed bytes to standard output as
// a string. It takes the gzip-compressed bytes directly; no base64 decoding
// is performed.
//
// It returns the underlying gzip or read error, unwrapped, if data does not
// begin with a valid gzip header or if the stream is truncated or corrupt.
// Nothing is printed in that case.
func Unzip(data []byte) error {
	r, err := gzip.NewReader(bytes.NewReader(data))
	if err != nil {
		return err
	}
	// Close the gzip reader when done; the original never closed it.
	defer r.Close()

	uncompressedData, err := ioutil.ReadAll(r)
	if err != nil {
		return err
	}
	fmt.Println(string(uncompressedData))
	return nil
}
uncompressedData 就是 CloudWatch 日志的 JSON 字符串。