How to use the elapsed filter - Logstash
I am using the Elapsed filter. I read the Elapsed filter guide for Logstash and then built a sample config file and CSV to test how it works, but it does not seem to do anything: the data is uploaded to ES unchanged. I have attached the CSV file and the config code. Could you give an example of how to use the elapsed filter?
Here is my CSV data:
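(The attached file itself is not shown here; the rows below are reconstructed from the message fields in the ES output further down.)

tamil nadu,tirunelveli,hap0,ad1,2345-1002-4501,1
tamil nadu,chennai,hap0,ad1,2345-1002-4501,5
kerala,kottayam,hap1,ad2,2345-1002-4501,9
mumbai,Jalna,hap2,ad3,2345-1002-4501,13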
Here is my config file:
input {
  file {
    path => "/home/paulsteven/log_cars/aggreagate.csv"
    start_position => "beginning"
    sincedb_path => "/dev/null"
  }
}
filter {
  csv {
    separator => ","
    quote_char => "%"
    columns => ["state","city","haps","ads","num_id","serial"]
  }
  elapsed {
    start_tag => "taskStarted"
    end_tag => "taskEnded"
    unique_id_field => "num_id"
  }
}
output {
  elasticsearch {
    hosts => "localhost:9200"
    index => "el03"
    document_type => "details"
  }
  stdout {}
}
Output in ES:
{
    "city" => "tirunelveli",
    "path" => "/home/paulsteven/log_cars/aggreagate.csv",
    "num_id" => "2345-1002-4501",
    "message" => "tamil nadu,tirunelveli,hap0,ad1,2345-1002-4501,1",
    "@version" => "1",
    "serial" => "1",
    "haps" => "hap0",
    "state" => "tamil nadu",
    "host" => "smackcoders",
    "ads" => "ad1",
    "@timestamp" => 2019-05-06T10:03:51.443Z
}
{
    "city" => "chennai",
    "path" => "/home/paulsteven/log_cars/aggreagate.csv",
    "num_id" => "2345-1002-4501",
    "message" => "tamil nadu,chennai,hap0,ad1,2345-1002-4501,5",
    "@version" => "1",
    "serial" => "5",
    "haps" => "hap0",
    "state" => "tamil nadu",
    "host" => "smackcoders",
    "ads" => "ad1",
    "@timestamp" => 2019-05-06T10:03:51.447Z
}
{
    "city" => "kottayam",
    "path" => "/home/paulsteven/log_cars/aggreagate.csv",
    "num_id" => "2345-1002-4501",
    "message" => "kerala,kottayam,hap1,ad2,2345-1002-4501,9",
    "@version" => "1",
    "serial" => "9",
    "haps" => "hap1",
    "state" => "kerala",
    "host" => "smackcoders",
    "ads" => "ad2",
    "@timestamp" => 2019-05-06T10:03:51.449Z
}
{
    "city" => "Jalna",
    "path" => "/home/paulsteven/log_cars/aggreagate.csv",
    "num_id" => "2345-1002-4501",
    "message" => "mumbai,Jalna,hap2,ad3,2345-1002-4501,13",
    "@version" => "1",
    "serial" => "13",
    "haps" => "hap2",
    "state" => "mumbai",
    "host" => "smackcoders",
    "ads" => "ad3",
    "@timestamp" => 2019-05-06T10:03:51.452Z
}
You have to tag your events so that Logstash can find the start/end tags. Basically, you have to know when an event is considered a start event and when it is considered an end event.
The elapsed filter plugin only works with pairs of events (for example a request event and a response event, in order to get the latency between them). Both events need an ID field that uniquely identifies that particular task; the name of this field is stored in unique_id_field.
For your example, you have to identify a pattern for the start and end events. Suppose your CSV had a column type (see the code below): when type contains "START", the line is considered a start event, and if it contains "END" it is an end event. Plain and simple. You would also need a column id that stores the unique identifier.
filter {
  csv {
    separator => ","
    quote_char => "%"
    columns => ["state","city","haps","ads","num_id","serial","type","id"]
  }
  # Tag lines whose type column contains START as start events
  grok {
    match => { "type" => ".*START.*" }
    add_tag => [ "taskStarted" ]
  }
  # Tag lines whose type column contains END as end events
  grok {
    match => { "type" => ".*END.*" }
    add_tag => [ "taskTerminated" ]
  }
  # Measure the time between the start and end event sharing the same id
  elapsed {
    start_tag => "taskStarted"
    end_tag => "taskTerminated"
    unique_id_field => "id"
  }
}
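With that config, a CSV like the following would drive the filter (these rows are hypothetical; the type and id columns are the additions described above):

tamil nadu,tirunelveli,hap0,ad1,2345-1002-4501,1,START,task-001
tamil nadu,chennai,hap0,ad1,2345-1002-4501,5,END,task-001

On the END row, the elapsed filter should add the tags elapsed and elapsed_match, plus the fields elapsed_time (seconds between the two events) and elapsed_timestamp_start.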
I think your requirement is different, though.
If you want to aggregate more than two events, for example all events whose state column has the same value, have a look at this plugin
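A minimal sketch of that approach, assuming the plugin meant here is the aggregate filter (logstash-filter-aggregate); the serial_sum field name and the timeout value are made up for illustration:

filter {
  aggregate {
    # Group all events sharing the same value of the state column
    task_id => "%{state}"
    # Ruby code run per event; 'map' persists across events of the same task_id
    code => "
      map['serial_sum'] ||= 0
      map['serial_sum'] += event.get('serial').to_i
    "
    # Emit the aggregated map as a new event once the task times out
    push_map_as_event_on_timeout => true
    timeout_task_id_field => "state"
    timeout => 120 # seconds of inactivity before flushing (arbitrary choice)
  }
}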