解析多行日志时 logstash 配置问题
Issue with the logstash config while parsing multiline log
我有以下多行日志,我正在尝试使用我的 logstash 配置进行解析。
2020-05-27 11:59:17 ----------------------------------------------------------------------
2020-05-27 11:59:17 Got context
2020-05-27 11:59:17 Raw context:
EMAIL=abc.def@example.com
NAME=abc.def
PAGER=+11111111111111
DATE=2020-05-27
AUTHOR=
COMMENT=
ADDRESS=1.1.1.1
ALIAS=abc.example.com
ATTEMPT=1
2020-05-27 11:59:17 Previous service hard state not known. Allowing all states.
2020-05-27 11:59:17 Computed variables:
URL=abc.example.com
STATE=UP
2020-05-27 11:59:17 Preparing flexible notifications for abc.def
2020-05-27 11:59:17 channel with plugin sms
2020-05-27 11:59:17 - Skipping: set
2020-05-27 11:59:17 channel with plugin plain email
2020-05-27 11:59:20 --------------------------------------------------------------------
这是我的 logstash 配置:
input {
stdin { }
}
filter {
grok {
match => { "message" => "(?m)%{GREEDYDATA:data}"}
}
if [data] {
mutate {
gsub => [
"data", "^\s*", ""
]
}
mutate {
gsub => ['data', "\n", " "]
}
}
}
output {
stdout { codec => rubydebug }
}
Filebeat 配置:
multiline.pattern: '^[[:space:]][A-Za-z]* (?m)'
multiline.negate: false
multiline.match: after
我想达到的目标:
多行日志将首先与多行模式匹配,并被拆分成像
这样的行
Message1: 2020-05-27 11:59:17 ----------------------------------------------------------------------
Message2: 2020-05-27 11:59:17 Got context
Message3: 2020-05-27 11:59:17 Raw notification context:
EMAIL=abc.def@example.com
NAME=abc.def
PAGER=+11111111111111
DATE=2020-05-27
AUTHOR=
COMMENT=
ADDRESS=1.1.1.1
ALIAS=abc.example.com
ATTEMPT=1
在此之后,当这些日志行被解析时,它将再次被分隔符分隔,然后我可以使用 kv 过滤器读取每个键值对,如 ALIAS=abc.example.com 在单个消息编号 3 .
你能告诉我如何做到这一点吗?
我建议您使用多行编解码器从文件中读取(如果您使用标准输入,您也可以在过滤器部分定义它)同时为每个新行提供带有时间戳前缀的模式。
然后在你的 grok 过滤器中使用 KV filter 来拆分字段和值,如下所示:
input {
file {
path => "C:/work/elastic/logstash-6.5.0/config/test.txt"
start_position => "beginning"
codec => multiline {
pattern => "^%{TIMESTAMP_ISO8601}"
negate => true
what => "previous"
}
}
}
filter {
kv {
field_split => "\r\n"
value_split => "="
source => "message"
}
}
output {
elasticsearch {
hosts => ["http://localhost:9200"]
index => "test"
}
}
Kibana 中的结果应如下所示:
还有这个:
编辑:在您声明的评论中,您没有看到包括空格在内的全部价值。我已经用您提供的新状态重新测试了我的解决方案,它工作正常:
我有以下多行日志,我正在尝试使用我的 logstash 配置进行解析。
2020-05-27 11:59:17 ----------------------------------------------------------------------
2020-05-27 11:59:17 Got context
2020-05-27 11:59:17 Raw context:
EMAIL=abc.def@example.com
NAME=abc.def
PAGER=+11111111111111
DATE=2020-05-27
AUTHOR=
COMMENT=
ADDRESS=1.1.1.1
ALIAS=abc.example.com
ATTEMPT=1
2020-05-27 11:59:17 Previous service hard state not known. Allowing all states.
2020-05-27 11:59:17 Computed variables:
URL=abc.example.com
STATE=UP
2020-05-27 11:59:17 Preparing flexible notifications for abc.def
2020-05-27 11:59:17 channel with plugin sms
2020-05-27 11:59:17 - Skipping: set
2020-05-27 11:59:17 channel with plugin plain email
2020-05-27 11:59:20 --------------------------------------------------------------------
这是我的 logstash 配置:
input {
stdin { }
}
filter {
grok {
match => { "message" => "(?m)%{GREEDYDATA:data}"}
}
if [data] {
mutate {
gsub => [
"data", "^\s*", ""
]
}
mutate {
gsub => ['data', "\n", " "]
}
}
}
output {
stdout { codec => rubydebug }
}
Filebeat 配置:
multiline.pattern: '^[[:space:]][A-Za-z]* (?m)'
multiline.negate: false
multiline.match: after
我想达到的目标: 多行日志将首先与多行模式匹配,并被拆分成像
这样的行 Message1: 2020-05-27 11:59:17 ----------------------------------------------------------------------
Message2: 2020-05-27 11:59:17 Got context
Message3: 2020-05-27 11:59:17 Raw notification context:
EMAIL=abc.def@example.com
NAME=abc.def
PAGER=+11111111111111
DATE=2020-05-27
AUTHOR=
COMMENT=
ADDRESS=1.1.1.1
ALIAS=abc.example.com
ATTEMPT=1
在此之后,当这些日志行被解析时,它将再次被分隔符分隔,然后我可以使用 kv 过滤器读取每个键值对,如 ALIAS=abc.example.com 在单个消息编号 3 .
你能告诉我如何做到这一点吗?
我建议您使用多行编解码器从文件中读取(如果您使用标准输入,您也可以在过滤器部分定义它)同时为每个新行提供带有时间戳前缀的模式。
然后在你的 grok 过滤器中使用 KV filter 来拆分字段和值,如下所示:
input {
file {
path => "C:/work/elastic/logstash-6.5.0/config/test.txt"
start_position => "beginning"
codec => multiline {
pattern => "^%{TIMESTAMP_ISO8601}"
negate => true
what => "previous"
}
}
}
filter {
kv {
field_split => "\r\n"
value_split => "="
source => "message"
}
}
output {
elasticsearch {
hosts => ["http://localhost:9200"]
index => "test"
}
}
Kibana 中的结果应如下所示:
还有这个:
编辑:在您声明的评论中,您没有看到包括空格在内的全部价值。我已经用您提供的新状态重新测试了我的解决方案,它工作正常: