logstash拆分日志,单独插入到elasticsearch中

Logstash split log and insert it separately into elasticsearch

我正在编写一个 logstash 配置文件,但我收到的日志给我带来了问题,团队向我发送了多个合并为一个日志的日志,例如。

留言: [logitem(aaa=1, bbb=1, ccc=1), logitem(aaa=2, bbb=2, ccc=2), logitem(aaa=3, bbb=3, ccc=3)]

可不可以把这些log分成3份,分别插入到elasticsearch中? (3 条记录)

使用 ruby 过滤器

这种方式应该可行(请参阅下面的评论以进行讨论和参考)。您可能需要在几个地方调整 grok / scan 正则表达式。

grok {
    match => {
        "message" => "^\[%{GREEDYDATA:logitems}\]$"
    }
}

ruby {
    code => "event.set('logitem', event.get('message').scan(/logitem\([^\)]+\)/))"
}

split {
    field => "logitem"
}

grok {
    match => {
        "logitem" => "^logitem\(aaa=%{DATA:field_a}, bbb=%{DATA:field_b}, ccc=%{DATA:field_c}\)"
    }
}

扫描正则表达式的目的是匹配一个字符串:

  • logitem
  • 开头
  • 然后一个(字符
  • 然后是 )
  • 以外的任何字符中的一个或多个
  • )
  • 结尾

使用神交

令人惊讶的是,这种方式行不通。有关详细信息,请参阅 this github issue。 TL;DR ... grok 不会将重复的匹配项放入数组中。

filter {
  grok {
    match => {
      "message" => "^\[*(logitem\(%{DATA:logitem}\), )*logitem\(%{DATA:logitem}\)\]$"
    }
  }
  split {
    field => "logitem"
  }
}

如果您确定消息将始终采用 aaa=, bbb= 格式,您可以更明确。

[编辑 1:将 grok 方法标记为无效并添加了 ruby 方法。 2:为了更好的流程重新排序了一些东西]