How to send only error logs via logstash shipper

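I am using Logstash to output JSON messages to an API, reading the logs from a log file. My configuration works, and it sends all messages to the API. Here is a sample log file:

Log file: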

    TID: [-1234] [] [2016-06-07 12:52:59,862]  INFO {org.apache.synapse.core.axis2.ProxyService} -  Successfully created the Axis2 service for Proxy service : TestServiceHttp {org.apache.synapse.core.axis2.ProxyService}
    TID: [-1234] [] [2016-06-07 12:59:04,893]  INFO {org.apache.synapse.mediators.builtin.LogMediator} -  To: /services/TestServiceHttp.TestServiceHttpHttpSoap12Endpoint********* Sending Message to the Queue*****WSAction: urn:mediate********* Sending Message to the Queue*****SOAPAction: urn:mediate********* Sending Message to the Queue*****MessageID: urn:uuid:d1bbe24a-2ce3-497f-8224-d260b0632506********* Sending Message to the Queue*****Direction: request********* Sending Message to the Queue*****Envelope: <?xml version='1.0' encoding='utf-8'?><soapenv:Envelope xmlns:soapenv="http://www.w3.org/2003/05/soap-envelope"><soapenv:Body><name> Omer</name></soapenv:Body></soapenv:Envelope> {org.apache.synapse.mediators.builtin.LogMediator}
    TID: [-1234] [] [2016-06-07 12:59:04,925]  INFO {org.apache.synapse.core.axis2.TimeoutHandler} -  This engine will expire all callbacks after : 120 seconds, irrespective of the timeout action, after the specified or optional timeout {org.apache.synapse.core.axis2.TimeoutHandler}
    TID: [-1234] [] [2016-06-07 12:59:04,933] ERROR {org.apache.axis2.description.ClientUtils} -  The system cannot infer the transport information from the jms:/Customer.01.Request.Queue.01?transport.jms.ConnectionFactoryJNDIName=QueueConnectionFactory&java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory&java.naming.provider.url=tcp://localhost:61616&transport.jms.DestinationType=queue URL. {org.apache.axis2.description.ClientUtils}
    TID: [-1234] [] [2016-06-07 12:59:04,949] ERROR {org.apache.synapse.core.axis2.Axis2Sender} -  Unexpected error during sending message out {org.apache.synapse.core.axis2.Axis2Sender}
    org.apache.axis2.AxisFault: The system cannot infer the transport information from the jms:/Customer.01.Request.Queue.01?transport.jms.ConnectionFactoryJNDIName=QueueConnectionFactory&java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory&java.naming.provider.url=tcp://localhost:61616&transport.jms.DestinationType=queue URL.
        at org.apache.axis2.description.ClientUtils.inferOutTransport(ClientUtils.java:81)
        at org.apache.axis2.client.OperationClient.prepareMessageContext(OperationClient.java:288)
        at org.apache.axis2.transport.base.threads.NativeWorkerPool.run(NativeWorkerPool.java:172)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
    TID: [-1234] [] [2016-06-07 12:59:05,009]  INFO {org.apache.synapse.mediators.builtin.LogMediator} -  To: /services/TestServiceHttp.TestServiceHttpHttpSoap12Endpoint, WSAction: urn:mediate, SOAPAction: urn:mediate, MessageID: urn:uuid:d1bbe24a-2ce3-497f-8224-d260b0632506, Direction: request, MESSAGE = Executing default 'fault' sequence, ERROR_CODE = 0, ERROR_MESSAGE = Unexpected error during sending message out, Envelope: <?xml version='1.0' encoding='utf-8'?><soapenv:Envelope xmlns:soapenv="http://www.w3.org/2003/05/soap-envelope"><soapenv:Body><name> Omer</name></soapenv:Body></soapenv:Envelope> {org.apache.synapse.mediators.builtin.LogMediator}
    TID: [-1234] [] [2016-06-07 13:00:04,890]  INFO {org.apache.axis2.transport.http.HTTPSender} -  Unable to sendViaPost to url[http://Omer-PC:8280/services/TestServiceHttp.TestServiceHttpHttpSoap12Endpoint] {org.apache.axis2.transport.http.HTTPSender}
    java.net.SocketTimeoutException: Read timed out
        at java.net.SocketInputStream.socketRead0(Native Method)
        at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
        at java.net.SocketInputStream.read(SocketInputStream.java:170)
        at java.net.SocketInputStream.read(SocketInputStream.java:141)
        at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
        at java.io.BufferedInputStream.read(BufferedInputStream.java:265)
        at org.apache.commons.httpclient.HttpParser.readRawLine(HttpParser.java:78)
        at org.apache.commons.httpclient.HttpParser.readLine(HttpParser.java:106)
    ent.ServiceClient.sendReceive(ServiceClient.java:530)
        at org.apache.jsp.admin.jsp.WSRequestXSSproxy_005fajaxprocessor_jsp._jspService(WSRequestXSSproxy_005fajaxprocessor_jsp.java:294)
        at org.apache.jasper.runtime.HttpJspBase.service(HttpJspBase.java:70)
        at javax.servlet.http.HttpServlet.service(HttpServlet.java:727)
        at org.apache.jasper.servlet.JspServletWrapper.service(JspServletWrapper.java:432)
        at org.apache.jasper.servlet.JspServlet.serviceJspFile(JspServlet.java:395)
        at org.apache.jasper.servlet.JspServlet.service(JspServlet.java:339)
        at javax.servlet.http.HttpServlet.service(HttpServlet.java:727)
        at org.wso2.carbon.ui.JspServlet.service(JspServlet.java:155)
        at org.wso2.carbon.ui.TilesJspServlet.service(TilesJspServlet.java:80)
        at javax.servlet.http.HttpServlet.service(HttpServlet.java:727)
        at org.eclipse.equinox.http.helper.ContextPathServletAdaptor.service(ContextPathServletAdaptor.java:37)
        at org.eclipse.equinox.http.servlet.internal.ServletRegistration.service(ServletRegistration.java:61)
        at org.eclipse.equinox.http.servlet.internal.ProxyServlet.processAlias(ProxyServlet.java:128)
        at org.eclipse.equinox.http.servlet.internal.ProxyServlet.service(ProxyServlet.java:68)
        at javax.servlet.http.HttpServlet.service(HttpServlet.java:727)
        at org.wso2.carbon.tomcat.ext.servlet.DelegationServlet.service(DelegationServlet.java:68)
        at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:303)
        at org.apache.catalina.valves.AccessLogValve.invoke(AccessLogValve.java:950)
        at org.wso2.carbon.tomcat.ext.valves.CarbonContextCreatorValve.invoke(CarbonContextCreatorValve.java:57)
        at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:116)
        at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:421)
        at org.apache.coyote.http11.AbstractHttp11Processor.process(AbstractHttp11Processor.java:1074)
        at org.apache.coyote.AbstractProtocol$AbstractConnectionHandler.process(AbstractProtocol.java:611)
        at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1739)
        at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.run(NioEndpoint.java:1698)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61)
        at java.lang.Thread.run(Thread.java:745)
    TID: [-1234] [] [2016-06-07 13:01:40,447]  INFO {org.wso2.carbon.core.init.CarbonServerManager} -  Shutdown hook triggered.... {org.wso2.carbon.core.init.CarbonServerManager}
    TID: [-1234] [] [2016-06-07 13:01:40,464]  INFO {org.wso2.carbon.core.init.CarbonServerManager} -  Gracefully shutting down WSO2 Enterprise Service Bus... {org.wso2.carbon.core.init.CarbonServerManager}
    TID: [-1234] [] [2016-06-07 13:01:40,477]  INFO {org.wso2.carbon.core.ServerManagement} -  Starting to switch to maintenance mode... {org.wso2.carbon.core.ServerManagement}
    TID: [-1234] [] [2016-06-07 13:01:40,481]  INFO {org.apache.axis2.transport.jms.JMSListener} -  JMS Listener Shutdown {org.apache.axis2.transport.jms.JMSListener}

Here is my configuration file:

Configuration file:

    input {
        stdin {}

        file {
            path => "C:\WSO2Environment\wso2esb-4.9.0\repository\logs\wso2carbon.log"
            type => "wso2"
            start_position => "beginning"
            codec => multiline {
                pattern => "(^\s*at .+)|^(?!TID).*$"
                negate => false
                what => "previous"
            }
        }
    }

    filter {
        if [type] == "wso2" {
            grok {
                match => [ "message", "TID:%{SPACE}\[%{INT:SourceSystemId}\]%{SPACE}\[%{DATA:ProcessName}\]%{SPACE}\[%{TIMESTAMP_ISO8601:TimeStamp}\]%{SPACE}%{LOGLEVEL:MessageType}%{SPACE}{%{JAVACLASS:MessageTitle}}%{SPACE}-%{SPACE}%{GREEDYDATA:Message}" ]
                add_tag => [ "grokked" ]
            }
            mutate {
                gsub => [
                    "TimeStamp", "\s", "T",
                    "TimeStamp", ",", "."
                ]
            }
        }

        if !( "_grokparsefailure" in [tags] ) {
            grok {
                match => [ "message", "%{GREEDYDATA:StackTrace}" ]
                add_tag => [ "grokked" ]
            }
            date {
                match => [ "timestamp", "yyyy MMM dd HH:mm:ss:SSS" ]
                target => "TimeStamp"
                timezone => "UTC"
            }
        }

        if ( "multiline" in [tags] ) {
            grok {
                match => [ "message", "%{GREEDYDATA:StackTrace}" ]
                add_tag => [ "multiline" ]
                tag_on_failure => [ "multiline" ]
            }
            date {
                match => [ "timestamp", "yyyy MMM dd HH:mm:ss:SSS" ]
                target => "TimeStamp"
            }
        }
    }

    output {
        stdout { }
        http {
            url => "http://localhost:8086/messages"
            http_method => "post"
            format => "json"
            mapping => ["TimeStamp","%{TimeStamp}","MessageType","%{MessageType}","MessageTitle","%{MessageTitle}","Message","%{log_EventMessage}","SourceSystemId","%{SourceSystemId}","StackTrace","%{log_StackTrace}"]
        }
    }

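Problem statement:

The configuration works and sends every log entry to the API, but I only want to send error logs. So I want to add a check on "MessageType", the field where I capture the log level: only if its value is "ERROR" should the message be sent to the API; otherwise Logstash should drop the message.

In the filter section of your Logstash configuration, you can add a tag based on an if condition. Then add an if statement in the output that checks for that error tag: if the tag is present, the event is sent; otherwise it is ignored.

After the following if statement: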

    if [type] == "wso2" {
        grok {
            match => [ "message", "TID:%{SPACE}\[%{INT:SourceSystemId}\]%{SPACE}\[%{DATA:ProcessName}\]%{SPACE}\[%{TIMESTAMP_ISO8601:TimeStamp}\]%{SPACE}%{LOGLEVEL:MessageType}%{SPACE}{%{JAVACLASS:MessageTitle}}%{SPACE}-%{SPACE}%{GREEDYDATA:Message}" ]
            add_tag => [ "grokked" ]
        }
        mutate {
            gsub => [
                "TimeStamp", "\s", "T",
                "TimeStamp", ",", "."
            ]
        }
    }

Add the following statement to your filter:

    if "grokked" in [tags] {
        grok {
             match => ["MessageType", "ERROR"]
             add_tag => [ "loglevelerror" ]
        }   
    }
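
As a hedged aside, the same tag could also be added with a plain conditional instead of a second grok. A grok filter that fails to match adds a `_grokparsefailure` tag by default, so the grok version marks every non-ERROR event that way, and other conditionals in the pipeline may react to that tag. The sketch below assumes `MessageType` has already been populated by the first grok and reuses the `loglevelerror` tag name; it has not been run against this log file.

    # Sketch: tag ERROR events without a second grok.
    # Assumes the first grok has already set MessageType.
    if [MessageType] == "ERROR" {
        mutate {
            add_tag => [ "loglevelerror" ]
        }
    }

Like the grok version, this goes inside the `filter` block, after the `if [type] == "wso2"` block.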

Then make the following change in your output:

    output {
        if "loglevelerror" in [tags] {
            stdout { }
            http {
                url => "http://localhost:8086/messages"
                http_method => "post"
                format => "json"
                mapping => ["TimeStamp","%{TimeStamp}","MessageType","%{MessageType}","MessageTitle","%{MessageTitle}","Message","%{log_EventMessage}","SourceSystemId","%{SourceSystemId}","StackTrace","%{log_StackTrace}"]
            }
        }
    }

I tested this on my machine using stdout, and it works fine. Hope this helps!
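
If the non-error events are not needed anywhere else, a possible variation is to discard them in the filter with the `drop` filter, which is closer to the original wish that Logstash drop everything except errors. This is only a minimal sketch and was not part of the test mentioned above. It assumes `MessageType` is set by the first grok and that the conditional is placed after that grok inside the `filter` block.

    # Sketch: discard every event whose log level is not ERROR.
    # Assumes MessageType was extracted by the earlier grok filter.
    if [MessageType] != "ERROR" {
        drop { }
    }

With this in place the output section would need no conditional. Two consequences to keep in mind: events for which the first grok failed have no `MessageType` and should be dropped as well, and dropped events no longer reach `stdout` either, so this is less convenient while debugging.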