以 ns、us、ms 或 s 为单位的自定义响应时间的 Grok 模式 - logstash

Grok pattern for custom response time in ns, us, ms or s - logstash

我需要从自定义日志格式解析响应时间,以便它可以通过 Logstash 管道传输到弹性。

示例日志条目为:

2018-11-19 23:40:00-0500 avg:30.5ms max:135ms min:6.61ms reqs:20 rsps:20 errs:0 maxcon:3 99th:135ms 95th:134ms 90th:111ms 75th:22.6ms 50th:15.6ms heap:36.7% load:1.43/0.75/0.60 cpu:26.3%

Avg、max、min 可以是 ns、us、ms 或 s 格式。 我开始于:

%{TIMESTAMP_ISO8601:timestamp} avg:%{NUMBER:avg}ms

当然它不适用于 ns 等,所以我需要这样的东西:

%{TIMESTAMP_ISO8601:timestamp} avg:%{NUMBER:avg}(ns|us|ms|s)

但是我会丢失信息,因为我必须按数值比例来表示 ms。所以 ns 乘以 1e6, ms -> 1e3, ms -> 1, s -> 1e-3.

解决该问题的最佳方法是什么?

好的,所以我终于找到了解决方案。 首先对 grok 模式进行一些更改,如下所示:

%{TIMESTAMP_ISO8601:timestamp} avg:%{NUMBER:avg:float}(?<avgUnit>[unm]?s)

这给了我们事件中的两个元素'avg'和'avgUnit'这样的值可以传递给ruby 由插件执行的脚本: 脚本读取流程:

# filter runs for every event
# # return the list of events to be passed forward
# # returning empty list is equivalent to event.cancel
def filter(event)
  #convert operates on event
  convert(event ,"maxUnit", "max")
  convert(event, "minUnit", "min")
  convert(event, "avgUnit", "avg")
  convert(event, "99thUnit", "99th")
  return [event]
end

def convert(event, unitField, valueField)
  if event.get(valueField).nil?
    event.tag("__#{valueField}__not_found")
    return [event]
  end

  if event.get(unitField).nil?
    event.tag("__#{unitField}_not_found")
    return [event]
  end

  unit = event.get(unitField)
  value = event.get(valueField)
  fieldName = "#{valueField}InMs"
  case unit
  when "ns"
    event.set(fieldName, value / 1.0e6)
  when "us"
    event.set(fieldName, value / 1.0e3)
  when "ms"
    event.set(fieldName, value)
  when "s"
    event.set(fieldName, value * 1.0e3)
  else
    event.tag("__not_supported_unit_#{unit}")
  end
  return [event]
end

管道必须配置以在匹配后包含脚本:

grok {
  match => {
    "message" => ["%{TIMESTAMP_ISO8601:tstamp} avg:%{NUMBER:avg:float}(?<avgUnit>[unm]?s)]
  }
}
ruby {
  path => "script.rb"
}