logstash GROK 过滤器和 KV 插件无法处理事件
logstash GROK filter along with KV plugin couldn't able to process the events
我是 ELK 的新手。当我加入以下日志文件时,它会在 logstash 中进入 "dead letter queue" 因为 logstash 无法处理 events.I 已经编写了 GROK 过滤器来解析事件但 logstash 仍然无法处理事件。任何帮助将不胜感激。
下面是示例日志格式。
25193662345 [http-nio-8080-exec-44] DEBUG c.s.b.a.m.PerformanceMetricsFilter - method=PUT status=201 appLogicTime=1, streamInTime=0, blobStorageTime=31, totalTime=33 tenantId=b9sdfs-1033-4444-aba5-csdfsdfsf, immutableBlobId=bss_c_586331/Sample_app12-sdas-157123148464.txt, blobSize=2862, domain=abc
2519366789 [http-nio-8080-exec-47] DEBUG q.s.b.y.m.PerformanceMetricsFilter - method=PUT status=201 appLogicTime=1, streamInTime=0, blobStorageTime=32, totalTime=33 tenantId=b0csdfsd-1066-4444-adf4-ce7bsdfssdf, immutableBlobId=bss_c_586334/Sample_app15-615223-157sadas6648465.txt, blobSize=2862, domain=cde
GROK 过滤器:
dissect { mapping => { "message" => "%{NUMBER:number} [%{thread}] %{level} %{class} - %{[@metadata][msg]}" } }
kv { source => "[@metadata][msg]" field_split => "," }
谢谢
你的配置基本上有两个问题。
1.) 您使用的是 dissect
过滤器,而不是 grok
,两者都用于解析消息,但是 grok
使用正则表达式来验证字段的值和dissect
只是位置,它不执行任何验证,如果您在需要 NUMBER 的字段位置有一个 WORD 值,grok
将失败,但 dissect
不会。
如果您的日志行始终具有相同的模式,您应该继续使用 dissect
,因为它更快并且需要更少 cpu。
您正确的 dissect
映射应该是:
dissect {
mapping => { "message" => "%{number} [%{thread}] %{level} %{class} - %{[@metadata][msg]}" }
}
2.) 包含 kv 消息的字段是错误的,它的字段由 space 和逗号分隔,kv
不会这样工作。
经过 dissect
过滤后,这是 [@metadata][msg]
的内容。
method=PUT status=201 appLogicTime=1, streamInTime=0, blobStorageTime=32, totalTime=33 tenantId=b0csdfsd-1066-4444-adf4-ce7bsdfssdf, immutableBlobId=bss_c_586334/Sample_app15-615223-157sadas6648465.txt, blobSize=2862, domain=cde
要解决此问题,您应该使用 mutate 过滤器从 [@metadata][msg]
中删除逗号,并使用具有默认配置的 kv
过滤器。
这应该是您的过滤器配置
filter {
dissect {
mapping => { "message" => "%{number} [%{thread}] %{level} %{class} - %{[@metadata][msg]}" }
}
mutate {
gsub => ["[@metadata][msg]",",",""]
}
kv {
source => "[@metadata][msg]"
}
}
你的输出应该是这样的:
{
"number" => "2519366789",
"@timestamp" => 2019-11-03T16:42:11.708Z,
"thread" => "http-nio-8080-exec-47",
"appLogicTime" => "1",
"domain" => "cde",
"method" => "PUT",
"level" => "DEBUG",
"blobSize" => "2862",
"@version" => "1",
"immutableBlobId" => "bss_c_586334/Sample_app15-615223-157sadas6648465.txt",
"streamInTime" => "0",
"status" => "201",
"blobStorageTime" => "32",
"message" => "2519366789 [http-nio-8080-exec-47] DEBUG q.s.b.y.m.PerformanceMetricsFilter - method=PUT status=201 appLogicTime=1, streamInTime=0, blobStorageTime=32, totalTime=33 tenantId=b0csdfsd-1066-4444-adf4-ce7bsdfssdf, immutableBlobId=bss_c_586334/Sample_app15-615223-157sadas6648465.txt, blobSize=2862, domain=cde",
"totalTime" => "33",
"tenantId" => "b0csdfsd-1066-4444-adf4-ce7bsdfssdf",
"class" => "q.s.b.y.m.PerformanceMetricsFilter"
}
我是 ELK 的新手。当我加入以下日志文件时,它会在 logstash 中进入 "dead letter queue" 因为 logstash 无法处理 events.I 已经编写了 GROK 过滤器来解析事件但 logstash 仍然无法处理事件。任何帮助将不胜感激。
下面是示例日志格式。
25193662345 [http-nio-8080-exec-44] DEBUG c.s.b.a.m.PerformanceMetricsFilter - method=PUT status=201 appLogicTime=1, streamInTime=0, blobStorageTime=31, totalTime=33 tenantId=b9sdfs-1033-4444-aba5-csdfsdfsf, immutableBlobId=bss_c_586331/Sample_app12-sdas-157123148464.txt, blobSize=2862, domain=abc
2519366789 [http-nio-8080-exec-47] DEBUG q.s.b.y.m.PerformanceMetricsFilter - method=PUT status=201 appLogicTime=1, streamInTime=0, blobStorageTime=32, totalTime=33 tenantId=b0csdfsd-1066-4444-adf4-ce7bsdfssdf, immutableBlobId=bss_c_586334/Sample_app15-615223-157sadas6648465.txt, blobSize=2862, domain=cde
GROK 过滤器:
dissect { mapping => { "message" => "%{NUMBER:number} [%{thread}] %{level} %{class} - %{[@metadata][msg]}" } }
kv { source => "[@metadata][msg]" field_split => "," }
谢谢
你的配置基本上有两个问题。
1.) 您使用的是 dissect
过滤器,而不是 grok
,两者都用于解析消息,但是 grok
使用正则表达式来验证字段的值和dissect
只是位置,它不执行任何验证,如果您在需要 NUMBER 的字段位置有一个 WORD 值,grok
将失败,但 dissect
不会。
如果您的日志行始终具有相同的模式,您应该继续使用 dissect
,因为它更快并且需要更少 cpu。
您正确的 dissect
映射应该是:
dissect {
mapping => { "message" => "%{number} [%{thread}] %{level} %{class} - %{[@metadata][msg]}" }
}
2.) 包含 kv 消息的字段是错误的,它的字段由 space 和逗号分隔,kv
不会这样工作。
经过 dissect
过滤后,这是 [@metadata][msg]
的内容。
method=PUT status=201 appLogicTime=1, streamInTime=0, blobStorageTime=32, totalTime=33 tenantId=b0csdfsd-1066-4444-adf4-ce7bsdfssdf, immutableBlobId=bss_c_586334/Sample_app15-615223-157sadas6648465.txt, blobSize=2862, domain=cde
要解决此问题,您应该使用 mutate 过滤器从 [@metadata][msg]
中删除逗号,并使用具有默认配置的 kv
过滤器。
这应该是您的过滤器配置
filter {
dissect {
mapping => { "message" => "%{number} [%{thread}] %{level} %{class} - %{[@metadata][msg]}" }
}
mutate {
gsub => ["[@metadata][msg]",",",""]
}
kv {
source => "[@metadata][msg]"
}
}
你的输出应该是这样的:
{
"number" => "2519366789",
"@timestamp" => 2019-11-03T16:42:11.708Z,
"thread" => "http-nio-8080-exec-47",
"appLogicTime" => "1",
"domain" => "cde",
"method" => "PUT",
"level" => "DEBUG",
"blobSize" => "2862",
"@version" => "1",
"immutableBlobId" => "bss_c_586334/Sample_app15-615223-157sadas6648465.txt",
"streamInTime" => "0",
"status" => "201",
"blobStorageTime" => "32",
"message" => "2519366789 [http-nio-8080-exec-47] DEBUG q.s.b.y.m.PerformanceMetricsFilter - method=PUT status=201 appLogicTime=1, streamInTime=0, blobStorageTime=32, totalTime=33 tenantId=b0csdfsd-1066-4444-adf4-ce7bsdfssdf, immutableBlobId=bss_c_586334/Sample_app15-615223-157sadas6648465.txt, blobSize=2862, domain=cde",
"totalTime" => "33",
"tenantId" => "b0csdfsd-1066-4444-adf4-ce7bsdfssdf",
"class" => "q.s.b.y.m.PerformanceMetricsFilter"
}