Logstash extract data from different types of messages

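Below are 3 examples of the kinds of logs I get from our automation platform. I am looking to extract the customOptions section. The challenge I am running into is that the custom options section can contain many entries. I think what I need to do is split the custom options array and then dissect it. I have tried Logstash dissect, grok, and mutate, and I am struggling to get the data out.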

2020-12-09_18:06:30.58027 executing local task [refId:3122, lockTimeout:330000, lockTtl:300000, jobType:jobTemplateExecute, lockId:job.execute.3122, jobTemplateId:3122, jobDate:1607537190133, userId:1897, customConfig:{"AnsibleRequestedUser":"testing1","AnsibleRequestedUserPassword":"VMware321!"}, jobTemplateExecutionId:5677, customInputs:[customOptions:[AnsibleRequestedUser:testing1, AnsibleRequestedUserPassword:VMware321!]], processConfig:[accountId:947, status:executing, username:user1, userId:1897, userDisplayName:user1 user1, refType:jobTemplate, refId:3122, timerCategory:TEST: 0.  Enterprise Create User, timerSubCategory:3122, description: Enterprise Create User], processMap:[success:true, refType:jobTemplate, refId:3122, subType:null, subId:null, process: : 25172, timerCategory:TEST: 0. OpenManage Enterprise Create User, timerSubCategory:3122, zoneId:null, processId:25172], taskConfig:[:],:@45eb737f]



2020-12-09_15:33:43.21913 executing local task [refId:3117, lockTimeout:330000, lockTtl:300000, jobType:jobTemplateExecute, lockId:job.execute.3117, jobTemplateId:3117, jobDate:1607528023018, userId:320, customConfig:null, jobTemplateExecutionId:5667, customInputs:[customOptions:[AnsibleIdentPoolDesc:asdf123, AnsibleIdentPoolCount:50, TrackingUseCase:Customer Demo/Training, AnsiblePoolName:asdf123]], processConfig:[accountId:2, status:executing, username:user@company.com, userId:320, userDisplayName:user, refType:jobTemplate, refId:3117, timerCategory:TEST: 2.  Enterprise - Create Identity Pool, timerSubCategory:3117, description:TEST: 2. Enterprise - Create Identity Pool], processMap:[success:true, refType:jobTemplate, refId:3117, subType:null, subId:null, process: : 25147, timerCategory:TEST: 2. Enterprise - Create Identity Pool, timerSubCategory:3117, zoneId:null, processId:25147], taskConfig:[:], :@21ff5f47]



2020-12-09_15:30:53.83030 executing local task [refId:3112, lockTimeout:330000, lockTtl:300000, jobType:jobTemplateExecute, lockId:job.execute.3112, jobTemplateId:3112, jobDate:1607527853230, userId:320, customConfig:null, jobTemplateExecutionId:5662, customInputs:[customOptions:[ReferenceServer:10629, ReferenceServerTemplateName:asdfasdf, TrackingUseCase:Internal Testing/Training, ReferenceServerTemplateDescription:asdfasdf]], processConfig:[accountId:2, status:executing, username:user@company.com, userId:320, userDisplayName:user, refType:jobTemplate, refId:3112, timerCategory:TEST: 1. Enterprise - Create Template From Reference Device, timerSubCategory:3112, description:TEST: 1. Enterprise - Create Template From Reference Device], processMap:[success:true, refType:jobTemplate, refId:3112, subType:null, subId:null, process: : 25142, timerCategory:TEST: 1. Enterprise - Create Template From Reference Device, timerSubCategory:3112, zoneId:null, processId:25142], taskConfig:[:],:@29ac1e41]

This is the data I need to pull from the messages above.

Message 1:

[customOptions:[AnsibleRequestedUser:testing1, AnsibleRequestedUserPassword:VMware321!]] - I would like these in a new field. username:user1 needs to be a field. timerCategory:TEST: 0. Enterprise Create User needs to be a field.

The rest of the data can stay in the original message field.

Message 2:

[customOptions:[AnsibleIdentPoolDesc:asdf123, AnsibleIdentPoolCount:50, TrackingUseCase:Customer Demo/Training, AnsiblePoolName:asdf123]] - I need these separated into different fields. username:user@company.com needs to be a field. timerCategory:TEST: 2. Enterprise - Create Identity Pool needs to be a field.

Message 3:

[customOptions:[ReferenceServer:10629, ReferenceServerTemplateName:asdfasdf, TrackingUseCase:Internal Testing/Training, ReferenceServerTemplateDescription:asdfasdf]] - I need these separated into different fields. username:user@company.com needs to be a field. timerCategory:TEST: 1. Enterprise - Create Template From Reference Device needs to be a field.

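Keep in mind that the timer category will change depending on what the log output contains, but it should keep the same format as above.
The custom options will keep changing too: which options appear, and how many, depends on the automation that was launched, but again the format above should stay the same. The username can be an email address or a plain name.

Below are some of the Logstash filters I have tried. They worked to some extent, but they cannot handle the changing nature of the log messages.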

# Testing a new method to get information from the logs. 
#if "executing local task" in [message] and "beats" in [tags]{
#   dissect {
#       mapping => {
#           "message" => "%{date} %{?skip1} %{?skip2} %{?skip3} %{?refid} %{?lockTimeout} %{?lockTtl} %{?jobtemplate} %{?jobType} %{?jobTemplateId} %{?jobDate} %{?userId} %{?jobTemplateExecutionId} %{?jobTemplateExecutionId1} customInputs:[customOptions:[%{?RequestedPassword}:%{?RequestedPassword} %{?TrackingUseCase1}:%{TrackingUseCase}, %{?RequestedUser}, %{?processConfig}, %{?status}, username:%{username}, %{?userId}, %{?userDisplayName}, %{?refType}, %{?refID}, %{?timerCategory}:%{TaskName}, %{?timeCat}, %{?description}, %{?extra}"
#       }
#   }
#}
# Testing Grok Filters instead.  
if "executing local task" in [message] and "beats" in [tags]{
    grok {
        match => { "message" => "%{YEAR:year}-%{MONTHNUM2:month}-%{MONTHDAY:day}_%{TIME:time}%{SPACE}%{CISCO_REASON}%{SYSLOG5424PRINTASCII}%{SPACE}%{NOTSPACE}%{SPACE}%{NOTSPACE}%{SPACE}%{PROG}%{SPACE}%{PROG}%{SPACE}%{PROG}%{SPACE}%{PROG}%{SPACE}%{PROG}%{SPACE}%{PROG}%{SPACE}%{PROG}%{SPACE}%{SYSLOGPROG}%{SYSLOG5424SD:testing3}%{NOTSPACE}%{SPACE}%{PROG}%{SYSLOG5424SD:testing2}%{NOTSPACE}%{SPACE}%{PROG}%{SYSLOG5424SD:testing}%{GREEDYDATA}}"
        }
    }   
}

I think grok is what I need to use, but I am not familiar with how to split/add fields to meet the needs above.

Any help would be greatly appreciated.

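I would suggest not trying to do everything in a single filter, and especially not in a single grok pattern. I would start by using dissect to strip off the timestamp. I save it in a [@metadata] field so that it is available within the Logstash pipeline but is not processed by the outputs.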

    dissect { mapping => { "message" => "%{[@metadata][timestamp]} %{} [%{[@metadata][restOfline]}" } }
    date { match => [ "[@metadata][timestamp]", "YYYY-MM-dd_HH:mm:ss.SSSSS" ] }
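
While testing, it can help to confirm that dissect and date did what you expect. Since [@metadata] is hidden from outputs by default, one option is to turn on the metadata setting of the rubydebug codec on a stdout output. This output block is a debugging aid added here for illustration, not part of the pipeline described in this answer:

    output {
        # Print each event, including the [@metadata] fields that outputs normally omit
        stdout { codec => rubydebug { metadata => true } }
    }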

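Next I would use grok patterns to break up [@metadata][restOfline]. If you only need the fields from processConfig, then that is the only grok pattern you need. I include the others as examples of how to pull multiple patterns out of one message.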

    grok {
        break_on_match => false
        match => {
            "[@metadata][restOfline]" => [
                "customOptions:\[(?<[@metadata][customOptions]>[^\]]+)",
                "processConfig:\[(?<[@metadata][processConfig]>[^\]]+)",
                "processMap:\[(?<[@metadata][processMap]>[^\]]+)"
            ]
        }
    }

Now we can parse [@metadata][processConfig], which is a key/value string. Again we save the parsed values in [@metadata] and then copy only the ones we want.

    kv {
        source => "[@metadata][processConfig]"
        target => "[@metadata][processConfigValues]"
        field_split_pattern => ", "
        value_split => ":"
        add_field => {
            "username" => "%{[@metadata][processConfigValues][username]}"
            "timeCategory" => "%{[@metadata][processConfigValues][timerCategory]}"
        }
    }

This results in an event with fields such as

        "username" => "user@company.com",
    "timeCategory" => "TEST: 2.  Enterprise - Create Identity Pool"
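
The customOptions string captured by the grok filter above can be split the same way. This is only a sketch under a few assumptions: the target name customOptions is an arbitrary choice, no option value contains the ", " separator, and the exclude_keys line is optional and is there only to keep the sample password out of the event.

    kv {
        source => "[@metadata][customOptions]"
        # Nest the parsed options under one field so they cannot collide with existing fields
        target => "customOptions"
        field_split_pattern => ", "
        value_split => ":"
        # Optional: do not add sensitive keys to the event
        exclude_keys => [ "AnsibleRequestedUserPassword" ]
    }

Because the keys are taken from the message itself, this should keep working when an automation adds or removes custom options.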

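Here is another answer based on grok (though I agree it is somewhat hard to maintain and hard to read).

  1. Extract the customOptions field using the correct (somewhat long) grok expression.
  2. Process only that field with another filter (kv) and put the result in a customOptionsSplitter field (to avoid clobbering existing fields).

The following code implements these two steps: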

filter{

    grok {
        # \s* after the taskConfig comma: the sample lines end with both "],:@..." and "], :@..."
        match => { "message" => "%{DATE:date}_%{TIME:time} %{CISCO_REASON} \[refId\:%{INT:refId}, lockTimeout:%{INT:lockTimeout}, lockTtl:%{INT:lockTtl}, jobType:%{NOTSPACE:jobType}, lockId:%{NOTSPACE:lockId}, jobTemplateId:%{INT:jobTemplateId}, jobDate:%{INT:jobDate}, userId:%{INT:userId}, customConfig:(\{%{GREEDYDATA:customConfig}\}|null), jobTemplateExecutionId:%{INT:jobTemplateExecutionId}, customInputs:\[customOptions:\[%{GREEDYDATA:customOptions}\]\], processConfig:\[%{GREEDYDATA:processConfig}\], processMap:\[%{GREEDYDATA:processMap}\], taskConfig:\[%{GREEDYDATA:taskConfig}\],\s*:%{NOTSPACE:serial}\]"
        }
    }

    kv {
        source => "customOptions"
        target => "customOptionsSplitter"
        field_split_pattern => ", "
        value_split => ":"
    }

}
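
The processConfig field captured by that grok expression could be handled the same way to reach username and timerCategory. A minimal sketch, assuming a target name of processConfigSplitter (hypothetical, chosen to mirror customOptionsSplitter); the block would go inside the filter section, after the existing kv:

    kv {
        source => "processConfig"
        # Keep the parsed pairs together under one field, as with customOptionsSplitter
        target => "processConfigSplitter"
        field_split_pattern => ", "
        value_split => ":"
    }

The values would then be available as [processConfigSplitter][username] and [processConfigSplitter][timerCategory].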