使用 logstash-grok、clone 和 mutate 在两个事件中拆分日志条目

Split a log entry in two events with logstash-grok, clone and mutate

我试图在我的 logstash 配置中使用 grok, mutate and clone 将日志拆分为两个事件。 我的堆栈是非常标准的ELK(Elasticsearch、Logstash、Kibana)。

我正在创建格式如下的日志:

timestamp float integer

例如:

2015/01/19 21:48:12 24.7 32
2015/01/19 22:00:20 24.7 32
2015/01/19 22:01:11 24.7 32
2015/01/19 22:01:58 24.7 28
2015/01/19 22:02:28 23.7 28
(etc ...)

最终,我想要在 logstash 中有两个事件,一个显然带有键 timestamptype=sensorAvalue=%{the value of the integer},另一个具有相同的时间戳和 value=%{value of the float}type=sensorB.

到目前为止,我在 logstash.conf:

中使用了这个配置

1.My 日志在我的输入中被标记为 type=sensor :

  input {
    file {
      path => "/var/log/sensors.log"
      type => "sensor"
    }
  }

2.Then,我用grok,clone,mutate尝试拆分

if [type] == "sensor" {
  # Extracts the values
  grok { 
    match => { "message" => "(?<timestamp>%{YEAR}/%{MONTHNUM:month}/%{MONTHDAY:day} %{TIME}) %{NUMBER:sensorA:float} %{NUMBER:sensorB:int}" }
  }
  mutate {
    update => [ "type", "sensorA" ]
  }
  # Clones the event
  clone {
    clones => ["sensorB"]
  }
}
# So now I should have two events, 
# one with type sensorA and one with type sensorB, no ? :
if [type] == "sensorA" {
  mutate {
    add_field => { "value" => "%{sensorA}" }
    convert => ["value", "float"]
  }
}
if [type] == "sensorB" {
  mutate {
    add_field => { "value" => "%{sensorB}" }
    convert => ["value", "integer"]
  }
}

但这并没有真正起作用,因为尽管我得到了两个不同类型的事件,但它们都具有相同的值(始终是 sensorB 的值)。

怎么会?我感觉 logstash.conf 文件并没有真正以线性方式读取,但我找不到任何解决方案。

有什么提示吗?我在这里错过了一些非常明显的东西吗? 非常感谢

您可以使用 ruby 插件来完成您需要的操作,而不是使用 mutate 将值转换为 value 字段。

if [type] == "sensor" {
    # Extracts the values
    grok {
        match => { "message" => "(?<timestamp>%{YEAR}/%{MONTHNUM:month}/%{MONTHDAY:day} %{TIME}) %{NUMBER:sensorA:float} %{NUMBER:sensorB:int}" }
    }
    mutate {
        update => [ "type", "sensorA" ]
    }
    # Clones the event
    clone {
        clones => ["sensorB"]
    }
}
# So now I should have two events,
# one with type sensorA and one with type sensorB, no ? :
ruby {
    code => "
        if event['type'] == 'sensorA'
                event['value'] = event['sensorA']
        elsif event['type'] == 'sensorB'
                event['value'] = event['sensorB']
        end
    "
}

有了这个配置,我可以满足你的要求。 希望这可以帮助你:)