使用 logstash-grok、clone 和 mutate 在两个事件中拆分日志条目
Split a log entry in two events with logstash-grok, clone and mutate
我试图在我的 logstash 配置中使用 grok, mutate and clone 将日志拆分为两个事件。
我的堆栈是非常标准的ELK(Elasticsearch、Logstash、Kibana)。
我正在创建格式如下的日志:
timestamp float integer
例如:
2015/01/19 21:48:12 24.7 32
2015/01/19 22:00:20 24.7 32
2015/01/19 22:01:11 24.7 32
2015/01/19 22:01:58 24.7 28
2015/01/19 22:02:28 23.7 28
(etc ...)
最终,我想要在 logstash 中有两个事件,一个显然带有键 timestamp
,type=sensorA
和 value=%{the value of the integer}
,另一个具有相同的时间戳和 value=%{value of the float}
和 type=sensorB
.
到目前为止,我在 logstash.conf
:
中使用了这个配置
1.My 日志在我的输入中被标记为 type=sensor
:
input {
file {
path => "/var/log/sensors.log"
type => "sensor"
}
}
2.Then,我用grok,clone,mutate尝试拆分
if [type] == "sensor" {
# Extracts the values
grok {
match => { "message" => "(?<timestamp>%{YEAR}/%{MONTHNUM:month}/%{MONTHDAY:day} %{TIME}) %{NUMBER:sensorA:float} %{NUMBER:sensorB:int}" }
}
mutate {
update => [ "type", "sensorA" ]
}
# Clones the event
clone {
clones => ["sensorB"]
}
}
# So now I should have two events,
# one with type sensorA and one with type sensorB, no ? :
if [type] == "sensorA" {
mutate {
add_field => { "value" => "%{sensorA}" }
convert => ["value", "float"]
}
}
if [type] == "sensorB" {
mutate {
add_field => { "value" => "%{sensorB}" }
convert => ["value", "integer"]
}
}
但这并没有真正起作用,因为尽管我得到了两个不同类型的事件,但它们都具有相同的值(始终是 sensorB 的值)。
怎么会?我感觉 logstash.conf 文件并没有真正以线性方式读取,但我找不到任何解决方案。
有什么提示吗?我在这里错过了一些非常明显的东西吗?
非常感谢
您可以使用 ruby
插件来完成您需要的操作,而不是使用 mutate 将值转换为 value
字段。
if [type] == "sensor" {
# Extracts the values
grok {
match => { "message" => "(?<timestamp>%{YEAR}/%{MONTHNUM:month}/%{MONTHDAY:day} %{TIME}) %{NUMBER:sensorA:float} %{NUMBER:sensorB:int}" }
}
mutate {
update => [ "type", "sensorA" ]
}
# Clones the event
clone {
clones => ["sensorB"]
}
}
# So now I should have two events,
# one with type sensorA and one with type sensorB, no ? :
ruby {
code => "
if event['type'] == 'sensorA'
event['value'] = event['sensorA']
elsif event['type'] == 'sensorB'
event['value'] = event['sensorB']
end
"
}
有了这个配置,我可以满足你的要求。
希望这可以帮助你:)
我试图在我的 logstash 配置中使用 grok, mutate and clone 将日志拆分为两个事件。 我的堆栈是非常标准的ELK(Elasticsearch、Logstash、Kibana)。
我正在创建格式如下的日志:
timestamp float integer
例如:
2015/01/19 21:48:12 24.7 32
2015/01/19 22:00:20 24.7 32
2015/01/19 22:01:11 24.7 32
2015/01/19 22:01:58 24.7 28
2015/01/19 22:02:28 23.7 28
(etc ...)
最终,我想要在 logstash 中有两个事件,一个显然带有键 timestamp
,type=sensorA
和 value=%{the value of the integer}
,另一个具有相同的时间戳和 value=%{value of the float}
和 type=sensorB
.
到目前为止,我在 logstash.conf
:
1.My 日志在我的输入中被标记为 type=sensor
:
input {
file {
path => "/var/log/sensors.log"
type => "sensor"
}
}
2.Then,我用grok,clone,mutate尝试拆分
if [type] == "sensor" {
# Extracts the values
grok {
match => { "message" => "(?<timestamp>%{YEAR}/%{MONTHNUM:month}/%{MONTHDAY:day} %{TIME}) %{NUMBER:sensorA:float} %{NUMBER:sensorB:int}" }
}
mutate {
update => [ "type", "sensorA" ]
}
# Clones the event
clone {
clones => ["sensorB"]
}
}
# So now I should have two events,
# one with type sensorA and one with type sensorB, no ? :
if [type] == "sensorA" {
mutate {
add_field => { "value" => "%{sensorA}" }
convert => ["value", "float"]
}
}
if [type] == "sensorB" {
mutate {
add_field => { "value" => "%{sensorB}" }
convert => ["value", "integer"]
}
}
但这并没有真正起作用,因为尽管我得到了两个不同类型的事件,但它们都具有相同的值(始终是 sensorB 的值)。
怎么会?我感觉 logstash.conf 文件并没有真正以线性方式读取,但我找不到任何解决方案。
有什么提示吗?我在这里错过了一些非常明显的东西吗? 非常感谢
您可以使用 ruby
插件来完成您需要的操作,而不是使用 mutate 将值转换为 value
字段。
if [type] == "sensor" {
# Extracts the values
grok {
match => { "message" => "(?<timestamp>%{YEAR}/%{MONTHNUM:month}/%{MONTHDAY:day} %{TIME}) %{NUMBER:sensorA:float} %{NUMBER:sensorB:int}" }
}
mutate {
update => [ "type", "sensorA" ]
}
# Clones the event
clone {
clones => ["sensorB"]
}
}
# So now I should have two events,
# one with type sensorA and one with type sensorB, no ? :
ruby {
code => "
if event['type'] == 'sensorA'
event['value'] = event['sensorA']
elsif event['type'] == 'sensorB'
event['value'] = event['sensorB']
end
"
}
有了这个配置,我可以满足你的要求。 希望这可以帮助你:)