将日志条目与 logstash 结合

Combining log entries with logstash

我想从 dnsmasq 收集和处理日志,我决定使用 ELK。 Dnsmasq 用作 DHCP 服务器和 DNS 解析器,因此它为这两种服务创建日志条目。

我的目标是向 Elasticsearch 发送所有带有请求者 IP、请求者主机名(如果可用)和请求者 mac 地址的 DNS 查询。这将允许我根据 mac 地址对请求进行分组,无论设备 IP 是否更改,并显示主机名。

我想做的是:

1) 阅读如下条目:

Mar 30 21:55:34 dnsmasq-dhcp[346]: 3806132383 DHCPACK(eth0)  192.168.0.80 04:0c:ce:d1:af:18 air

2) 暂存关系:

192.168.0.80 => 04:0c:ce:d1:af:18

192.168.0.80 => 空气

3) 丰富如下条目,添加 mac 地址和主机名。如果主机名是空的,我会添加 mac 地址。

Mar 30 22:13:05 dnsmasq[346]: query[A] imap.gmail.com from 192.168.0.80

我找到了一个名为 “memorize” 的模块,可以让我存储它们,但不幸的是,它不适用于最新版本的 Logstash

我使用的版本:

ElastiSearch 2.3.0
Kibana 4.4.2
Logstash 2.2.2

还有 logstash 过滤器(这是我第一次尝试使用 logstash,因此我确信可以改进配置文件)

input {
  file {
    path => "/var/log/dnsmasq.log"
    start_position => "beginning"
    type => "dnsmasq"
  }
}  

filter {
  if [type] == "dnsmasq" {
    grok {
      match =>  [ "message", "%{SYSLOGTIMESTAMP:reqtimestamp} %{USER:program}\[%{NONNEGINT:pid}\]\: ?(%{NONNEGINT:num} )?%{NOTSPACE:action} %{IP:clientip} %{MAC:clientmac} ?(%{HOSTNAME:clientname})?"]
      match =>  [ "message", "%{SYSLOGTIMESTAMP:reqtimestamp} %{USER:program}\[%{NONNEGINT:pid}\]\: ?(%{NONNEGINT:num} )?%{USER:action}?(\[%{USER:subaction}\])? %{NOTSPACE:domain} %{NOTSPACE:function} %{IP:clientip}"]
      match =>  [ "message", "%{SYSLOGTIMESTAMP:reqtimestamp} %{USER:program}\[%{NONNEGINT:pid}\]\: %{NOTSPACE:action} %{DATA:data}"]
    }

    if [action] =~ "DHCPACK" {

    }else if [action] == "query" {

    }else
    {
      drop{}
    }
  }
}
output {
  elasticsearch { hosts => ["localhost:9200"] }
  stdout { codec => rubydebug }
}

问题:

1) 是否有替代插件 “memorize” 使用最新的 logstash 版本?另一个插件或不同的程序。

2) 是否要将logstash降级到2之前的版本(我认为之前的是1.5.4)?如果是,是否存在任何已知的服务器问题或与 elasticsearch 2.2.1 不兼容?

3) 或者我应该修改插件“记忆”以允许 logstash 2.x(如果是这样,我会很感激任何关于如何开始的指示)?

所以要使用 memory with logstash >2.0

  • 克隆存储库。
  • 打开文件 logstash-filter-memorize.gemspec
  • s.add_runtime_dependency "logstash-core", '>= 1.4.0', '< 2.0.0' 更改为 s.add_runtime_dependency "logstash-core", '>= 1.4.0', '< 3.0.0'
  • 通过以下方式构建插件:gem build logstash-filter-memorize.gemspec
  • 通过以下方式安装:$ bin/logstash-plugin install /path/to/memorize/logstash-filter-memorize-0.9.1.gem

我试过了,似乎有效。

我认为没有必要为此重新打包 memorize 插件。您可以使用 aggregate filter 来实现您想要的。

...

# record host/mac in temporary map
if [action] =~ "DHCPACK" {
  aggregate {
     task_id => "%{clientip}"
     code => "map['clientmac'] = event['clientmac']; map['clientname'] = event['clientname'];"
     map_action => "create_or_update"
     # timeout set to 48h
     timeout => 172800
  }
}

# add host/mac where/when needed
else if [action] == "query" {
   aggregate {
     task_id => "%{clientip}"
     code => "event['clientmac'] = map['clientmac']; event['clientname'] = map['clientname']"
     map_action => "update"
   }
}