聚合日志将第一个时间戳设置为@timestamp

Aggregated logs set first timestamp as @timestamp

我正在为“389 目录服务器”日志开发过滤器,这些日志显示用户在服务器上的操作、连接、搜索、添加、修改等的历史记录... 我正在使用聚合过滤器将所有这些日志行合并为一个事件。

但是,我希望事件的最终@timestamp(用户断开连接后)是第一个事件的@timestamp(首次建立连接时) 我试过使用日期过滤器,虽然它确实改变了每个事件(每个日志行)的@timestamp,但聚合过滤器生成的最终映射仍然使用处理日志的时间。

我可以将第一个@timestamp 保存到地图中的另一个字段,但如何用该字段替换@timestamp?

由于过滤器很长,我将只包括开始和结束:

filter {
  grok {
    match => { "message" => [
      "^(\s)?\[%{HTTPDATE:timestamp}\] conn=%{NUMBER:connection_id} fd=%{NUMBER:file_descriptor} slot=%{NUMBER} %{WORD:connection_method} connection from %{IP:source} to %{IP:destination}$",
      "^(\s)?\[%{HTTPDATE:timestamp}\] conn=%{NUMBER:connection_id} %{NOTSPACE:ssl_version} (?<encryption_method>%{NOTSPACE} %{NOTSPACE})$",
      "^(\s)?\[%{HTTPDATE:timestamp}\] conn=%{NUMBER:connection_id} op=%{NUMBER:op_number} %{WORD:ldap_operation} dn=%{QUOTEDSTRING:user_dn} method=%{NOTSPACE:bind_method} version=%{NUMBER:ldap_version}($)?(mech=%{NOTSPACE:auth_mechanism}$)?",
      "^(\s)?\[%{HTTPDATE:timestamp}\] conn=%{NUMBER:connection_id} op=%{NUMBER:op_number} %{WORD:ldap_operation} err=%{NUMBER:error_code} tag=%{NUMBER:tag_number} nentries=%{NUMBER:number_of_entries} etime=%{NUMBER:operation_time}($)?(dn=%{QUOTEDSTRING}$)?",
      "^(\s)?\[%{HTTPDATE:timestamp}\] conn=%{NUMBER:connection_id} op=%{NUMBER:op_number} %{WORD:ldap_operation} base=%{QUOTEDSTRING:search_base} scope=%{NUMBER:search_scope} filter=%{QUOTEDSTRING:search_filter} attrs=%{QUOTEDSTRING:search_attributes}$",
      "^(\s)?\[%{HTTPDATE:timestamp}\] conn=%{NUMBER:connection_id} op=%{NUMBER:op_number} %{WORD:ldap_operation}$",
      "^(\s)?\[%{HTTPDATE:timestamp}\] conn=%{NUMBER:connection_id} op=%{NUMBER:op_number} fd=%{NUMBER:file_descriptor} %{WORD:connection_result} - %{WORD:connection_code}$"
      ]
    }
  }
  if "" in [connection_method] {
    aggregate {
      task_id => "%{connection_id}"
      code => "
        map['timestamp'] = event['@timestamp']
        map['tags'] ||= ['aggregated']
        map['source'] = event['source']
        map['destination'] = event['destination']
        map['file_descriptor'] = event['file_descriptor']
        map['connection_method'] = event['connection_method']
      "
      map_action => "create"
    }
  }
  else if "" in [connection_code] {
    mutate {
      add_tag => [ "map_finished" ]
    }
    aggregate {
      task_id => "%{connection_id}"
      code => "
        map['operations'][event['op_number']]['connection_code'] = event['connection_code']
        map['operations'][event['op_number']]['connection_result'] = event['connection_result']
      "
      map_action => "update"
    }
  }
  else {
    aggregate {
      task_id => "%{connection_id}"
      code => "
        map['@timestamp'] = map['timestamp']
      "
      timeout => 0
      push_map_as_event_on_timeout => true
    }
  }
}

在过滤器中使用日期?
ISO8601 表示您的日期类型

date {
            # timezone => "America/Asuncion"
            match => ["timestamp", "ISO8601"]
            target => "@timestamp"
        }   

弄清楚了,我没有realize/understand的是,当我到达最终事件并推出地图时,它(地图)将被 logstash 作为一个新事件(如来自文件的新日志行)和 logstash 将尝试将该事件与其中一个过滤器匹配,并且失败,因为最终映射不包含将匹配任何过滤器的 massage 字段。

创建新过滤器解决了问题。

filter {
  if "aggregated" in [tags] {
    date {
      match => [ "timestamp", "dd/MMM/YYYY:HH:mm:ss Z" ]
      target => "@timestamp"
      add_tag => [ "tmatch" ]
    }
  }
}

您还可以使用以下代码行根据特定事件时间戳更新聚合事件时间戳。但是,如果您使用日志文件时间戳作为事件时间戳,请确保在聚合操作之前应用日期过滤器。

map['@timestamp'] ||= event.get('@timestamp');