如何删除 Logstash 过滤器中所有具有 NULL 值的字段

How to remove all fields with NULL value in Logstash filter

我正在使用 logstash 读取 csv 格式的检查点日志文件 并且某些字段具有空值。

我想删除所有具有空值的字段。

我无法准确预见哪些字段(键)将具有空值,因为我在 csv 文件中有 150 列,我不想检查每一列。

是否可以在 logstash 中做一个动态过滤器来删除任何具有空值的字段?

我的 logstash 配置文件如下所示:

input {
  stdin { tags => "checkpoint" } 
   file {
   type => "file-input"
   path =>  "D:\Browser Downloads\logstash\logstash-1.4.2\bin\checkpoint.csv"
   sincedb_path => "D:\Browser Downloads\logstash\logstash-1.4.2\bin\sincedb-access2"
   start_position => "beginning"
   tags => ["checkpoint","offline"]
  }
}
filter {
 if "checkpoint" in [tags] {
        csv {
        columns => ["num","date","time","orig","type","action","alert","i/f_name","i/f_dir","product","Internal_CA:","serial_num:","dn:","sys_message:","inzone","outzone","rule","rule_uid","rule_name","service_id","src","dst","proto","service","s_port","dynamic object","change type","message_info","StormAgentName","StormAgentAction","TCP packet out of state","tcp_flags","xlatesrc","xlatedst","NAT_rulenum","NAT_addtnl_rulenum","xlatedport","xlatesport","fw_message","ICMP","ICMP Type","ICMP Code","DCE-RPC Interface UUID","rpc_prog","log_sys_message","scheme:","Validation log:","Reason:","Serial num:","Instruction:","fw_subproduct","vpn_feature_name","srckeyid","dstkeyid","user","methods:","peer gateway","IKE:","CookieI","CookieR","msgid","IKE notification:","Certificate DN:","IKE IDs:","partner","community","Session:","L2TP:","PPP:","MAC:","OM:","om_method:","assigned_IP:","machine:","reject_category","message:","VPN internal source IP","start_time","connection_uid","encryption failure:","vpn_user","Log ID","message","old IP","old port","new IP","new port","elapsed","connectivity_state","ctrl_category","description","description ","severity","auth_status","identity_src","snid","src_user_name","endpoint_ip","src_machine_name","src_user_group","src_machine_group","auth_method","identity_type","Authentication trial","roles","dst_user_name","dst_machine_name","spi","encryption fail reason:","information","error_description","domain_name","termination_reason","duration"]
      #  remove_field => [ any fields with null value] how to do it please 
        separator => "|"
        }
    # drop csv header
        if [num] == "num" and [date] == "date" and [time] == "time" and [orig] == "orig" {
        drop { }
    }
    }
  }

}
output {
   stdout {
    codec => rubydebug 
  }
   file {
      path => "output.txt"
   }

这里我附上一些日志示例:

num|date|time|orig|type|action|alert|i/f_name|i/f_dir|product|Internal_CA:|serial_num:|dn:|sys_message:|inzone|outzone|rule|rule_uid|rule_name|service_id|src|dst|proto|service|s_port|dynamic object|change type|message_info|StormAgentName|StormAgentAction|TCP packet out of state|tcp_flags|xlatesrc|xlatedst|NAT_rulenum|NAT_addtnl_rulenum|xlatedport|xlatesport|fw_message|ICMP|ICMP Type|ICMP Code|DCE-RPC Interface UUID|rpc_prog|log_sys_message|scheme:|Validation log:|Reason:|Serial num:|Instruction:|fw_subproduct|vpn_feature_name|srckeyid|dstkeyid|user|methods:|peer gateway|IKE:|CookieI|CookieR|msgid|IKE notification:|Certificate DN:|IKE IDs:|partner|community|Session:|L2TP:|PPP:|MAC:|OM:|om_method:|assigned_IP:|machine:|reject_category|message:|VPN internal source IP|start_time|connection_uid|encryption failure:|vpn_user|Log ID|message|old IP|old port|new IP|new port|elapsed|connectivity_state|ctrl_category|description|description |severity|auth_status|identity_src|snid|src_user_name|endpoint_ip|src_machine_name|src_user_group|src_machine_group|auth_method|identity_type|Authentication trial|roles|dst_user_name|dst_machine_name|spi|encryption fail reason:|information|error_description|domain_name|termination_reason|duration
0|8Jun2012|16:33:35|10.0.0.1|log|keyinst||daemon|inbound|VPN-1 & FireWall-1|started|||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
 1|8Jun2012|16:36:34|10.0.0.1|log|keyinst||daemon|inbound|VPN-1 & FireWall-1|started|||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
 2|8Jun2012|16:52:39|10.0.0.1|log|keyinst||daemon|inbound|VPN-1 & FireWall-1|Certificate initialized|86232|CN=fw-KO,O=sc-KO.KO.dc.obn8cx|||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
 3|8Jun2012|16:52:39|10.0.0.1|log|keyinst||daemon|inbound|VPN-1 & FireWall-1|Initiated certificate is now valid|86232|CN=fw-KO,O=sc-KO.KO.dc.obn8cx|||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
 4|8Jun2012|16:55:44|10.0.0.1|log|keyinst||daemon|inbound|VPN-1 & FireWall-1|Issued empty CRL 1|||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
20|8Jun2012|16:58:28|10.0.0.1|log|accept||eth1|inbound|VPN-1 & FireWall-1|||||Internal|External|1|{2A42C8CD-148D-4809-A480-3171108AD6C7}||domain-udp|192.168.100.1|198.32.64.12|udp|53|1036|||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||

要动态执行此操作,您需要使用 ruby{} 过滤器。有一些很好的示例代码 in this answer.

Ruby 过滤器可以满足您的要求。

input {
        stdin {
        }
}

filter {
        csv {
                columns => ["num","date","time","orig","type","action","alert","i/f_name","i/f_dir","product","Internal_CA:","serial_num:","dn:","sys_message:","inzone","outzone","rule","rule_uid","rule_name","service_id","src","dst","proto","service","s_port","dynamic object","change type","message_info","StormAgentName","StormAgentAction","TCP packet out of state","tcp_flags","xlatesrc","xlatedst","NAT_rulenum","NAT_addtnl_rulenum","xlatedport","xlatesport","fw_message","ICMP","ICMP Type","ICMP Code","DCE-RPC Interface UUID","rpc_prog","log_sys_message","scheme:","Validation log:","Reason:","Serial num:","Instruction:","fw_subproduct","vpn_feature_name","srckeyid","dstkeyid","user","methods:","peer gateway","IKE:","CookieI","CookieR","msgid","IKE notification:","Certificate DN:","IKE IDs:","partner","community","Session:","L2TP:","PPP:","MAC:","OM:","om_method:","assigned_IP:","machine:","reject_category","message:","VPN internal source IP","start_time","connection_uid","encryption failure:","vpn_user","Log ID","message","old IP","old port","new IP","new port","elapsed","connectivity_state","ctrl_category","description","description ","severity","auth_status","identity_src","snid","src_user_name","endpoint_ip","src_machine_name","src_user_group","src_machine_group","auth_method","identity_type","Authentication trial","roles","dst_user_name","dst_machine_name","spi","encryption fail reason:","information","error_description","domain_name","termination_reason","duration"]
                separator => "|"
        }
        ruby {
                code => "
                        hash = event.to_hash
                        hash.each do |k,v|
                                if v == nil
                                        event.remove(k)
                                end
                        end
                "
        }
}

output {
    stdout { codec => rubydebug }
}

您可以使用 ruby 插件过滤所有具有 nil 值的字段(Ruby 中为空)

更新:

这是我的环境:Windows server 2008 和 Logstash 1.4.1。 你的日志样本对我有用!我已经更新了配置、输入和输出。

输入

2|8Jun2012|16:52:39|10.0.0.1|log|keyinst||daemon|inbound|VPN-1 & FireWall-1|Certificate initialized|86232|CN=fw-KO,O=sc-KO.KO.dc.obn8cx|||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||

输出:

{
        "@version" => "1",
      "@timestamp" => "2015-03-12T00:30:34.123Z",
            "host" => "BENLIM",
             "num" => "2",
            "date" => "8Jun2012",
            "time" => "16:52:39",
            "orig" => "10.0.0.1",
            "type" => "log",
          "action" => "keyinst",
        "i/f_name" => "daemon",
         "i/f_dir" => "inbound",
         "product" => "VPN-1 & FireWall-1",
    "Internal_CA:" => "Certificate initialized",
     "serial_num:" => "86232",
             "dn:" => "CN=fw-KO,O=sc-KO.KO.dc.obn8cx"
}

检查 skip_empty_columns option of the csv 过滤器 - 对我的用例很有帮助。 :)

