将数据从 Filebeats 发送到多个 Logstash 文件时遇到问题

Facing Issue while sending data from Filebeats to Multiple Logstash files

准确地说,我正在处理一个包含近百万条记录的日志文件。由于它是账单摘要日志,因此客户信息将不按特定顺序记录。
我正在使用 customized GROK Patternslogstash XML filter plugin 来提取足以跟踪的数据。为了跟踪个人客户活动,我使用 "Customer_ID" 作为唯一键。因此,即使我使用多个 Logstash 文件和多个 GROK 模式,他的所有信息也可能是 bounded/Aggregated 使用他的 "Customer_ID"(唯一键)

这是我的日志文件示例,
7-04-2017 08:49:41 INFO abcinfo (ABC_RemoteONUS_Processor.java52) - Customer_Entry :::<?xml version="1.0" encoding="UTF-8"?><ns2:ReqListAccount xmlns:ns2="http://vcb.org/abc/schema/"/"><Head msgId="1ABCDEFegAQtQOSuJTEs3u" orgId="ABC" ts="2017-04-27T08:49:51+05:30" ver="1.0"/><Cust id="ABCDVFR233cd662a74a229002159220ce762c" note="Account CUST Listing" refId="DCVD849512576821682" refUrl="http://www.ABC.org.in/" ts="2017-04-27T08:49:51+05:30"

我的 Grok 模式,

grok {
patterns_dir => "D:\elk\logstash-5.2.1\vendor\bundle\jruby.9\gems\logstash-patterns-core-4.0.2\patterns"
match => [ "message" , "%{DATESTAMP:datestamp} %{LOGLEVEL:Logseverity}\s+%{WORD:ModuleInfo} \(%{NOTSPACE:JavaClass}\)%{ABC:Customer_Init}%{GREEDYDATA:Cust}"]add_field => { "Details" => "Request" }remove_tag => ["_grokparsefailure"]}  

我的自定义图案存储在 Pattern_dir、

ABC ( - Customer_Entry :::)

我的 XML 过滤器插件,

xml {
source => "Cust"
store_xml =>false
xpath => [
  "//Head/@ts", "Cust_Req_time",
  "//Cust/@id", "Customer_ID",
  "//Cust/@note", "Cust_note", ]
  }  

所以不管后面的细节是什么** - Customer_Entry :::**,我将能够使用 XML 插件过滤器提取它(将类似于多行编解码器存储).我已经编写了 5 个不同的 Logstash 文件来提取具有 5 个不同 Grok 模式的客户的不同活动。哪个会告诉,

1.Customer_Entry
2.Customer_Purchase
3.Customer_Last_Purchase
4.Customer_Transaction
5.Customer_Authorization

以上所有 Grok 模式都有不同的信息集,如我之前所说,这些信息将按 Customer_ID 分组。

通过将我的自定义模式与不同的日志文件一起使用,我可以在 Kibana 中毫无瑕疵地提取信息并将其清晰地可视化。

因为我每天都有 100 个日志文件要放入 logstash,所以我选择了 Filebeats,但 Filebeats 运行 只有一个端口“5044”。我尝试 运行 为 5 个不同的 logstash 文件使用 5 个不同的端口,但这不起作用,只有 5 个中的一个 logstash 文件被加载,其余配置文件处于空闲状态。
这是我的样本 filebeat output.prospector,

output.logstash:
主持人:["localhost:5044"]

output.logstash:
主持人:["localhost:5045"]

output.logstash:
主持人:["localhost:5046"]

我无法在一个 logstash 配置文件中添加所有 grok 模式,因为 XML 过滤器插件采用源 "GREEDYDATA"。在这种情况下,我将为 5 种不同的 Grok 模式设置 5 种不同的 Source=>。 我什至也尝试过,但那是行不通的。

寻找更好的方法。

听起来您正在寻找具有并行摄取的规模。碰巧的是,File beats 支持一种叫做 load-balancing 的东西,这听起来像你正在寻找的东西。

output.logstash:
  hosts: [ "localhost:5044", "localhost:5045", "localhost:5046" ]
  loadbalance: true

那是为了输出。不过,我相信您希望对输入进行多线程处理。 FileBeats 应该跟踪探矿者配置中指定的所有文件,但您发现了限制。通配符或指定目录将单线程处理 glob/directory 中的文件。如果您的文件名支持它,通过在同一目录中定义多个 glob,creative-globbing 可能会让您获得更好的并行性。

假设您的日志按类型进入:

- input_type: log
  paths:
    - /mnt/billing/*entry.log
    - /mnt/billing/*purchase.log
    - /mnt/billing/*transaction.log

将在此处启用多线程读取并行文件的探矿者。

如果您的日志使用随机名称,您可以使用类似的设置

- input_type: log
  paths:
    - /mnt/billing/a*
    - /mnt/billing/b*
    - /mnt/billing/c*
    [...]
    - /mnt/billing/z*

如果您正在处理大量具有从不重复的唯一名称的文件,将 clean_inactive 配置配置选项添加到您的探矿者将使您的 FileBeat 运行 快速。

- input_type: log
  ignore_older: 18h
  clean_inactive: 24h
  paths:
    - /mnt/billing/a*
    - /mnt/billing/b*
    - /mnt/billing/c*
    [...]
    - /mnt/billing/z*

这将删除超过 24 小时的文件的所有状态,并且不会处理任何超过 18 小时的文件。