How to use the elapsed filter - Logstash

I am using the elapsed filter. I read the elapsed filter guide for Logstash and then put together a sample config file and a CSV to test how the filter works, but it does not seem to do anything: the data is uploaded to ES unchanged. I have attached the CSV file and the config code. Could you give an example of how to use the elapsed filter?

Here is my CSV data:
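(The attachment did not come through; the rows below are reconstructed from the message fields in the ES output further down. The gaps in serial suggest the full file had more rows.)

tamil nadu,tirunelveli,hap0,ad1,2345-1002-4501,1
tamil nadu,chennai,hap0,ad1,2345-1002-4501,5
kerala,kottayam,hap1,ad2,2345-1002-4501,9
mumbai,Jalna,hap2,ad3,2345-1002-4501,13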

Here is my config file:

input {
  file {
    path => "/home/paulsteven/log_cars/aggreagate.csv"
    start_position => "beginning"
    sincedb_path => "/dev/null"
  }
}
filter {
  csv {
    separator => ","
    quote_char => "%"
    columns => ["state","city","haps","ads","num_id","serial"]
  }
  elapsed {
    start_tag => "taskStarted"
    end_tag => "taskEnded"
    unique_id_field => "num_id"
  }
}
output {
  elasticsearch {
    hosts => "localhost:9200"
    index => "el03"
    document_type => "details"
  }
  stdout{}
}

Output in ES:

{
          "city" => "tirunelveli",
          "path" => "/home/paulsteven/log_cars/aggreagate.csv",
        "num_id" => "2345-1002-4501",
       "message" => "tamil nadu,tirunelveli,hap0,ad1,2345-1002-4501,1",
      "@version" => "1",
        "serial" => "1",
          "haps" => "hap0",
         "state" => "tamil nadu",
          "host" => "smackcoders",
           "ads" => "ad1",
    "@timestamp" => 2019-05-06T10:03:51.443Z
}
{
          "city" => "chennai",
          "path" => "/home/paulsteven/log_cars/aggreagate.csv",
        "num_id" => "2345-1002-4501",
       "message" => "tamil nadu,chennai,hap0,ad1,2345-1002-4501,5",
      "@version" => "1",
        "serial" => "5",
          "haps" => "hap0",
         "state" => "tamil nadu",
          "host" => "smackcoders",
           "ads" => "ad1",
    "@timestamp" => 2019-05-06T10:03:51.447Z
}
{
          "city" => "kottayam",
          "path" => "/home/paulsteven/log_cars/aggreagate.csv",
        "num_id" => "2345-1002-4501",
       "message" => "kerala,kottayam,hap1,ad2,2345-1002-4501,9",
      "@version" => "1",
        "serial" => "9",
          "haps" => "hap1",
         "state" => "kerala",
          "host" => "smackcoders",
           "ads" => "ad2",
    "@timestamp" => 2019-05-06T10:03:51.449Z
}
{
          "city" => "Jalna",
          "path" => "/home/paulsteven/log_cars/aggreagate.csv",
        "num_id" => "2345-1002-4501",
       "message" => "mumbai,Jalna,hap2,ad3,2345-1002-4501,13",
      "@version" => "1",
        "serial" => "13",
          "haps" => "hap2",
         "state" => "mumbai",
          "host" => "smackcoders",
           "ads" => "ad3",
    "@timestamp" => 2019-05-06T10:03:51.452Z
}

You have to tag your events so that Logstash can find the start/end tags. Essentially, you have to decide what makes an event count as a start event and what makes it count as an end event.

The elapsed filter plugin only works on pairs of events (e.g. a request event and a response event, in order to measure the latency between them). Both events need a field that uniquely identifies the particular task they belong to; the name of that field is configured in unique_id_field.

For your example, you have to identify a pattern for the start and end events. Suppose your CSV had a column type (see the code below): when type contains "START", the row is treated as a start event, and when it contains "END", it is an end event. It's quite simple. You would also need a column id holding the unique identifier.
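For instance, two hypothetical rows extending your CSV with those two extra columns could look like this (the values task42, START and END are just illustrations):

tamil nadu,tirunelveli,hap0,ad1,2345-1002-4501,1,START,task42
tamil nadu,chennai,hap0,ad1,2345-1002-4501,5,END,task42

The config below would tag the first row taskStarted and the second taskTerminated, and the elapsed filter would pair them via the id column.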

filter {
  csv {
    separator => ","
    quote_char => "%"
    columns => ["state","city","haps","ads","num_id","serial","type","id"]
  }
  grok {
    match => { "type" => ".*START.*" }
    add_tag => [ "taskStarted" ]
  }
  grok {
    match => { "type" => ".*END.*" }
    add_tag => [ "taskTerminated" ]
  }
  elapsed {
    start_tag => "taskStarted"
    end_tag => "taskTerminated"
    unique_id_field => "id"
  }
}
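When a matching end event arrives, the elapsed filter enriches it in place rather than emitting a new document (unless you set new_event_on_match). Per the plugin docs, the end event gets the tags elapsed and elapsed_match, an elapsed_time field with the difference in seconds, and an elapsed_timestamp_start field. For the hypothetical pair above, the end event should come out roughly like this (values illustrative):

{
                         "id" => "task42",
                       "tags" => ["taskTerminated", "elapsed", "elapsed_match"],
               "elapsed_time" => 0.004,
    "elapsed_timestamp_start" => 2019-05-06T10:03:51.443Z,
    ...
}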

I feel your requirement is actually different, though. If you want to aggregate more than two events, for example all events whose state column has the same value, take a look at this plugin.
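Assuming the plugin being referred to is the aggregate filter, a minimal sketch that counts all rows per state could look like the following; the task_id, the map key rows and the 120-second timeout are illustrative choices, not anything from your setup:

filter {
  aggregate {
    # group events that share the same value of "state"
    task_id => "%{state}"
    # accumulate a per-state counter in the shared map
    code => "map['rows'] ||= 0; map['rows'] += 1"
    # after 120s with no new events for a state, emit the map as its own event
    push_map_as_event_on_timeout => true
    timeout => 120
    # copy the task_id back into a "state" field on the emitted event
    timeout_task_id_field => "state"
  }
}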