LogStash message misconfiguration error: Failed to execute action

LogStash message misconfiguration error: Failed to execute action

我正在使用 Logstash 和 kv 插件来解析并识别 Fortigate UTM 设备生成的日志中的字段,但一直无法让它正常工作。

编辑:我已经让配置正常工作了,下面的配置适用于 Fortigate OS 日志:

#Begin Input
input {
  # Receive events over UDP on port 514 and label each one with the
  # type that the filter section's conditional tests for.
  udp {
    port => 514
    type => "syslogrrr"
  }
}
#End Input

#Begin Filter
filter {
  # Only process events whose type was set to "syslogrrr" by the input.
  if [type] == "syslogrrr" {

    # Split the raw line into a timestamp, a host name and the remaining
    # payload, which is captured as syslog_message.
    grok {
      match => {
        "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{GREEDYDATA:syslog_message}"
      }
    }

    # Expand the key=value pairs found in the payload into event fields.
    kv {
      source      => "syslog_message"
      value_split => "="
    }

    mutate {
      # Cast the numeric fields from strings to integers.
      convert => {
        "craction"  => "integer"
        "crscore"   => "integer"
        "dstport"   => "integer"
        "duration"  => "integer"
        "eventtime" => "integer"
        "logid"     => "integer"
        "policyid"  => "integer"
        "proto"     => "integer"
        "rcvdbyte"  => "integer"
        "rcvdpkt"   => "integer"
        "sentbyte"  => "integer"
        "sentpkt"   => "integer"
        "sessionid" => "integer"
        "srcport"   => "integer"
        "transport" => "integer"
      }

      # Combine the date and time fields (presumably produced by the kv
      # step - verify against real log lines) into a single field that the
      # date filter below parses.
      add_field => { "logTimestamp" => "%{date} %{time}" }

      # Drop the raw inputs and bookkeeping fields that are no longer needed.
      remove_field => [ "message","syslog_message","path","@version","_id","_index","_score","_type" ]
    }

    # Parse the combined timestamp, then discard the helper fields.
    date {
      match        => ["logTimestamp", "YYYY-MM-dd HH:mm:ss"]
      locale       => "en"
      timezone     => "America/Guyana"
      remove_field => ["logTimestamp", "year", "month", "day", "time", "date"]
    }
  }
}
#End Filter

#Begin Output
output {
  # Ship events to the Elasticsearch node at localhost:9200, using a
  # date-stamped index name with a fixed "-001" suffix.
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "logstash-%{+yyyy.MM.dd}-001"
  }
}
#End Output

希望这对正在寻找解决方案的人有所帮助。

filter {
  # Parse key=value pairs straight from the raw message, skipping the
  # "type" and "subtype" keys.
  kv {
    source       => "message"
    exclude_keys => [ "type", "subtype" ]
  }

  # Look up GeoIP data for each of the possible address fields.
  # NOTE(review): none of these sets `target`, so they all appear to write
  # to the plugin's default target and a source lookup would overwrite a
  # destination lookup on the same event - confirm, and consider giving the
  # source and destination lookups distinct targets.
  geoip { source => "dst" }
  geoip { source => "dstip" }
  geoip { source => "src" }
  geoip { source => "srcip" }

  mutate {
    # Map the alternative field names onto one common set of names.
    rename => {
      "dst"      => "dst_ip"
      "dstip"    => "dst_ip"
      "dstport"  => "dst_port"
      "devname"  => "device_id"
      "status"   => "action"
      "src"      => "src_ip"
      "srcip"    => "src_ip"
      "zone"     => "src_intf"
      "srcintf"  => "src_intf"
      "srcport"  => "src_port"
      # Fixed: "rcvd" used to be renamed to "byte_recieved" (missing "s"),
      # so it never matched the field converted below. The "recieved"
      # spelling is kept on purpose so existing consumers of
      # "bytes_recieved" keep working.
      "rcvd"     => "bytes_recieved"
      "rcvdbyte" => "bytes_recieved"
      "sentbyte" => "bytes_sent"
      "sent"     => "bytes_sent"
    }

    # Cast the byte counters to integers.
    convert => {
      "bytes_recieved" => "integer"
      "bytes_sent"     => "integer"
    }

    remove_field => [ "msg" ]
  }
}

这个过滤器工作正常。我认为您多加了 3 个右花括号(`}`)。

#Begin Input
input {
  # Receive events over UDP on port 514 and label each one with the
  # type that the filter section's conditional tests for.
  udp {
    port => 514
    type => "syslogrrr"
  }
}
#End Input

#Begin Filter
filter {
  # Only process events whose type was set to "syslogrrr" by the input.
  if [type] == "syslogrrr" {

    # Split the raw line into a timestamp, a host name and the remaining
    # payload, which is captured as syslog_message.
    grok {
      match => {
        "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{GREEDYDATA:syslog_message}"
      }
    }

    # Expand the key=value pairs found in the payload into event fields.
    kv {
      source      => "syslog_message"
      value_split => "="
    }

    mutate {
      # Cast the numeric fields from strings to integers.
      convert => {
        "craction"  => "integer"
        "crscore"   => "integer"
        "dstport"   => "integer"
        "duration"  => "integer"
        "eventtime" => "integer"
        "logid"     => "integer"
        "policyid"  => "integer"
        "proto"     => "integer"
        "rcvdbyte"  => "integer"
        "rcvdpkt"   => "integer"
        "sentbyte"  => "integer"
        "sentpkt"   => "integer"
        "sessionid" => "integer"
        "srcport"   => "integer"
        "transport" => "integer"
      }

      # Combine the date and time fields (presumably produced by the kv
      # step - verify against real log lines) into a single field that the
      # date filter below parses.
      add_field => { "logTimestamp" => "%{date} %{time}" }

      # Drop the raw inputs and bookkeeping fields that are no longer needed.
      remove_field => [ "message","syslog_message","path","@version","_id","_index","_score","_type" ]
    }

    # Parse the combined timestamp, then discard the helper fields.
    date {
      match        => ["logTimestamp", "YYYY-MM-dd HH:mm:ss"]
      locale       => "en"
      timezone     => "America/Guyana"
      remove_field => ["logTimestamp", "year", "month", "day", "time", "date"]
    }
  }
}
#End Filter

#Begin Output
output {
  # Ship events to the Elasticsearch node at localhost:9200, using a
  # date-stamped index name with a fixed "-001" suffix.
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "logstash-%{+yyyy.MM.dd}-001"
  }
}
#End Output