根据日志中的字段组合请求和响应 - ELK

Combine request and response based on field in log - ELK

我们使用 filebeat、logstash、elasticsearch 和 kibana 设置了 ELK。 我需要在 logstash 中聚合请求和响应。

我已经配置了管道配置如下。现在,如果我将单个工作人员用于管道,日志聚合工作没有任何问题。如果我使用多个工人聚合不会发生。任何使用多个工作人员和日志聚合的解决方案?

 if [transaction] == "request" { 

       aggregate {
                        task_id => "%{id}"
                        code => "
                        map['method'] = event.get('method')
                        map['request'] = event.get('request')
                        map['user'] = event.get('user')
                        map['application'] = event.get('application')"
                        map_action => "create"
         }
                    drop {}#drop the request before persisting, to save indexing space in elasticsearch server
  }
  if [message] =~ "TRANSACTION:response" {

         aggregate {
                    task_id => "%{id}"
                    code => "
                    event.set('method', map['method'])
                    event.set('response', map['response'])
                    event.set('user', map['user'])
                    event.set('application', map['application'])"
                    map_action => "update"

         }
}

要使 aggregate 过滤器正常工作,您只能使用一个工作人员,如果您使用多个工作人员,您的响应事件可能会在您的请求之前得到处理,因此您的过滤器将无法工作。

这是弹性的docummented

You should be very careful to set Logstash filter workers to 1 (-w 1 flag) for this filter to work correctly otherwise events may be processed out of sequence and unexpected results will occur.