Parsing XML data from Filebeat using Logstash

I am using Filebeat to read XML files on Windows and send them to Logstash, which should filter them and forward them to Elasticsearch.

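Filebeat does its job perfectly and I get the XML blocks into Logstash, but it looks like I have misconfigured the Logstash filter that should parse each XML block into separate fields and wrap those fields in an Elasticsearch type.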

Here is my sample XML data:

<H_Ticket>
 <IDH_Ticket>26</IDH_Ticket>
 <CodeBus>186</CodeBus>
 <CodeCh>5531</CodeCh>
 <CodeConv>5531</CodeConv>
 <Codeligne>12</Codeligne>
 <Date>20150915</Date>
 <Heur>1110</Heur>
 <NomFR1>SOUK AHAD</NomFR1>
 <NomFR2>KANTAOUI </NomFR2>
 <Prix>0.66</Prix>
 <IDTicket>26</IDTicket>
 <CodeRoute>107</CodeRoute>
 <origine>01</origine>
 <Distination>06</Distination>
 <Num>6</Num>
 <Ligne>107</Ligne>
 <requisition> </requisition>
 <voyage>0</voyage>
 <faveur> </faveur>
 </H_Ticket>
<H_Ticket>
 <IDH_Ticket>26</IDH_Ticket>
 <CodeBus>186</CodeBus>
 <CodeCh>5531</CodeCh>
 <CodeConv>5531</CodeConv>
 <Codeligne>12</Codeligne>
 <Date>20150915</Date>
 <Heur>1110</Heur>
 <NomFR1>SOUK AHAD</NomFR1>
 <NomFR2>KANTAOUI </NomFR2>
 <Prix>0.66</Prix>
 <IDTicket>26</IDTicket>
 <CodeRoute>107</CodeRoute>
 <origine>01</origine>
 <Distination>06</Distination>
 <Num>6</Num>
 <Ligne>107</Ligne>
 <requisition> </requisition>
 <voyage>0</voyage>
 <faveur> </faveur>
 </H_Ticket>
<H_Ticket>
 <IDH_Ticket>26</IDH_Ticket>
 <CodeBus>186</CodeBus>
 <CodeCh>5531</CodeCh>
 <CodeConv>5531</CodeConv>
 <Codeligne>12</Codeligne>
 <Date>20150915</Date>
 <Heur>1110</Heur>
 <NomFR1>SOUK AHAD</NomFR1>
 <NomFR2>KANTAOUI </NomFR2>
 <Prix>0.66</Prix>
 <IDTicket>26</IDTicket>
 <CodeRoute>107</CodeRoute>
 <origine>01</origine>
 <Distination>06</Distination>
 <Num>6</Num>
 <Ligne>107</Ligne>
 <requisition> </requisition>
 <voyage>0</voyage>
 <faveur> </faveur>
 </H_Ticket>

Here is my Logstash configuration file:

input {  
    beats {
    port => 5044
  }
}
filter 
{
    xml 
    {
        source => "ticket"
        xpath => 
        [
            "/ticket/IDH_Ticket/text()", "ticketId",
            "/ticket/CodeBus/text()", "codeBus",
            "/ticket/CodeCh/text()", "codeCh",
            "/ticket/CodeConv/text()", "codeConv",
            "/ticket/Codeligne/text()", "codeLigne",
            "/ticket/Date/text()", "date",
            "/ticket/Heur/text()", "heure",
            "/ticket/NomFR1/text()", "nomFR1",
            "/ticket/NomAR1/text()", "nomAR1",
            "/ticket/NomFR2/text()", "nomFR2",
            "/ticket/NomAR2/text()", "nomAR2",
            "/ticket/Prix/text()", "prix",
            "/ticket/IDTicket/text()", "idTicket",
            "/ticket/CodeRoute/text()", "codeRoute",
            "/ticket/origine/text()", "origine",
            "/ticket/Distination/text()", "destination",
            "/ticket/Num/text()", "num",
            "/ticket/Ligne/text()", "ligne",
            "/ticket/requisition/text()", "requisition",
            "/ticket/voyage/text()", "voyage",
            "/ticket/faveur/text()", "faveur"
        ]
        store_xml => true
        target => "doc"
    }
}

output 
{
    elasticsearch 
    { 
        hosts => "localhost"
        index => "buses"
        document_type => "ticket"
    }
    file {
    path => "C:\busesdata\logstash.log"
}
stdout { codec =>rubydebug}
}

Filebeat configuration:

filebeat:
  # List of prospectors to fetch data.
  prospectors:
    -
      paths:
        - C:\busesdata\*.xml
      input_type: log
      document_type: ticket
      scan_frequency: 10s
      multiline:
        pattern: '<H_Ticket'
        negate: true
        match: after
output:
  ### Logstash as output
  logstash:
    hosts: ["localhost:5044"]
    index: filebeat

Here is part of the stdout and file output:

PS C:\logstash-2.3.3\bin> .\logstash -f .\logstash_temp.conf
io/console not supported; tty will not be manipulated
Settings: Default pipeline workers: 4
Pipeline main started

{
       "message" => "<?xml version=\"1.0\" encoding=\"UTF-8\" standalone=\"yes\"?>\r\n<?xml-stylesheet href=\"ticket.xsl\" type=\"text/xsl\"?>\n<HF_DOCUMENT>",
      "@version" => "1",
    "@timestamp" => "2016-07-03T12:13:28.892Z",
        "source" => "C:\busesdata\ticket2.xml",
          "type" => "ticket",
    "input_type" => "log",
        "fields" => nil,
          "beat" => {
        "hostname" => "hp-pavillion-g6",
            "name" => "hp-pavillion-g6"
    },
        "offset" => 0,
         "count" => 1,
          "host" => "hp-pavillion-g6",
          "tags" => [
        [0] "beats_input_codec_plain_applied"
    ]
}
{
       "message" => "\t<H_Ticket>\r\n\t\t<IDH_Ticket>1</IDH_Ticket>\r\n\t\t<CodeBus>186</CodeBus>\r\n\t\t<CodeCh>5531</CodeCh>\r\n\t\t<CodeConv>5531</CodeConv>\r\n\t\t<Codeligne>12</Codeligne>\r\n\t\t<Date>20150903</Date>\r\n\t\t<Heur>1101</Heur>\r\n\t\t<NomFR1>SOUK AHAD</NomFR1>\r\n\t\t<NomAR1>??? ?????</NomAR1>\r\n\t\t<NomFR2>SOVIVA </NomFR2>\r\n\t\t<NomAR2>??????</NomAR2>\r\n\t\t<Prix>0.66</Prix>\r\n\t\t<IDTicket>1</IDTicket>\r\n\t\t<CodeRoute>107</CodeRoute>\r\n\t\t<origine>01</origine>\r\n\t\t<Distination>07</Distination>\r\n\t\t<Num>3</Num>\r\n\t\t<Ligne>107</Ligne>\r\n\t\t<requisition> </requisition>\r\n\t\t<voyage>0</voyage>\r\n\t\t<faveur> </faveur>\r\n\t</H_Ticket>",
      "@version" => "1",
    "@timestamp" => "2016-07-03T12:13:28.892Z",
    "input_type" => "log",
        "source" => "C:\busesdata\ticket2.xml",
        "offset" => 125,
          "type" => "ticket",
         "count" => 1,
        "fields" => nil,
          "beat" => {
        "hostname" => "hp-pavillion-g6",
            "name" => "hp-pavillion-g6"
    },
          "host" => "hp-pavillion-g6",
          "tags" => [
        [0] "beats_input_codec_plain_applied"
    ]
}

Could you try editing the xpath configuration in the filter as follows:

filter 
{
    xml 
    {
        source => "ticket"
        xpath => 
        [
            "/IDH_Ticket/text()", "ticketId",
            "/CodeBus/text()", "codeBus",
            "/CodeCh/text()", "codeCh",
            "/CodeConv/text()", "codeConv",
            "/Codeligne/text()", "codeLigne",
            "/Date/text()", "date",
            "/Heur/text()", "heure",
            "/NomFR1/text()", "nomFR1",
            "/NomAR1/text()", "nomAR1",
            "/NomFR2/text()", "nomFR2",
            "/NomAR2/text()", "nomAR2",
            "/Prix/text()", "prix",
            "/IDTicket/text()", "idTicket",
            "/CodeRoute/text()", "codeRoute",
            "/origine/text()", "origine",
            "/Distination/text()", "destination",
            "/Num/text()", "num",
            "/Ligne/text()", "ligne",
            "/requisition/text()", "requisition",
            "/voyage/text()", "voyage",
            "/faveur/text()", "faveur"
        ]
        store_xml => true
        target => "doc"
    }
}

The xml filter will not work because its source setting points to a field that does not exist.
Your document has no field named ticket:

{
    "message" => "\t<H_Ticket>\r\n\t\t<IDH_Ticket>1</IDH_Ticket>\r\n\t\t<CodeBus>186</CodeBus>\r\n\t\t<CodeCh>5531</CodeCh>\r\n\t\t<CodeConv>5531</CodeConv>\r\n\t\t<Codeligne>12</Codeligne>\r\n\t\t<Date>20150903</Date>\r\n\t\t<Heur>1101</Heur>\r\n\t\t<NomFR1>SOUK AHAD</NomFR1>\r\n\t\t<NomAR1>??? ?????</NomAR1>\r\n\t\t<NomFR2>SOVIVA </NomFR2>\r\n\t\t<NomAR2>??????</NomAR2>\r\n\t\t<Prix>0.66</Prix>\r\n\t\t<IDTicket>1</IDTicket>\r\n\t\t<CodeRoute>107</CodeRoute>\r\n\t\t<origine>01</origine>\r\n\t\t<Distination>07</Distination>\r\n\t\t<Num>3</Num>\r\n\t\t<Ligne>107</Ligne>\r\n\t\t<requisition> </requisition>\r\n\t\t<voyage>0</voyage>\r\n\t\t<faveur> </faveur>\r\n\t</H_Ticket>",
    "@version" => "1",
    "@timestamp" => "2016-07-03T12:13:28.892Z",
    "input_type" => "log",
    "source" => "C:\busesdata\ticket2.xml",
    "offset" => 125,
    "type" => "ticket",
    "count" => 1,
    "fields" => nil,
    "beat" => {
        "hostname" => "hp-pavillion-g6",
        "name" => "hp-pavillion-g6"
    },
    "host" => "hp-pavillion-g6",
    "tags" => [
        [0] "beats_input_codec_plain_applied"
    ]
}

You should change the xml filter to:

 xml {
        source => "message"
        ...
 }
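
Building on the fix above, a fuller version of the filter might look like the sketch below. It assumes each multiline event holds exactly one <H_Ticket> element, as in the stdout output shown earlier, so the XPath expressions are rooted at /H_Ticket rather than /ticket. The conditional is an assumption added here to skip the XML header event, which is not a complete XML document on its own. Only three of the fields are shown; the others would follow the same pattern. This has not been run against your data, so treat it as a starting point rather than a verified configuration.

filter
{
    # Assumption: only events that contain a ticket element are parsed;
    # the header event (<?xml ...?> plus <HF_DOCUMENT>) is left untouched.
    if [message] =~ /<H_Ticket>/ {
        xml
        {
            # The XML text arrives in "message"; there is no "ticket" field.
            source => "message"
            # The root element of each event is H_Ticket.
            xpath =>
            [
                "/H_Ticket/IDH_Ticket/text()", "ticketId",
                "/H_Ticket/CodeBus/text()", "codeBus",
                "/H_Ticket/Prix/text()", "prix"
            ]
            store_xml => true
            target => "doc"
        }
    }
}

With the stdout output and the rubydebug codec still enabled, you can check whether ticketId, codeBus and prix now show up on each ticket event before adding the remaining fields.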