Logstash 聚合 return 空消息
Logstash aggregation return empty message
我有一个测试环境,可以在投入生产之前测试一些 logstash 插件。
目前,我正在使用 kiwi 系统日志生成器来生成一些系统日志以供测试。
我的字段如下:
@timestamp
message
+ elastic medatadata
从这个基本字段开始,我开始过滤我的数据。
首先是在 timestamp and message
的基础上添加一个新字段,如下所示:
input {
syslog {
port => 514
}
}
filter {
prune {
whitelist_names =>["timestamp","message","newfield", "message_count"]
}
mutate {
add_field => {"newfield" => "%{@timestamp}%{message}"}
}
}
prune
只是为了不处理不需要的数据。
这很好用,因为我得到了一个包含这 2 个值的新字段。
下一步是 运行 根据消息的特定内容进行一些聚合,例如消息是否包含 logged in
或 logged out
为此,我使用了聚合过滤器
grok {
match => {
"message" => [
"(?<[@metadata][event_type]>logged out)",
"(?<[@metadata][event_type]>logged in)",
"(?<[@metadata][event_type]>workstation locked)"
]
}
}
aggregate {
task_id => "%{message}"
code => "
map['message_count'] ||= 0; map['message_count'] += 1;
"
push_map_as_event_on_timeout => true
timeout_timestamp_field => "@timestamp"
timeout => 60
inactivity_timeout => 50
timeout_tags => ['_aggregatetimeout']
}
}
这按预期工作,但我在这里遇到问题。当聚合超时。为特定聚合填充的唯一字段是 message_count
如上图所示,newfield
和message(一共剩下一个,抱歉,截图放不下)都是空的。
出于演示和测试目的,这绝对没问题,但如果我每秒收到数百条系统日志而不知道 message_count
所指的消息,那将是难以管理的。
拜托,我在这里苦苦挣扎,我不知道如何解决这个问题,请有人帮助我理解如何用它所指的消息内容填充 newfield
?
这是我的整个 logstash 配置,使它更容易。
input {
syslog {
port => 514
}
}
filter {
prune {
whitelist_names =>["timestamp","message","newfield", "message_count"]
}
mutate {
add_field => {"newfield" => "%{@timestamp}%{message}"}
}
grok {
match => {
"message" => [
"(?<[@metadata][event_type]>logged out)",
"(?<[@metadata][event_type]>logged in)",
"(?<[@metadata][event_type]>workstation locked)"
]
}
}
aggregate {
task_id => "%{message}"
code => "
map['message_count'] ||= 0; map['message_count'] += 1;
"
push_map_as_event_on_timeout => true
timeout_timestamp_field => "@timestamp"
timeout => 60
inactivity_timeout => 50
timeout_tags => ['_aggregatetimeout']
}
}
output {
elasticsearch {
hosts => ["localhost:9200"]
index => "logstash_index"
}
stdout {
codec => rubydebug
}
csv {
path => "C:\Users\adminuser\Desktop\syslog\syslogs-%{+yyyy.MM.dd}.csv"
fields => ["timestamp", "message", "message_count", "newfield"]
}
}
push_map_as_event_on_timeout => true
当您使用它并发生超时时,它会使用地图的内容创建一个新事件。如果您希望原始消息中的字段出现在新事件中,则必须将它们添加到地图中。对于 task_id 有一个 shorthand 符号可以使用过滤器上的 timeout_task_id_field 选项来执行此操作,否则您已明确添加它们
map['newfield'] ||= event.get('newfield');
我有一个测试环境,可以在投入生产之前测试一些 logstash 插件。
目前,我正在使用 kiwi 系统日志生成器来生成一些系统日志以供测试。
我的字段如下:
@timestamp
message
+ elastic medatadata
从这个基本字段开始,我开始过滤我的数据。
首先是在 timestamp and message
的基础上添加一个新字段,如下所示:
input {
syslog {
port => 514
}
}
filter {
prune {
whitelist_names =>["timestamp","message","newfield", "message_count"]
}
mutate {
add_field => {"newfield" => "%{@timestamp}%{message}"}
}
}
prune
只是为了不处理不需要的数据。
这很好用,因为我得到了一个包含这 2 个值的新字段。
下一步是 运行 根据消息的特定内容进行一些聚合,例如消息是否包含 logged in
或 logged out
为此,我使用了聚合过滤器
grok {
match => {
"message" => [
"(?<[@metadata][event_type]>logged out)",
"(?<[@metadata][event_type]>logged in)",
"(?<[@metadata][event_type]>workstation locked)"
]
}
}
aggregate {
task_id => "%{message}"
code => "
map['message_count'] ||= 0; map['message_count'] += 1;
"
push_map_as_event_on_timeout => true
timeout_timestamp_field => "@timestamp"
timeout => 60
inactivity_timeout => 50
timeout_tags => ['_aggregatetimeout']
}
}
这按预期工作,但我在这里遇到问题。当聚合超时。为特定聚合填充的唯一字段是 message_count
如上图所示,newfield
和message(一共剩下一个,抱歉,截图放不下)都是空的。
出于演示和测试目的,这绝对没问题,但如果我每秒收到数百条系统日志而不知道 message_count
所指的消息,那将是难以管理的。
拜托,我在这里苦苦挣扎,我不知道如何解决这个问题,请有人帮助我理解如何用它所指的消息内容填充 newfield
?
这是我的整个 logstash 配置,使它更容易。
input {
syslog {
port => 514
}
}
filter {
prune {
whitelist_names =>["timestamp","message","newfield", "message_count"]
}
mutate {
add_field => {"newfield" => "%{@timestamp}%{message}"}
}
grok {
match => {
"message" => [
"(?<[@metadata][event_type]>logged out)",
"(?<[@metadata][event_type]>logged in)",
"(?<[@metadata][event_type]>workstation locked)"
]
}
}
aggregate {
task_id => "%{message}"
code => "
map['message_count'] ||= 0; map['message_count'] += 1;
"
push_map_as_event_on_timeout => true
timeout_timestamp_field => "@timestamp"
timeout => 60
inactivity_timeout => 50
timeout_tags => ['_aggregatetimeout']
}
}
output {
elasticsearch {
hosts => ["localhost:9200"]
index => "logstash_index"
}
stdout {
codec => rubydebug
}
csv {
path => "C:\Users\adminuser\Desktop\syslog\syslogs-%{+yyyy.MM.dd}.csv"
fields => ["timestamp", "message", "message_count", "newfield"]
}
}
push_map_as_event_on_timeout => true
当您使用它并发生超时时,它会使用地图的内容创建一个新事件。如果您希望原始消息中的字段出现在新事件中,则必须将它们添加到地图中。对于 task_id 有一个 shorthand 符号可以使用过滤器上的 timeout_task_id_field 选项来执行此操作,否则您已明确添加它们
map['newfield'] ||= event.get('newfield');