Logstash - JSON 解析列表

Logstash - JSON parse list

我真的很喜欢 ELK 解析日志。但是,我陷入了需要解析字典列表的地步。以下是我的日志:-

IP - - 0.000 0.000 [24/May/2015:06:51:13 +0000] *"POST /c.gif HTTP/1.1"* 200 4 * user_id=UserID&package_name=SomePackageName&model=Titanium+S202&country_code=in&android_id=AndroidID&eT=1432450271859&eTz=GMT%2B05%3A30&events=%5B%7B%22eV%22%3A%22com.olx.southasia%22%2C%22eC%22%3A%22appUpdate%22%2C%22eA%22%3A%22app_activated%22%2C%22eTz%22%3A%22GMT%2B05%3A30%22%2C%22eT%22%3A%221432386324909%22%2C%22eL%22%3A%22packageName%22%7D%5D * "-" "-" "-"

上述日志的URL解码版本为

IP - - 0.000 0.000 [24/May/2015:06:51:13  0000] *"POST /c.gif HTTP/1.1"* 200 4 * user_id=UserID&package_name=SomePackageName&model=Titanium S202&country_code=in&android_id=AndroidID&eT=1432450271859&eTz=GMT+05:30&events=[{"eV":"com.olx.southasia","eC":"appUpdate","eA":"app_activated","eTz":"GMT+05:30","eT":"1432386324909","eL":"packageName"}] * "-" "-" "-"

无论我在哪里尝试解析它,它都会向我显示 _jsonparsefailure。我也经历过 问题,也经历过各种论坛,但没有找到完美的解决方案。我如何解析 logstash 中的 json 列表?如果到目前为止不存在,有什么解决办法。??

以下是我的配置文件。

filter {
    mutate {
        gsub => [
            "message", "\+", "%20"
        ]
    }

    urldecode{
        field => "message"
    }
    grok {
        match => [
            'message', '%{IP:clientip}%{GREEDYDATA} \[%{GREEDYDATA:timestamp}\] \*"%{WORD:method}%{GREEDYDATA}'
        ]
    }

    kv {
        field_split => "&?"
    }

    json{
        source => "events"
    }

    geoip {
        source => "clientip"
    }
}

此问题与 完全相同。即使有相同的日志条目?!谁能理解这一点?

你可以在那里看到我的答案,但我会为你总结一下...选项 e) 可能是最好的方法


显然,由于方括号,您得到了 jsonparsefailure。作为解决方法,您可以手动删除它们。在您的 kv 之后和 json 过滤器之前添加以下 mutate 过滤器:

mutate  {
    gsub => [ "events","\]",""]
    gsub => [ "events","\[",""]
}

但是,这不适用于 [{"foo":"bar"},{"foo":"bar1"}] 这样的输入。所以这里有 4 个选项:

选项 a) 丑陋的 gsub

一个丑陋的解决方法是另一个 gsub:

gsub => [ "event","\},\{",","]

但这会消除内部关系,所以我猜你不想那样做。

选项 b) 分割

更好的方法可能是使用拆分过滤器:

split {
    field => "event"
    terminator => ","
}
mutate  {
    gsub => [ "event","\]",""]
    gsub => [ "event","\[",""]
   }
json{
    source=> "event"
}

这会产生多个事件。 (第一个是 foo = bar,第二个是 foo1 = bar1。)

选项 c) 变异拆分

您可能希望将所有值都放在一个 logstash 事件中。您可以使用 mutate => split 过滤器生成数组并解析 json(如果存在条目)。不幸的是,您必须为每个条目设置一个条件,因为 logstash 在其配置中不支持循环。

mutate  {
    gsub => [ "event","\]",""]
    gsub => [ "event","\[",""]
    split => [ "event", "," ]
   }

json{
    source=> "event[0]"
    target => "result[0]"
}

if 'event[1]' {
    json{
        source=> "event[1]"
        target => "result[1]"
    }
    if 'event[2]' {
        json{
            source=> "event[2]"
            target => "result[2]"
        }
    }
    # You would have to specify more conditionals if you expect even more dictionaries
}

选项 d) Ruby1

以下工作(在您的 kv 过滤器之后): 而是使用选项 e)

mutate  {
    gsub => [ "event","\]",""]
    gsub => [ "event","\[",""]
}

ruby  {
    init => "require 'json'"
    code => "
        e = event['event'].split(',')
        ary = Array.new
        e.each do |x|
            hash = JSON.parse(x)
            hash.each do |key, value|
                ary.push( { key =>  value } )
            end
        end
        event['result'] = ary
    "
}

更新

选项 e) Ruby2

经过一些测试,这可能是最好的方法。在你的 kv 过滤器之后使用它:

ruby  {
    init => "require 'json'"
    code => "event['result'] = JSON.parse(event['event'])"
}