How can I extract source hostnames from JSON logs in Logstash?
I am using OSSEC to collect logs and logstash-forwarder to ship the JSON logs to Logstash. This is my Logstash configuration:
input {
  lumberjack {
    port => 10516
    type => "lumberjack"
    ssl_certificate => "/etc/pki/tls/certs/logstash-forwarder.crt"
    ssl_key => "/etc/pki/tls/private/logstash-forwarder.key"
    codec => json
  }
}
filter {
  json {
    source => "message"
  }
}
output {
  elasticsearch {
    host => localhost
  }
}
I want to extract the host indicated inside the parentheses of the "location" field and create a dedicated tag for it, because Logstash treats OSSEC as the only source host, since OSSEC is the one forwarding the logs. Below is a sample Logstash output:
{
  "_index": "logstash-2015.09.23",
  "_type": "ossec-alerts",
  "_id": "AU_4Q1Hc5OjGfEBnRiWa",
  "_score": null,
  "_source": {
    "rule": {
      "level": 3,
      "comment": "Nginx error message.",
      "sidid": 31301
    },
    "srcip": "192.168.192.10",
    "location": "(logstash) 192.168.212.104->/var/log/nginx/error.log",
    "full_log": "2015/09/23 11:33:24 [error] 1057#0: *562 connect() failed (111: Connection refused) while connecting to upstream, client: 192.168.192.10, server: _, request: \"POST /elasticsearch/.kibana/__kibanaQueryValidator/_validate/query?explain=true&ignore_unavailable=true HTTP/1.1\", upstream: \"http://[::1]:5601/elasticsearch/.kibana/__kibanaQueryValidator/_validate/query?explain=true&ignore_unavailable=true\", host: \"192.168.212.104\", referrer: \"http://192.168.212.104/\"",
    "@version": "1",
    "@timestamp": "2015-09-23T03:33:25.588Z",
    "type": "ossec-alerts",
    "file": "/var/ossec/logs/alerts/alerts.json",
    "host": "ossec",
    "offset": "51048"
  },
  "fields": {
    "@timestamp": [
      1442979205588
    ]
  },
  "sort": [
    1442979205588
  ]
}
After applying the json{} filter, you are left with a set of fields. You can now apply further filters to those fields, including grok{}, to create even more fields.
What you need is the grok filter. You can use the grok debugger to find the pattern that works best for you. The following pattern should work for your location field:
\(%{HOST:host}\) %{IP:srcip}->%{PATH:path}
In the Logstash filter section:
grok {
  match => { "location" => "\(%{HOST:host}\) %{IP:srcip}->%{PATH:path}" }
  overwrite => [ "host", "srcip" ]
}
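To sanity-check the pattern outside Logstash, here is a rough Python equivalent of those grok captures, run against the sample location value from the question. The character classes below are simplified stand-ins for grok's HOST, IP, and PATH patterns, not their exact definitions:

```python
import re

# Approximation of the grok pattern:
#   \(%{HOST:host}\) %{IP:srcip}->%{PATH:path}
LOCATION_RE = re.compile(
    r"\((?P<host>[\w.-]+)\) "          # hostname in parentheses
    r"(?P<srcip>\d{1,3}(?:\.\d{1,3}){3})"  # IPv4 address
    r"->(?P<path>/\S+)"                # file path after the arrow
)

location = "(logstash) 192.168.212.104->/var/log/nginx/error.log"
fields = LOCATION_RE.match(location).groupdict()
print(fields)
# {'host': 'logstash', 'srcip': '192.168.212.104', 'path': '/var/log/nginx/error.log'}
```

The named groups correspond to the fields grok would add to the event.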
The overwrite option is required because the event already has host and srcip fields.
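Putting the pieces together, the filter section of the configuration from the question would then look something like this (a sketch, reusing the json and grok settings shown above):

```
filter {
  json {
    source => "message"
  }
  grok {
    match => { "location" => "\(%{HOST:host}\) %{IP:srcip}->%{PATH:path}" }
    overwrite => [ "host", "srcip" ]
  }
}
```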