分层数据匹配和展示
Hierarchical data matching and displaying
在我的日志文件中,我有表示项目层次结构的数据,很像 http 日志文件可能显示网站的层次结构。
我可能有这样的数据
41 2016-01-01 01:41:32-500 show:category:all
41 2016-01-01 04:11:20-500 show:category:animals
42 2016-01-02 01:41:32-500 show:item:wallaby
42 2016-01-02 01:41:32-500 show:home
我会在这里放 3 件物品...%{NUMBER:terminal}
%{TIMESTAMP_ISO8601:ts}
和 (?<info>([^\r])*)
我使用 mutate
和 split
将 info 数据解析为一个数组,以将 lvl1:lvl2:lvl3
转换为 ['lvl1','lvl2','lvl3']
。
我感兴趣的是汇总数据以轻松获得各个级别的计数,例如计算 info[0]
相同或 info[0]
和 info[1]
相同的所有记录相同的。 (并且能够select时间范围和终端)
有没有办法设置kibana来可视化这种信息?
或者我应该更改过滤器匹配数据的方式以使数据更易于访问吗?
级别的深度各不相同,但我可以非常确定最大级别为 5,因此我可以将文本解析为各个字段 lvl1
lvl2
lvl3
lvl4
lvl5
而不是将它们放在数组中。
根据你的问题,我同意你解析数据的方式。但我想添加更多内容,使其可以使用 Kibana 直接聚合和可视化。
方法应该是:-
- 使用 %{NUMBER:terminal} %{TIMESTAMP_ISO8601:ts} 和 (?([^\r])*) {根据您提供的信息}过滤数据}
- 变异
- 过滤器
然后在使用 mutate & filter 之后,您将获得数组形式的数据{如您所述}
- 现在您可以通过提及 add_field => [ "fieldname", "%{[arrayname][0]}" ]
添加字段作为级别 1
- 现在您可以通过提及 add_field => [ "fieldname", "%{[arrayname][1]}" ]
将字段添加为级别 2
- 现在您可以通过提及 add_field => [ "fieldname", "%{[arrayname][2]}" ]
添加字段作为级别 3
那么你可以直接使用Kibana来可视化这些信息。
我的解决方案
input {
file {
path => "C:/Temp/zipped/*.txt"
start_position => beginning
ignore_older => 0
sincedb_path => "C:/temp/logstash_temp2.sincedb"
}
}
filter {
grok {
match => ["message","^%{NOTSPACE}\[%{NUMBER:terminal_id}\] %{NUMBER:log_level} %{NUMBER} %{TIMESTAMP_ISO8601:ts} \[(?<facility>([^\]]*))\] (?<lvl>([^$|\r])*)"]
}
mutate {
split => ["lvl", ":"]
add_field => {"lvl_1" => "%{lvl[0]}"}
add_field => {"lvl_2" => "%{lvl[1]}"}
add_field => {"lvl_3" => "%{lvl[2]}"}
add_field => {"lvl_4" => "%{lvl[3]}"}
add_field => {"lvl_5" => "%{lvl[4]}"}
add_field => {"lvl_6" => "%{lvl[5]}"}
add_field => {"lvl_7" => "%{lvl[6]}"}
add_field => {"lvl_8" => "%{lvl[7]}"}
lowercase => [ "terminal_id" ] # set to lowercase so that it can be used for index - additional filtering may be required
}
date {
match => ["ts", "YYYY-MM-DD HH:mm:ssZZ"]
}
}
filter {
if [lvl_1] =~ /%\{lvl\[0\]\}/ {mutate {remove_field => [ "lvl_1" ]}}
if [lvl_2] =~ /%\{lvl\[1\]\}/ {mutate {remove_field => [ "lvl_2" ]}}
if [lvl_3] =~ /%\{lvl\[2\]\}/ {mutate {remove_field => [ "lvl_3" ]}}
if [lvl_4] =~ /%\{lvl\[3\]\}/ {mutate {remove_field => [ "lvl_4" ]}}
if [lvl_5] =~ /%\{lvl\[4\]\}/ {mutate {remove_field => [ "lvl_5" ]}}
if [lvl_6] =~ /%\{lvl\[5\]\}/ {mutate {remove_field => [ "lvl_6" ]}}
if [lvl_7] =~ /%\{lvl\[6\]\}/ {mutate {remove_field => [ "lvl_7" ]}}
if [lvl_8] =~ /%\{lvl\[7\]\}/ {mutate {remove_field => [ "lvl_8" ]}}
mutate{
remove_field => [ "lvl","host","ts" ] # do not keep this data
}
}
output {
if [facility] == "mydata" {
elasticsearch {
hosts => ["localhost:9200"]
index => "logstash-mydata-%{terminal_id}-%{+YYYY.MM.DD}"
}
} else {
elasticsearch {
hosts => ["localhost:9200"]
index => "logstash-other-%{terminal_id}-%{+YYYY.MM.DD}"
}
}
# stdout { codec => rubydebug }
}
在我的日志文件中,我有表示项目层次结构的数据,很像 http 日志文件可能显示网站的层次结构。
我可能有这样的数据
41 2016-01-01 01:41:32-500 show:category:all
41 2016-01-01 04:11:20-500 show:category:animals
42 2016-01-02 01:41:32-500 show:item:wallaby
42 2016-01-02 01:41:32-500 show:home
我会在这里放 3 件物品...%{NUMBER:terminal}
%{TIMESTAMP_ISO8601:ts}
和 (?<info>([^\r])*)
我使用 mutate
和 split
将 info 数据解析为一个数组,以将 lvl1:lvl2:lvl3
转换为 ['lvl1','lvl2','lvl3']
。
我感兴趣的是汇总数据以轻松获得各个级别的计数,例如计算 info[0]
相同或 info[0]
和 info[1]
相同的所有记录相同的。 (并且能够select时间范围和终端)
有没有办法设置kibana来可视化这种信息?
或者我应该更改过滤器匹配数据的方式以使数据更易于访问吗?
级别的深度各不相同,但我可以非常确定最大级别为 5,因此我可以将文本解析为各个字段 lvl1
lvl2
lvl3
lvl4
lvl5
而不是将它们放在数组中。
根据你的问题,我同意你解析数据的方式。但我想添加更多内容,使其可以使用 Kibana 直接聚合和可视化。
方法应该是:-
- 使用 %{NUMBER:terminal} %{TIMESTAMP_ISO8601:ts} 和 (?([^\r])*) {根据您提供的信息}过滤数据}
- 变异
- 过滤器
然后在使用 mutate & filter 之后,您将获得数组形式的数据{如您所述}
- 现在您可以通过提及 add_field => [ "fieldname", "%{[arrayname][0]}" ] 添加字段作为级别 1
- 现在您可以通过提及 add_field => [ "fieldname", "%{[arrayname][1]}" ] 将字段添加为级别 2
- 现在您可以通过提及 add_field => [ "fieldname", "%{[arrayname][2]}" ] 添加字段作为级别 3
那么你可以直接使用Kibana来可视化这些信息。
我的解决方案
input {
file {
path => "C:/Temp/zipped/*.txt"
start_position => beginning
ignore_older => 0
sincedb_path => "C:/temp/logstash_temp2.sincedb"
}
}
filter {
grok {
match => ["message","^%{NOTSPACE}\[%{NUMBER:terminal_id}\] %{NUMBER:log_level} %{NUMBER} %{TIMESTAMP_ISO8601:ts} \[(?<facility>([^\]]*))\] (?<lvl>([^$|\r])*)"]
}
mutate {
split => ["lvl", ":"]
add_field => {"lvl_1" => "%{lvl[0]}"}
add_field => {"lvl_2" => "%{lvl[1]}"}
add_field => {"lvl_3" => "%{lvl[2]}"}
add_field => {"lvl_4" => "%{lvl[3]}"}
add_field => {"lvl_5" => "%{lvl[4]}"}
add_field => {"lvl_6" => "%{lvl[5]}"}
add_field => {"lvl_7" => "%{lvl[6]}"}
add_field => {"lvl_8" => "%{lvl[7]}"}
lowercase => [ "terminal_id" ] # set to lowercase so that it can be used for index - additional filtering may be required
}
date {
match => ["ts", "YYYY-MM-DD HH:mm:ssZZ"]
}
}
filter {
if [lvl_1] =~ /%\{lvl\[0\]\}/ {mutate {remove_field => [ "lvl_1" ]}}
if [lvl_2] =~ /%\{lvl\[1\]\}/ {mutate {remove_field => [ "lvl_2" ]}}
if [lvl_3] =~ /%\{lvl\[2\]\}/ {mutate {remove_field => [ "lvl_3" ]}}
if [lvl_4] =~ /%\{lvl\[3\]\}/ {mutate {remove_field => [ "lvl_4" ]}}
if [lvl_5] =~ /%\{lvl\[4\]\}/ {mutate {remove_field => [ "lvl_5" ]}}
if [lvl_6] =~ /%\{lvl\[5\]\}/ {mutate {remove_field => [ "lvl_6" ]}}
if [lvl_7] =~ /%\{lvl\[6\]\}/ {mutate {remove_field => [ "lvl_7" ]}}
if [lvl_8] =~ /%\{lvl\[7\]\}/ {mutate {remove_field => [ "lvl_8" ]}}
mutate{
remove_field => [ "lvl","host","ts" ] # do not keep this data
}
}
output {
if [facility] == "mydata" {
elasticsearch {
hosts => ["localhost:9200"]
index => "logstash-mydata-%{terminal_id}-%{+YYYY.MM.DD}"
}
} else {
elasticsearch {
hosts => ["localhost:9200"]
index => "logstash-other-%{terminal_id}-%{+YYYY.MM.DD}"
}
}
# stdout { codec => rubydebug }
}