How do I combine a date and CSV filter to get the correct @timestamp field?

I have a log file in txt format, and after various laborious attempts I've concluded that the csv filter is the simplest and cleanest approach (I've tried grok patterns, but it got messy). A line in my log file looks like this (5 fields, the first being a datetime):

18/MAR/2015:15:00:02, Accept, Apple-1, 60649, 10.31.5.8

What I want is for Logstash to read the line, take the first field (the datetime) and assign it to the @timestamp field, then use the CSV filter to parse the remaining four fields.

My config is pasted below:

input {
    stdin{}
}

filter {

date {
    locale => "en"
    match => ["message", "dd/MMM/YYYY:HH:mm:ss,"]
    timezone => "Europe/London"
    target => "@timestamp"
    # add_field => { "debug" => "timestampmatched" }
}

csv {
    columns => ["@timestamp", "Decision", "Service", "PortNumber", "SourceIP"]
    separator => ","
}

}

output {
    elasticsearch {
        # action => "index"
        host => "localhost"
    }
    stdout {
        codec => rubydebug
    }
}

Note that I tested the date filter on its own to make sure it actually works. The problem is that when I run this and paste a log line at the prompt (since the input is stdin), I get the following errors:

Using milestone 2 filter plugin 'csv'. This plugin should be stable, but if you see strange behavior, please let us know! For more information on plugin milestones, see http://logstash.net/docs/1.4.2/plugin-milestones {:level=>:warn}
18/MAR/2015:15:00:02, Accept, Apple-1, 60649, 10.31.5.8
Failed parsing date from field {:field=>"message", :value=>"18/MAR/2015:15:00:02, Accept, Apple-1, 60649, 10.31.5.8", :exception=>java.lang.IllegalArgumentException: Invalid format: "18/MAR/2015:15:00:02, Accept, Apple-1, 60649, 10.31.5.8" is malformed at " Accept, Apple-1, 60649, 10.31.5.8", :level=>:warn}
Trouble parsing csv {:source=>"message", :raw=>"18/MAR/2015:15:00:02, Accept, Apple-1, 60649, 10.31.5.8", :exception=>#<TypeError: The field '@timestamp' must be a Time, not a String (18/MAR/2015:15:00:02)>, :level=>:warn}
{
       "message" => [
        [0] "18/MAR/2015:15:00:02, Accept, Apple-1, 60649, 10.31.5.8"
    ],
      "@version" => "1",
    "@timestamp" => "2015-05-01T12:59:13.011Z",
          "host" => "UOD-220076",
          "tags" => [
        [0] "_csvparsefailure"
    ]
}

I don't understand this error, so I decided to try something different with the config below:

input {

 stdin{}
}

filter {

date {
    locale => "en"
    match => ["message", "dd/MMM/YYYY:HH:mm:ss,"]
    timezone => "Europe/London"
    target => "@timestamp"
    add_field => { "debug" => "timestampmatched" }
}

csv {
    columns => ["timestampmatched", "Decision", "Service", "PortNumber", "SourceIP"]
    separator => ","
}

}

output {
    elasticsearch {
        # action => "index"
        host => "localhost"
    }
    stdout {
        codec => rubydebug
    }
}

The only differences between this config and the first are two additions: a field called 'timestampmatched' (via add_field) and the first column in the csv filter being 'timestampmatched' (rather than @timestamp). I then get the following result:

UOD-220076:bin student$ logstash -f fwlogs1.conf
Using milestone 2 filter plugin 'csv'. This plugin should be stable, but if you see strange behavior, please let us know! For more information on plugin milestones, see http://logstash.net/docs/1.4.2/plugin-milestones {:level=>:warn}
18/MAR/2015:15:00:02, Accept, Apple-1, 60649, 10.31.5.8
Failed parsing date from field {:field=>"message", :value=>"18/MAR/2015:15:00:02, Accept, Apple-1, 60649, 10.31.5.8", :exception=>java.lang.IllegalArgumentException: Invalid format: "18/MAR/2015:15:00:02, Accept, Apple-1, 60649, 10.31.5.8" is malformed at " Accept, Apple-1, 60649, 10.31.5.8", :level=>:warn}
{
             "message" => [
        [0] "18/MAR/2015:15:00:02, Accept, Apple-1, 60649, 10.31.5.8"
    ],
            "@version" => "1",
          "@timestamp" => "2015-05-01T13:11:04.399Z",
                "host" => "UOD-220076",
    "timestampmatched" => "18/MAR/2015:15:00:02",
            "Decision" => " Accept",
             "Service" => " Apple-1",
          "PortNumber" => " 60649",
            "SourceIP" => " 10.31.5.8"
}

...which is exactly what I want, except that @timestamp should be set to the value of "timestampmatched".

Any ideas?

You need to run the csv filter first, like so:

csv {
  columns => ["timestamp", "Decision", "Service", "PortNumber", "SourceIP"]
  separator => ","
}

followed by a date filter on the timestamp field created above:

date {
    locale => "en"
    match => ["timestamp", "dd/MMM/YYYY:HH:mm:ss"]
    timezone => "Europe/London"
}

That should work for you; Logstash applies filters in the order in which they appear.
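
Putting it together, a minimal complete config might look like the sketch below. The mutate steps are optional additions (not part of the answer above): strip removes the leading space the csv filter leaves on each value because the sample line has a space after every comma, and remove_field drops the temporary timestamp string once the date filter has parsed it into @timestamp.

input {
    stdin {}
}

filter {
    # Parse the five comma-separated fields first
    csv {
        columns => ["timestamp", "Decision", "Service", "PortNumber", "SourceIP"]
        separator => ","
    }

    # Optional: trim the leading space after each comma
    mutate {
        strip => ["Decision", "Service", "PortNumber", "SourceIP"]
    }

    # Now parse the extracted timestamp column into @timestamp
    # (target defaults to @timestamp when omitted)
    date {
        locale => "en"
        match => ["timestamp", "dd/MMM/YYYY:HH:mm:ss"]
        timezone => "Europe/London"
    }

    # Optional: drop the raw string once @timestamp is set
    mutate {
        remove_field => ["timestamp"]
    }
}

output {
    elasticsearch {
        host => "localhost"
    }
    stdout {
        codec => rubydebug
    }
}

The key point is simply the ordering: because the date filter now matches against the already-extracted timestamp field rather than the whole message, the "Invalid format" and "must be a Time, not a String" errors both go away.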