Logstash Netflow 模块监听,但不读取数据包
Logstash Netflow Module listening, but not reading packets
带有 netflow 模块的 Logstash 6.2.4
Elasticsearch 版本:6.2.4
Ubuntu 16.04 LTS
我遇到一个问题,logstash 正在侦听正确的端口,但似乎没有收集
netflow 数据并将其传递给 elasticsearch。
我们网络中的路由器正在将它们的 netflow 数据发送到服务器 A,并且 nfcap 正在侦听端口
9995 所以尝试 运行 logstash 使用服务器 A 上的 netflow 模块导致地址正在使用
错误。所以,我正在使用 iptables 来复制数据包并将它们转发到不同的服务器,服务器 B,就像这样。
iptables -t mangle -A PREROUTING -p udp --dport 9995 -j TEE --gateway <Server B ip address>
通过 tcpdump 检查,我可以看到服务器 B 正在接收重复的数据包,其中包含服务器 A 的 IP 地址。
输出如下,但出于安全原因,我已经编辑了 IP 地址。
tcpdump -i eno1 -n dst port 9995
12:49:49.130772 IP <Router 1 ip address>.10005 > <Server A ip address>.9995: UDP, length 1392
12:49:49.131067 IP <Router 1 ip address>.10005 > <Server A ip address>.9995: UDP, length 1392
12:49:49.133504 IP <Router 1 ip address>.10005 > <Server A ip address>.9995: UDP, length 1392
12:49:49.133527 IP <Router 1 ip address>.10005 > <Server A ip address>.9995: UDP, length 1392
12:49:49.133533 IP <Router 1 ip address>.10005 > <Server A ip address>.9995: UDP, length 1260
12:49:49.391871 IP <Router 2 ip address>.62500 > <Server A ip address>.9995: UDP, length 1452
12:49:49.391894 IP <Router 2 ip address>.62500 > <Server A ip address>.9995: UDP, length 1368
所以,我知道服务器 B 正在端口 9995 上接收数据包。
检查 netstat 也显示了这一点。
netstat -an | grep 9995
udp 0 0 0.0.0.0:9995 0.0.0.0:*
logstash.yml如下
node.name: server-b
path.data: /var/lib/logstash
http.host: "0.0.0.0"
modules:
- name: netflow
var.input.udp.port: 9995 # Inbound connections
var.elasticsearch.hosts: "<ip address>:9200"
var.kibana.host: "<ip address>:5601"
path.logs: /var/log/logstash
检查/var/log/logstash/logstash-plain.log,我看到的唯一警告是关于 elasticsearch 的版本大于版本 6,因此不会使用类型
确定文档类型。
[2018-07-06T12:58:13,771][INFO ][logstash.runner ] Starting Logstash {"logstash.version"=>"6.2.4"}
[2018-07-06T12:58:13,817][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9600}
[2018-07-06T12:58:17,599][INFO ][logstash.pipeline ] Starting pipeline {:pipeline_id=>"module-netflow", "pipeline.workers"=>4, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50}
[2018-07-06T12:58:17,733][INFO ][logstash.outputs.elasticsearch] Elasticsearch pool URLs updated {:changes=>{:removed=>[], :added=>[http://<ip address>:9200/]}}
[2018-07-06T12:58:17,734][INFO ][logstash.outputs.elasticsearch] Running health check to see if an Elasticsearch connection is working {:healthcheck_url=>http://<ip address>:9200/, :path=>"/"}
[2018-07-06T12:58:17,784][WARN ][logstash.outputs.elasticsearch] Restored connection to ES instance {:url=>"http://<ip address>:9200/"}
[2018-07-06T12:58:17,810][INFO ][logstash.outputs.elasticsearch] ES Output version determined {:es_version=>6}
[2018-07-06T12:58:17,810][WARN ][logstash.outputs.elasticsearch] Detected a 6.x and above cluster: the `type` event field won't be used to determine the document _type {:es_version=>6}
[2018-07-06T12:58:17,811][INFO ][logstash.outputs.elasticsearch] New Elasticsearch output {:class=>"LogStash::Outputs::ElasticSearch", :hosts=>["//<ip address>:9200"]}
[2018-07-06T12:58:18,088][INFO ][logstash.filters.geoip ] Using geoip database {:path=>"/usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-filter-geoip-5.0.3-java/vendor/GeoLite2-City.mmdb"}
[2018-07-06T12:58:18,101][INFO ][logstash.filters.geoip ] Using geoip database {:path=>"/usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-filter-geoip-5.0.3-java/vendor/GeoLite2-ASN.mmdb"}
[2018-07-06T12:58:18,102][INFO ][logstash.filters.geoip ] Using geoip database {:path=>"/usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-filter-geoip-5.0.3-java/vendor/GeoLite2-City.mmdb"}
[2018-07-06T12:58:18,103][INFO ][logstash.filters.geoip ] Using geoip database {:path=>"/usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-filter-geoip-5.0.3-java/vendor/GeoLite2-ASN.mmdb"}
[2018-07-06T12:58:18,103][INFO ][logstash.filters.geoip ] Using geoip database {:path=>"/usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-filter-geoip-5.0.3-java/vendor/GeoLite2-City.mmdb"}
[2018-07-06T12:58:18,103][INFO ][logstash.filters.geoip ] Using geoip database {:path=>"/usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-filter-geoip-5.0.3-java/vendor/GeoLite2-ASN.mmdb"}
[2018-07-06T12:58:18,104][INFO ][logstash.filters.geoip ] Using geoip database {:path=>"/usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-filter-geoip-5.0.3-java/vendor/GeoLite2-City.mmdb"}
[2018-07-06T12:58:18,104][INFO ][logstash.filters.geoip ] Using geoip database {:path=>"/usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-filter-geoip-5.0.3-java/vendor/GeoLite2-ASN.mmdb"}
[2018-07-06T12:58:18,120][INFO ][logstash.inputs.udp ] Starting UDP listener {:address=>"0.0.0.0:9995"}
[2018-07-06T12:58:18,126][INFO ][logstash.pipeline ] Pipeline started successfully {:pipeline_id=>"module-netflow", :thread=>"#<Thread:0x16700849@/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:247 sleep>"}
[2018-07-06T12:58:18,131][INFO ][logstash.inputs.udp ] UDP listener started {:address=>"0.0.0.0:9995", :receive_buffer_bytes=>"212992", :queue_size=>"2000"}
[2018-07-06T12:58:18,135][INFO ][logstash.agent ] Pipelines running {:count=>1, :pipelines=>["module-netflow"]}
ElasticSearch 正在 运行ning 并且正在从 packetbeat 和 filebeat 接收数据,/var/log/elasticsearch/elasticsearch.log 中没有任何内容表明 elasticsearch 有任何错误。
但是,elasticsearch 没有 netflow 的索引模式。另一方面,Kibana 可以。
因此,服务器 B 上的 logstash 正在侦听 0.0.0.0:9995,端口 9995 已打开并从服务器 A 接收数据包,但 logstash 无法识别这些数据包。
我的假设是服务器 B 忽略了它们,因为目标 IP 地址是服务器 A 的 IP 地址。这听起来对吗?如果是这样,有没有办法解决这个问题?
是否有更好的方法将重复的数据包从服务器 A 转发到服务器 B 并让 logstash 读取它们?
不幸的是,无法将另一个 netflow 导出器目标添加到路由器配置中。
我会回答我自己的问题。
服务器 B 确实忽略了 netflow 数据,因为它无法识别 IP 地址。
我添加了服务器 A 的 ip 地址作为环回接口,它按预期工作。
这可能不是最好的解决方案,也是在生产环境中应避免的解决方案,但出于测试目的,它应该没问题。
带有 netflow 模块的 Logstash 6.2.4
Elasticsearch 版本:6.2.4
Ubuntu 16.04 LTS
我遇到一个问题,logstash 正在侦听正确的端口,但似乎没有收集 netflow 数据并将其传递给 elasticsearch。
我们网络中的路由器正在将它们的 netflow 数据发送到服务器 A,并且 nfcap 正在侦听端口 9995 所以尝试 运行 logstash 使用服务器 A 上的 netflow 模块导致地址正在使用 错误。所以,我正在使用 iptables 来复制数据包并将它们转发到不同的服务器,服务器 B,就像这样。
iptables -t mangle -A PREROUTING -p udp --dport 9995 -j TEE --gateway <Server B ip address>
通过 tcpdump 检查,我可以看到服务器 B 正在接收重复的数据包,其中包含服务器 A 的 IP 地址。 输出如下,但出于安全原因,我已经编辑了 IP 地址。
tcpdump -i eno1 -n dst port 9995
12:49:49.130772 IP <Router 1 ip address>.10005 > <Server A ip address>.9995: UDP, length 1392
12:49:49.131067 IP <Router 1 ip address>.10005 > <Server A ip address>.9995: UDP, length 1392
12:49:49.133504 IP <Router 1 ip address>.10005 > <Server A ip address>.9995: UDP, length 1392
12:49:49.133527 IP <Router 1 ip address>.10005 > <Server A ip address>.9995: UDP, length 1392
12:49:49.133533 IP <Router 1 ip address>.10005 > <Server A ip address>.9995: UDP, length 1260
12:49:49.391871 IP <Router 2 ip address>.62500 > <Server A ip address>.9995: UDP, length 1452
12:49:49.391894 IP <Router 2 ip address>.62500 > <Server A ip address>.9995: UDP, length 1368
所以,我知道服务器 B 正在端口 9995 上接收数据包。 检查 netstat 也显示了这一点。
netstat -an | grep 9995
udp 0 0 0.0.0.0:9995 0.0.0.0:*
logstash.yml如下
node.name: server-b
path.data: /var/lib/logstash
http.host: "0.0.0.0"
modules:
- name: netflow
var.input.udp.port: 9995 # Inbound connections
var.elasticsearch.hosts: "<ip address>:9200"
var.kibana.host: "<ip address>:5601"
path.logs: /var/log/logstash
检查/var/log/logstash/logstash-plain.log,我看到的唯一警告是关于 elasticsearch 的版本大于版本 6,因此不会使用类型 确定文档类型。
[2018-07-06T12:58:13,771][INFO ][logstash.runner ] Starting Logstash {"logstash.version"=>"6.2.4"}
[2018-07-06T12:58:13,817][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9600}
[2018-07-06T12:58:17,599][INFO ][logstash.pipeline ] Starting pipeline {:pipeline_id=>"module-netflow", "pipeline.workers"=>4, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50}
[2018-07-06T12:58:17,733][INFO ][logstash.outputs.elasticsearch] Elasticsearch pool URLs updated {:changes=>{:removed=>[], :added=>[http://<ip address>:9200/]}}
[2018-07-06T12:58:17,734][INFO ][logstash.outputs.elasticsearch] Running health check to see if an Elasticsearch connection is working {:healthcheck_url=>http://<ip address>:9200/, :path=>"/"}
[2018-07-06T12:58:17,784][WARN ][logstash.outputs.elasticsearch] Restored connection to ES instance {:url=>"http://<ip address>:9200/"}
[2018-07-06T12:58:17,810][INFO ][logstash.outputs.elasticsearch] ES Output version determined {:es_version=>6}
[2018-07-06T12:58:17,810][WARN ][logstash.outputs.elasticsearch] Detected a 6.x and above cluster: the `type` event field won't be used to determine the document _type {:es_version=>6}
[2018-07-06T12:58:17,811][INFO ][logstash.outputs.elasticsearch] New Elasticsearch output {:class=>"LogStash::Outputs::ElasticSearch", :hosts=>["//<ip address>:9200"]}
[2018-07-06T12:58:18,088][INFO ][logstash.filters.geoip ] Using geoip database {:path=>"/usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-filter-geoip-5.0.3-java/vendor/GeoLite2-City.mmdb"}
[2018-07-06T12:58:18,101][INFO ][logstash.filters.geoip ] Using geoip database {:path=>"/usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-filter-geoip-5.0.3-java/vendor/GeoLite2-ASN.mmdb"}
[2018-07-06T12:58:18,102][INFO ][logstash.filters.geoip ] Using geoip database {:path=>"/usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-filter-geoip-5.0.3-java/vendor/GeoLite2-City.mmdb"}
[2018-07-06T12:58:18,103][INFO ][logstash.filters.geoip ] Using geoip database {:path=>"/usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-filter-geoip-5.0.3-java/vendor/GeoLite2-ASN.mmdb"}
[2018-07-06T12:58:18,103][INFO ][logstash.filters.geoip ] Using geoip database {:path=>"/usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-filter-geoip-5.0.3-java/vendor/GeoLite2-City.mmdb"}
[2018-07-06T12:58:18,103][INFO ][logstash.filters.geoip ] Using geoip database {:path=>"/usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-filter-geoip-5.0.3-java/vendor/GeoLite2-ASN.mmdb"}
[2018-07-06T12:58:18,104][INFO ][logstash.filters.geoip ] Using geoip database {:path=>"/usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-filter-geoip-5.0.3-java/vendor/GeoLite2-City.mmdb"}
[2018-07-06T12:58:18,104][INFO ][logstash.filters.geoip ] Using geoip database {:path=>"/usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-filter-geoip-5.0.3-java/vendor/GeoLite2-ASN.mmdb"}
[2018-07-06T12:58:18,120][INFO ][logstash.inputs.udp ] Starting UDP listener {:address=>"0.0.0.0:9995"}
[2018-07-06T12:58:18,126][INFO ][logstash.pipeline ] Pipeline started successfully {:pipeline_id=>"module-netflow", :thread=>"#<Thread:0x16700849@/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:247 sleep>"}
[2018-07-06T12:58:18,131][INFO ][logstash.inputs.udp ] UDP listener started {:address=>"0.0.0.0:9995", :receive_buffer_bytes=>"212992", :queue_size=>"2000"}
[2018-07-06T12:58:18,135][INFO ][logstash.agent ] Pipelines running {:count=>1, :pipelines=>["module-netflow"]}
ElasticSearch 正在 运行ning 并且正在从 packetbeat 和 filebeat 接收数据,/var/log/elasticsearch/elasticsearch.log 中没有任何内容表明 elasticsearch 有任何错误。 但是,elasticsearch 没有 netflow 的索引模式。另一方面,Kibana 可以。
因此,服务器 B 上的 logstash 正在侦听 0.0.0.0:9995,端口 9995 已打开并从服务器 A 接收数据包,但 logstash 无法识别这些数据包。 我的假设是服务器 B 忽略了它们,因为目标 IP 地址是服务器 A 的 IP 地址。这听起来对吗?如果是这样,有没有办法解决这个问题?
是否有更好的方法将重复的数据包从服务器 A 转发到服务器 B 并让 logstash 读取它们?
不幸的是,无法将另一个 netflow 导出器目标添加到路由器配置中。
我会回答我自己的问题。
服务器 B 确实忽略了 netflow 数据,因为它无法识别 IP 地址。 我添加了服务器 A 的 ip 地址作为环回接口,它按预期工作。
这可能不是最好的解决方案,也是在生产环境中应避免的解决方案,但出于测试目的,它应该没问题。