Parse logs containing Python tracebacks using Logstash
I have been trying to parse my Python traceback logs using Logstash. My logs look like this:
[pid: 26422|app: 0|req: 73/73] 192.168.1.1 () {34 vars in 592 bytes} [Wed Feb 18 13:35:55 2015] GET /data => generated 2538923 bytes in 4078 msecs (HTTP/1.1 200) 2 headers in 85 bytes (1 switches on core 0)
Traceback (most recent call last):
File "/var/www/analytics/parser.py", line 257, in parselogfile
parselogline(basedir, lne)
File "/var/www/analytics/parser.py", line 157, in parselogline
pval = understandpost(parts[3])
File "/var/www/analytics/parser.py", line 98, in understandpost
val = json.loads(dct["events"])
File "/usr/lib/python2.7/json/__init__.py", line 338, in loads
return _default_decoder.decode(s)
File "/usr/lib/python2.7/json/decoder.py", line 366, in decode
obj, end = self.raw_decode(s, idx=_w(s, 0).end())
File "/usr/lib/python2.7/json/decoder.py", line 382, in raw_decode
obj, end = self.scan_once(s, idx)
ValueError: Unterminated string starting at: line 1 column 355 (char 354)
So far I have been able to parse everything in the log except the last line, i.e.
ValueError: Unterminated string starting at: line 1 column 355 (char 354)
I am using the multiline filter to do this. My Logstash configuration looks like this:
filter {
  multiline {
    pattern => "^Traceback"
    what => "previous"
  }
  multiline {
    pattern => "^ "
    what => "previous"
  }
  grok {
    match => [
      "message", "\[pid\: %{NUMBER:process_id:int}\|app: 0\|req: %{NUMBER}/%{NUMBER}\] %{IPORHOST:clientip} \(\) \{%{NUMBER:vars:int} vars in %{NUMBER:bytes:int} bytes\} \[%{GREEDYDATA:timestamp}\] %{WORD:method} /%{GREEDYDATA:referrer} \=\> generated %{NUMBER:generated_bytes:int} bytes in %{NUMBER} msecs \(HTTP/%{NUMBER} %{NUMBER:status_code:int}\) %{NUMBER:headers:int} headers in %{NUMBER:header_bytes:int} bytes \(%{NUMBER:switches:int} switches on core %{NUMBER:core:int}\)%{GREEDYDATA:traceback}"
    ]
  }
  if "_grokparsefailure" in [tags] {
    multiline {
      pattern => "^.*$"
      what => "previous"
      negate => "true"
    }
  }
  if "_grokparsefailure" in [tags] {
    grok {
      match => [
        "message", "\[pid\: %{NUMBER:process_id:int}\|app: 0\|req: %{NUMBER}/%{NUMBER}\] %{IPORHOST:clientip} \(\) \{%{NUMBER:vars:int} vars in %{NUMBER:bytes:int} bytes\} \[%{GREEDYDATA:timestamp}\] %{WORD:method} /%{GREEDYDATA:referrer} \=\> generated %{NUMBER:generated_bytes:int} bytes in %{NUMBER} msecs \(HTTP/%{NUMBER} %{NUMBER:status_code:int}\) %{NUMBER:headers:int} headers in %{NUMBER:header_bytes:int} bytes \(%{NUMBER:switches:int} switches on core %{NUMBER:core:int}\)%{GREEDYDATA:traceback}"
      ]
      remove_tag => ["_grokparsefailure"]
    }
  }
}
But the last line is not parsed. Instead, it still gives me an error and also increases processing time exponentially. Any suggestions on how to parse the last line of the traceback?
Well, I found a solution. The approach I followed is this: a log message starts with "[", and every line that does not start with "[" is appended to the end of the previous message. A grok filter can then be applied and the traceback can be parsed. Note that I have to apply two grok filters:
1. One with GREEDYDATA to capture the traceback when a traceback is present.
2. When there is no traceback, the GREEDYDATA match fails, so I have to remove the _grokparsefailure tag and apply grok again without the GREEDYDATA part. This is done with the help of an if block.
The final Logstash filter looks like this:
filter {
  multiline {
    pattern => "^[^\[]"
    what => "previous"
  }
  grok {
    match => [
      "message", "\[pid\: %{NUMBER:process_id:int}\|app: 0\|req: %{NUMBER}/%{NUMBER}\] %{IPORHOST:clientip} \(\) \{%{NUMBER:vars:int} vars in %{NUMBER:bytes:int} bytes\} \[%{GREEDYDATA:timestamp}\] %{WORD:method} /%{GREEDYDATA:referrer} \=\> generated %{NUMBER:generated_bytes:int} bytes in %{NUMBER} msecs \(HTTP/%{NUMBER} %{NUMBER:status_code:int}\) %{NUMBER:headers:int} headers in %{NUMBER:header_bytes:int} bytes \(%{NUMBER:switches:int} switches on core %{NUMBER:core:int}\)%{GREEDYDATA:traceback}"
    ]
  }
  if "_grokparsefailure" in [tags] {
    grok {
      match => [
        "message", "\[pid\: %{NUMBER:process_id:int}\|app: 0\|req: %{NUMBER}/%{NUMBER}\] %{IPORHOST:clientip} \(\) \{%{NUMBER:vars:int} vars in %{NUMBER:bytes:int} bytes\} \[%{GREEDYDATA:timestamp}\] %{WORD:method} /%{GREEDYDATA:referrer} \=\> generated %{NUMBER:generated_bytes:int} bytes in %{NUMBER} msecs \(HTTP/%{NUMBER} %{NUMBER:status_code:int}\) %{NUMBER:headers:int} headers in %{NUMBER:header_bytes:int} bytes \(%{NUMBER:switches:int} switches on core %{NUMBER:core:int}\)"
      ]
      remove_tag => ["_grokparsefailure"]
    }
  }
  else {
    mutate {
      convert => {"traceback" => "string"}
    }
  }
  date {
    # "timestamp" is captured as e.g. "Wed Feb 18 13:35:55 2015", so use a matching Joda pattern
    match => ["timestamp", "EEE MMM dd HH:mm:ss yyyy"]
    locale => "en"
  }
  geoip {
    source => "clientip"
  }
  useragent {
    source => "agent"
    target => "Useragent"
  }
}
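A side note for newer Logstash versions: the multiline filter used above has since been deprecated in favour of the multiline codec, which does the same line-joining on the input side. Here is a minimal sketch of the equivalent codec configuration, assuming a file input and a hypothetical log path (the filter section above then stays the same, minus the multiline block):
input {
  file {
    # hypothetical path; point this at your actual uwsgi/application log
    path => "/var/log/uwsgi/app.log"
    start_position => "beginning"
    codec => multiline {
      # any line that does not start with "[" belongs to the previous event
      pattern => "^\["
      negate => true
      what => "previous"
    }
  }
}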
Alternatively, if you do not want to use an if block to check for a second grok pattern and remove _grokparsefailure, you can have the first grok filter handle both message types by putting multiple message/pattern pairs into its match array. Keep in mind that grok stops at the first pattern that matches (break_on_match defaults to true), so the pattern that captures %{GREEDYDATA:traceback} should be listed first; otherwise the shorter pattern also matches messages that contain a traceback and the traceback field is never extracted. It can be done like this:
grok {
  match => [
    "message", "\[pid\: %{NUMBER:process_id:int}\|app: 0\|req: %{NUMBER}/%{NUMBER}\] %{IPORHOST:clientip} \(\) \{%{NUMBER:vars:int} vars in %{NUMBER:bytes:int} bytes\} \[%{GREEDYDATA:timestamp}\] %{WORD:method} /%{GREEDYDATA:referrer} \=\> generated %{NUMBER:generated_bytes:int} bytes in %{NUMBER} msecs \(HTTP/%{NUMBER} %{NUMBER:status_code:int}\) %{NUMBER:headers:int} headers in %{NUMBER:header_bytes:int} bytes \(%{NUMBER:switches:int} switches on core %{NUMBER:core:int}\)%{GREEDYDATA:traceback}",
    "message", "\[pid\: %{NUMBER:process_id:int}\|app: 0\|req: %{NUMBER}/%{NUMBER}\] %{IPORHOST:clientip} \(\) \{%{NUMBER:vars:int} vars in %{NUMBER:bytes:int} bytes\} \[%{GREEDYDATA:timestamp}\] %{WORD:method} /%{GREEDYDATA:referrer} \=\> generated %{NUMBER:generated_bytes:int} bytes in %{NUMBER} msecs \(HTTP/%{NUMBER} %{NUMBER:status_code:int}\) %{NUMBER:headers:int} headers in %{NUMBER:header_bytes:int} bytes \(%{NUMBER:switches:int} switches on core %{NUMBER:core:int}\)"
  ]
}
There is also a third way, probably the most elegant one. It looks like this:
grok {
  match => [
    "message", "\[pid\: %{NUMBER:process_id:int}\|app: 0\|req: %{NUMBER}/%{NUMBER}\] %{IPORHOST:clientip} \(\) \{%{NUMBER:vars:int} vars in %{NUMBER:bytes:int} bytes\} \[%{GREEDYDATA:timestamp}\] %{WORD:method} /%{GREEDYDATA:referrer} \=\> generated %{NUMBER:generated_bytes:int} bytes in %{NUMBER} msecs \(HTTP/%{NUMBER} %{NUMBER:status_code:int}\) %{NUMBER:headers:int} headers in %{NUMBER:header_bytes:int} bytes \(%{NUMBER:switches:int} switches on core %{NUMBER:core:int}\)(%{GREEDYDATA:traceback})?"
  ]
}
Note that in this approach, a field that may or may not be present has to be wrapped in "()?"; here that is (%{GREEDYDATA:traceback})?. The grok filter will then parse the field if it is available and skip it otherwise.
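For reference, here is roughly what each of these variants should extract from the sample request line at the top of the question (a hand-worked sketch of the expected fields, not actual Logstash output; a quick way to verify it yourself is a throwaway pipeline with a stdin input and output { stdout { codec => rubydebug } }):
process_id      => 26422
clientip        => "192.168.1.1"
vars            => 34
bytes           => 592
timestamp       => "Wed Feb 18 13:35:55 2015"
method          => "GET"
referrer        => "data"
generated_bytes => 2538923
status_code     => 200
headers         => 2
header_bytes    => 85
switches        => 1
core            => 0
traceback       => the joined traceback lines, when present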