Logstash 聚合 return 空消息

Logstash aggregation return empty message

我有一个测试环境,可以在投入生产之前测试一些 logstash 插件。

目前,我正在使用 kiwi 系统日志生成器来生成一些系统日志以供测试。

我的字段如下:

@timestamp
message
+ elastic medatadata

从这个基本字段开始,我开始过滤我的数据。 首先是在 timestamp and message 的基础上添加一个新字段,如下所示:

input {
  syslog {
    port => 514
 }
}
filter {
  prune {
    whitelist_names =>["timestamp","message","newfield", "message_count"]
  }
  mutate {
        add_field => {"newfield" => "%{@timestamp}%{message}"}
  }
}

prune只是为了不处理不需要的数据。

这很好用,因为我得到了一个包含这 2 个值的新字段。

下一步是 运行 根据消息的特定内容进行一些聚合,例如消息是否包含 logged inlogged out

为此,我使用了聚合过滤器

  grok {
    match => {
        "message" => [
        "(?<[@metadata][event_type]>logged out)",
            "(?<[@metadata][event_type]>logged in)",
            "(?<[@metadata][event_type]>workstation locked)"

   ]

 }
}
   aggregate {
        task_id => "%{message}"
        code => "
        map['message_count'] ||= 0; map['message_count'] += 1;
        "
        push_map_as_event_on_timeout => true
        timeout_timestamp_field => "@timestamp"
        timeout => 60
        inactivity_timeout => 50
        timeout_tags => ['_aggregatetimeout']
           
        
      }
    
}

这按预期工作,但我在这里遇到问题。当聚合超时。为特定聚合填充的唯一字段是 message_count

如上图所示,newfield和message(一共剩下一个,抱歉,截图放不下)都是空的。

出于演示和测试目的,这绝对没问题,但如果我每秒收到数百条系统日志而不知道 message_count 所指的消息,那将是难以管理的。

拜托,我在这里苦苦挣扎,我不知道如何解决这个问题,请有人帮助我理解如何用它所指的消息内容填充 newfield

这是我的整个 logstash 配置,使它更容易。

input {
  syslog {
    port => 514
 }
}
filter {
  prune {
    whitelist_names =>["timestamp","message","newfield", "message_count"]
  }
  mutate {
        add_field => {"newfield" => "%{@timestamp}%{message}"}
  }
  grok {
    match => {
        "message" => [
        "(?<[@metadata][event_type]>logged out)",
            "(?<[@metadata][event_type]>logged in)",
            "(?<[@metadata][event_type]>workstation locked)"

   ]

 }
}
   aggregate {
        task_id => "%{message}"
        code => "
        map['message_count'] ||= 0; map['message_count'] += 1;
        "
        push_map_as_event_on_timeout => true
        timeout_timestamp_field => "@timestamp"
        timeout => 60
        inactivity_timeout => 50
        timeout_tags => ['_aggregatetimeout']
           
        
      }
    
}
output {
  elasticsearch {
     hosts => ["localhost:9200"]
         index => "logstash_index"
 }
  stdout {
    codec => rubydebug
 }
  csv {
    path => "C:\Users\adminuser\Desktop\syslog\syslogs-%{+yyyy.MM.dd}.csv"
    fields => ["timestamp", "message", "message_count", "newfield"]
 }
}
push_map_as_event_on_timeout => true

当您使用它并发生超时时,它会使用地图的内容创建一个新事件。如果您希望原始消息中的字段出现在新事件中,则必须将它们添加到地图中。对于 task_id 有一个 shorthand 符号可以使用过滤器上的 timeout_task_id_field 选项来执行此操作,否则您已明确添加它们

map['newfield'] ||= event.get('newfield');