用法:

skip_empty_columns => true

如果您需要递归地删除所有 null、空白和空字段(0false 保留),此函数可能会有所帮助。它使用 Logstash 中的 Ruby 过滤器。它绝不是优雅的,但似乎非常有效。

filter {
    ruby {
        init => "
            def Compact(key)
                modifiedKey = nil
                parentKey = nil

                if key.kind_of?(String)
                    if key.start_with?('[')
                        modifiedKey = key
                    else
                        modifiedKey = key.sub( /([^\[^\]]*)/, '[]')
                    end

                parentKey = modifiedKey.sub(/\[[^\[]+\]$/, '') unless modifiedKey.sub(/\[[^\[]+\]$/, '').strip.empty?
                end

                unless modifiedKey.nil?
                    if event.get(modifiedKey).is_a?(Enumerable) &&
                    (event.get(modifiedKey).nil? || event.get(modifiedKey).empty?)
                         event.remove(modifiedKey)
                    elsif event.get(modifiedKey).to_s.strip.empty? || event.get(modifiedKey).nil?
                         event.remove(modifiedKey)
                     end

                    if !parentKey.nil? && event.get(parentKey).is_a?(Enumerable) &&
                    (event.get(parentKey).nil? || event.get(parentKey).empty?)
                        event.remove(parentKey)
                    end
                end

               if key == event.to_hash ||
               event.get((modifiedKey ? modifiedKey : '')).is_a?(Enumerable)
                   key = event.get(modifiedKey) unless modifiedKey.nil?
                   key.each{ |k|
                      Compact(%{#{modifiedKey ? modifiedKey : ''}[#{k.first}]}) if k.is_a?(Enumerable)
                   }
               end

               rescue Exception => e
                   puts %{ruby_exception_#{__method__.to_s} - #{e}}
           end
      "

     code => "
         Compact(event.to_hash)
     "
    }
}
ruby {
            init => "
                def removeEmptyField(event,h,name)
                    h.each do |k,v|
                            if (v.is_a?(Hash) || v.is_a?(Array)) && v.to_s != '{}'
                                removeEmptyField(event,v,String.new(name.to_s) << '[' << k.to_s << ']')
                            else
                            if v == '' || v.to_s == '{}'
                                event.remove(String.new(name.to_s) << '[' << k.to_s << ']')
                            end
                        end
                    end
                end
            "
            code => "
                removeEmptyField event,event.to_hash,''
            "
    }