Logstash:拆分后基于 If 的事件丢弃

Logstash: If-based Drop On Event After Split

情况:

我的输入日志看起来像这样:

{
"key1":"value1"
"key2":"value2"
"events":
[{
"Level":"Information",
"Code":"100"
},
{
"Level":"Information",
"SomeKey":"SomeValue"
},
{
"Level":"Error",
"Message":"Something went wrong"
}
]}

我想:

  1. 拆分事件数组以创建仍填充所有外层字段(key1 和 key2)的单个对象。

  2. 在拆分后有选择地删除日志。我只想保留包含 "Code" 属性.

  3. 的 "Information-level" 日志

我的 logstash 配置看起来像

filter {
 split {
  field => "[events]"
  }
}

 filter {
    if ![events][Code]
    { drop {} }
 }

 output {
   elasticsearch {}
 }

问题:

Logstash 在执行第二个过滤器之前似乎没有分离事件。

换句话说,如果日志中的任何事件没有 "Code" 字段,整个日志将被删除,包括我必须保留的 "Error" 级信息。

我已经为此忙了一整天了,这真的让我很紧张。我会手动尝试创建自己的插件,但我从未使用过任何 Ruby。

我很确定这无关紧要,但我是 运行 Docker 中的 ELK 堆栈。我相信配置文件会正确加载,并且它们会被 Logstash 使用。

这实际上似乎是某种 logstash 错误。这是一个错误的演示:

配置文件:

input {
    stdin { codec => "json" }
}
filter {
    split { field => "events" }
    if ([events] == "" or [events][Code] == "") { 
        drop {} 
    }
}  
output {
    stdout { codec => "rubydebug" }
}

命令行:

echo '{"key1":"value1","key2":"value2","events":[{"Level":"Information","Code":"100"},{"Level":"Information","SomeKey":"SomeValue"},{"Level":"Error","Message":"Something went wrong"}]}' | bin/logstash -f test.conf

结果:

Exception in thread "[main]>worker7" java.lang.NumberFormatException: For input string: "Code"
    at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
    at java.lang.Integer.parseInt(Integer.java:580)
    at java.lang.Integer.parseInt(Integer.java:615)
    at org.logstash.Accessors.fetch(Accessors.java:130)
    at org.logstash.Accessors.get(Accessors.java:20)
    at org.logstash.Event.getUnconvertedField(Event.java:160)
    at org.logstash.ext.JrubyEventExtLibrary$RubyEvent.ruby_get_field(JrubyEventExtLibrary.java:113)
    at org.logstash.ext.JrubyEventExtLibrary$RubyEvent$INVOKER$i[=12=]$ruby_get_field.call(JrubyEventExtLibrary$RubyEvent$INVOKER$i[=12=]$ruby_get_field.gen)

所以基本上它仍然认为 events 是一个数组,它试图通过 "Code"

对其进行索引

这似乎可以解决问题:

    mutate {
            add_field => ["code", "%{[events][Code]}"]
    }
    if ([code] == "%{[events][Code]}") {
            drop {}
    }
    mutate {
            remove_field => ["code"]
    }