Logstash - parses only one JSON event

I am using ELK 5.3.0 and trying to parse a simple JSON document. It does create the key/value pairs, but it only ever writes one event to Elasticsearch. Which event it is appears random: sometimes the first, sometimes the second or third, but always exactly one.

File setup (created on a Mac; one JSON object per line), three events:

{"timestamp":"2012-01-01 02:00:01", "severity":"ERROR", "messages":"Foo failed", "fieldone": "I am first entry... if the value of a field one", "fieldtwo": "ttthis if the value of a field two"} {"timestamp":"2013-01-01 02:04:02", "severity":"INFO", "messages":"Bar was successful", "fieldone": "I am second entry... if the value of a field one", "fieldtwo": "this if the value of a field two"} {"timestamp":"2017-01-01 02:10:12", "severity":"DEBUG", "messages":"Baz was notified", "fieldone": "I am third entry... if the value of a field one", "fieldtwo": "this if the value of a field two"}

Filebeat setup:

- input_type: log
  paths:
    - Downloads/elk/small/jsontest.log
  document_type: jsonindex
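
For completeness: events evidently reach Logstash over the beats protocol, so the Filebeat config presumably also contains a Logstash output along these lines (the host and port are assumptions; this part is not shown in the question):

output.logstash:
  # Assumed endpoint; the question does not show this section.
  hosts: ["localhost:5044"]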

Logstash setup:

filter {
  if [@metadata][type] == "jsonindex" {
    json {
      source => "message"
    }
  }
}
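
Note the beats_input_codec_json_applied tag in the output below: the beats input is already decoding the JSON before this filter runs (so the decoded events carry no message field), which leaves the json filter above with nothing to do. A minimal input section that would produce that tag looks roughly like this (the port is an assumption):

input {
  beats {
    port  => 5044
    # The json codec decodes each incoming line and adds the
    # "beats_input_codec_json_applied" tag seen below.
    codec => "json"
  }
}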

Logstash output (shows all three events):

{
       "severity" => "DEBUG",
         "offset" => 544,
          "@uuid" => "a316bb67-98e5-4551-8243-f8538023cfd9",
     "input_type" => "log",
         "source" => "/Users/xxx/Downloads/elk/small/jsontest.log",
       "fieldone" => "this if the value of a field one",
           "type" => "jsonindex",
           "tags" => [
        [0] "beats_input_codec_json_applied",
        [1] "_dateparsefailure"
    ],
       "fieldtwo" => "this if the value of a field two",
     "@timestamp" => 2017-05-08T11:25:41.586Z,
       "@version" => "1",
           "beat" => {
        "hostname" => "C700893",
            "name" => "C700893",
         "version" => "5.3.0"
    },
           "host" => "C700893",
    "fingerprint" => "bcb57f445084cc0e474366bf892f6b4ab9162a4e",
       "messages" => "Baz was notified",
      "timestamp" => "2017-01-01 02:10:12"
}
{
       "severity" => "INFO",
         "offset" => 361,
          "@uuid" => "6d4b4401-a440-4894-b0de-84c97fc4eaf5",
     "input_type" => "log",
         "source" => "/Users/xxx/Downloads/elk/small/jsontest.log",
       "fieldone" => "this if the value of a field one",
           "type" => "jsonindex",
           "tags" => [
        [0] "beats_input_codec_json_applied",
        [1] "_dateparsefailure"
    ],
       "fieldtwo" => "this if the value of a field two",
     "@timestamp" => 2017-05-08T11:25:41.586Z,
       "@version" => "1",
           "beat" => {
        "hostname" => "C700893",
            "name" => "C700893",
         "version" => "5.3.0"
    },
           "host" => "C700893",
    "fingerprint" => "bcb57f445084cc0e474366bf892f6b4ab9162a4e",
       "messages" => "Bar was successful",
      "timestamp" => "2013-01-01 02:04:02"
}
{
       "severity" => "ERROR",
         "offset" => 177,
          "@uuid" => "d9bd0a0b-0021-48fd-8d9e-d6f82cd1e506",
     "input_type" => "log",
         "source" => "/Users/xxx/Downloads/elk/small/jsontest.log",
       "fieldone" => "this if the value of a field one",
           "type" => "jsonindex",
           "tags" => [
        [0] "beats_input_codec_json_applied",
        [1] "_dateparsefailure"
    ],
       "fieldtwo" => "this if the value of a field two",
     "@timestamp" => 2017-05-08T11:25:41.586Z,
       "@version" => "1",
           "beat" => {
        "hostname" => "C700893",
            "name" => "C700893",
         "version" => "5.3.0"
    },
           "host" => "C700893",
    "fingerprint" => "bcb57f445084cc0e474366bf892f6b4ab9162a4e",
       "messages" => "Foo failed",
      "timestamp" => "2012-01-01 02:00:01"
}

Elasticsearch (document viewed as JSON):

"tags": [
      "beats_input_codec_json_applied",
      "_dateparsefailure"
    ],

There are no JSON failures; the _dateparsefailure is expected.
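
(The _dateparsefailure tag implies a date filter elsewhere in the config that is not shown here. For reference, a sketch that would actually parse the timestamp field of these events; the format string is inferred from the sample data:)

date {
  # Matches "2017-01-01 02:10:12"-style values from the events above.
  match  => ["timestamp", "yyyy-MM-dd HH:mm:ss"]
  target => "@timestamp"
}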

What is going on here?

EDIT (solution): After a while I realized I had been shooting myself in the foot. Since I parse many different logs and log types, I need to make sure I don't create duplicates, so near the end of my Logstash config I had this code to guard against duplicate logs:

uuid {
    target    => "@uuid"
    overwrite => true
}

fingerprint {
    source              => ["message"]
    target              => "fingerprint"
    key                 => "78787878"
    method              => "SHA1"
    concatenate_sources => true
}

And at the end of the same section, where I call Elasticsearch:

if [@metadata][type] == "jsonindex" {
    elasticsearch {
        hosts       => [ "localhost:9200" ]
        index       => "%{[@metadata][type]}"
        document_id => "%{fingerprint}"
    }
}

Since my JSON objects do not contain a message property, the fingerprint was effectively always the same (note the identical fingerprint value on all three events in the Logstash output above):

fingerprint {
    source => ["message"]
...

Because that constant fingerprint was used as the document_id, each event overwrote the previous one in Elasticsearch, which is why exactly one (random) document survived. A small modification to the index creation fixed the problem:

if [@metadata][type] == "jsonindex" {
    elasticsearch {
        hosts => [ "localhost:9200" ]
        index => "%{[@metadata][type]}"
    }
}
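
If deduplication is still wanted, an alternative (a sketch of mine, not part of the original fix) would be to keep document_id but fingerprint fields the JSON events actually contain:

fingerprint {
    # These fields exist after JSON decoding, unlike "message".
    source              => ["timestamp", "messages", "fieldone"]
    target              => "fingerprint"
    key                 => "78787878"
    method              => "SHA1"
    concatenate_sources => true
}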

Your data needs to be separated by lines.

Each event has to sit on its own line to be parsed; then you will get three documents.

For example:

{"timestamp":"2012-01-01 02:00:01", "severity":"ERROR", "messages":"Foo failed", "fieldone": "I am first entry... if the value of a field one", "fieldtwo": "ttthis if the value of a field two"} 
{"timestamp":"2013-01-01 02:04:02", "severity":"INFO", "messages":"Bar was successful", "fieldone": "I am second entry... if the value of a field one", "fieldtwo": "this if the value of a field two"} 
{"timestamp":"2017-01-01 02:10:12", "severity":"DEBUG", "messages":"Baz was notified", "fieldone": "I am third entry... if the value of a field one", "fieldtwo": "this if the value of a field two"}

Yours are all on one line, so only the last one ends up parsed, which is why the timestamp is the last one:

"timestamp":"2017-01-01 02:10:12

If you can change the data to one event per line, that should work; if not, you can use this:

- input_type: log
  paths:
    - Downloads/elk/small/jsontest.log
  document_type: jsonindex
  # Any line that does NOT start like a JSON event is appended to the
  # preceding line that does (negate + match: after).
  multiline.pattern: '^{"timestamp":"[0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2}", '
  multiline.negate: true
  multiline.match: after

I have added the multiline change above, but I am afraid your data is not separated by lines at all.
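
If the file really cannot be rewritten and all objects arrive as a single line, a Logstash-side workaround is to split that line into one event per object before the json filter. This is only a sketch, under the assumptions that objects are separated by "} {" exactly as in the sample and that "|" never occurs in the data:

filter {
  if [@metadata][type] == "jsonindex" {
    # Insert an explicit separator at each "} {" object boundary...
    mutate {
      gsub => ["message", "\} \{", "}|{"]
    }
    # ...then break the single event into one event per object.
    split {
      field      => "message"
      terminator => "|"
    }
    json {
      source => "message"
    }
  }
}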