Logstash: extracting data from different types of messages
Below are three examples of the kinds of log messages I get from our automation platform. I am looking to extract the customOptions section. The challenge I am running into is that there can be many entries in the customOptions section. I think what I need to do is split out the customOptions array and then dissect it. I have tried Logstash dissect, grok, and mutate and have struggled to get at the data.
2020-12-09_18:06:30.58027 executing local task [refId:3122, lockTimeout:330000, lockTtl:300000, jobType:jobTemplateExecute, lockId:job.execute.3122, jobTemplateId:3122, jobDate:1607537190133, userId:1897, customConfig:{"AnsibleRequestedUser":"testing1","AnsibleRequestedUserPassword":"VMware321!"}, jobTemplateExecutionId:5677, customInputs:[customOptions:[AnsibleRequestedUser:testing1, AnsibleRequestedUserPassword:VMware321!]], processConfig:[accountId:947, status:executing, username:user1, userId:1897, userDisplayName:user1 user1, refType:jobTemplate, refId:3122, timerCategory:TEST: 0. Enterprise Create User, timerSubCategory:3122, description: Enterprise Create User], processMap:[success:true, refType:jobTemplate, refId:3122, subType:null, subId:null, process: : 25172, timerCategory:TEST: 0. OpenManage Enterprise Create User, timerSubCategory:3122, zoneId:null, processId:25172], taskConfig:[:],:@45eb737f]
2020-12-09_15:33:43.21913 executing local task [refId:3117, lockTimeout:330000, lockTtl:300000, jobType:jobTemplateExecute, lockId:job.execute.3117, jobTemplateId:3117, jobDate:1607528023018, userId:320, customConfig:null, jobTemplateExecutionId:5667, customInputs:[customOptions:[AnsibleIdentPoolDesc:asdf123, AnsibleIdentPoolCount:50, TrackingUseCase:Customer Demo/Training, AnsiblePoolName:asdf123]], processConfig:[accountId:2, status:executing, username:user@company.com, userId:320, userDisplayName:user, refType:jobTemplate, refId:3117, timerCategory:TEST: 2. Enterprise - Create Identity Pool, timerSubCategory:3117, description:TEST: 2. Enterprise - Create Identity Pool], processMap:[success:true, refType:jobTemplate, refId:3117, subType:null, subId:null, process: : 25147, timerCategory:TEST: 2. Enterprise - Create Identity Pool, timerSubCategory:3117, zoneId:null, processId:25147], taskConfig:[:], :@21ff5f47]
2020-12-09_15:30:53.83030 executing local task [refId:3112, lockTimeout:330000, lockTtl:300000, jobType:jobTemplateExecute, lockId:job.execute.3112, jobTemplateId:3112, jobDate:1607527853230, userId:320, customConfig:null, jobTemplateExecutionId:5662, customInputs:[customOptions:[ReferenceServer:10629, ReferenceServerTemplateName:asdfasdf, TrackingUseCase:Internal Testing/Training, ReferenceServerTemplateDescription:asdfasdf]], processConfig:[accountId:2, status:executing, username:user@company.com, userId:320, userDisplayName:user, refType:jobTemplate, refId:3112, timerCategory:TEST: 1. Enterprise - Create Template From Reference Device, timerSubCategory:3112, description:TEST: 1. Enterprise - Create Template From Reference Device], processMap:[success:true, refType:jobTemplate, refId:3112, subType:null, subId:null, process: : 25142, timerCategory:TEST: 1. Enterprise - Create Template From Reference Device, timerSubCategory:3112, zoneId:null, processId:25142], taskConfig:[:],:@29ac1e41]
The data I need pulled out of the messages above is the following:
Message 1:
[customOptions:[AnsibleRequestedUser:testing1, AnsibleRequestedUserPassword:VMware321!]] I would like those to be in new fields. username:user1 needs to be in a field. timerCategory:TEST: 0. Enterprise Create User needs to be in a field.
The rest of the data can stay in the original message field.
Message 2:
[customOptions:[AnsibleIdentPoolDesc:asdf123, AnsibleIdentPoolCount:50, TrackingUseCase:Customer Demo/Training, AnsiblePoolName:asdf123]] - I need these separated into different fields. username:user@company.com needs to be a field. timerCategory:TEST: 2. Enterprise - Create Identity Pool - I need this in a field.
Message 3:
[customOptions:[ReferenceServer:10629, ReferenceServerTemplateName:asdfasdf, TrackingUseCase:Internal Testing/Training, ReferenceServerTemplateDescription:asdfasdf]] - I need these separated into separate fields. username:user@company.com needs to be a field. timerCategory:TEST: 1. Enterprise - Create Template From Reference Device needs to be a field.
Keep in mind that the timerCategory keeps changing depending on what the log outputs, but it should stay in the same format as above.
The customOptions also keep changing (which automation is launched determines which custom options appear), but again the format above should stay the same.
The username can be either an email address or a plain name.
Below are some of the Logstash filters I have tried with some success, but they cannot handle the ever-changing nature of the log messages.
# Testing a new method to get information from the logs.
#if "executing local task" in [message] and "beats" in [tags]{
# dissect {
# mapping => {
# "message" => "%{date} %{?skip1} %{?skip2} %{?skip3} %{?refid} %{?lockTimeout} %{?lockTtl} %{?jobtemplate} %{?jobType} %{?jobTemplateId} %{?jobDate} %{?userId} %{?jobTemplateExecutionId} %{?jobTemplateExecutionId1} customInputs:[customOptions:[%{?RequestedPassword}:%{?RequestedPassword} %{?TrackingUseCase1}:%{TrackingUseCase}, %{?RequestedUser}, %{?processConfig}, %{?status}, username:%{username}, %{?userId}, %{?userDisplayName}, %{?refType}, %{?refID}, %{?timerCategory}:%{TaskName}, %{?timeCat}, %{?description}, %{?extra}"
# }
# }
#}
# Testing Grok Filters instead.
if "executing local task" in [messages] and "beats" in [tags]{
grok {
match => { "message" => "%{YEAR:year}-%{MONTHNUM2:month}-%{MONTHDAY:day}_%{TIME:time}%{SPACE}%{CISCO_REASON}%{SYSLOG5424PRINTASCII}%{SPACE}%{NOTSPACE}%{SPACE}%{NOTSPACE}%{SPACE}%{PROG}%{SPACE}%{PROG}%{SPACE}%{PROG}%{SPACE}%{PROG}%{SPACE}%{PROG}%{SPACE}%{PROG}%{SPACE}%{PROG}%{SPACE}%{SYSLOGPROG}%{SYSLOG5424SD:testing3}%{NOTSPACE}%{SPACE}%{PROG}%{SYSLOG5424SD:testing2}%{NOTSPACE}%{SPACE}%{PROG}%{SYSLOG5424SD:testing}%{GREEDYDATA}}"
}
}
}
I think grok is what I need to use, but I am not familiar with how to split out / add fields to meet the needs above.
Any help would be greatly appreciated.
I suggest not trying to do everything in a single filter, and especially not in a single grok pattern. I would start by stripping off the timestamp using dissect. I keep it in a [@metadata] field so that it is accessible within the Logstash pipeline but does not get processed by the outputs.
dissect { mapping => { "message" => "%{[@metadata][timestamp]} %{} [%{[@metadata][restOfline]}" } }
date { match => [ "[@metadata][timestamp]", "YYYY-MM-dd_HH:mm:ss.SSSSS" ] }
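For the second sample line, for example, this would set [@metadata][timestamp] to 2020-12-09_15:33:43.21913 (which the date filter then uses for @timestamp) and leave everything after the opening bracket, starting at refId:3117, in [@metadata][restOfline].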
Next I would break restOfline apart using grok patterns. If you only need fields from processConfig then that is the only grok pattern you need; I included the others as examples of how to extract multiple patterns from one message.
grok {
break_on_match => false
match => {
"[@metadata][restOfline]" => [
"customOptions:\[(?<[@metadata][customOptions]>[^\]]+)",
"processConfig:\[(?<[@metadata][processConfig]>[^\]]+)",
"processMap:\[(?<[@metadata][processMap]>[^\]]+)"
]
}
}
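For the second sample message, for example, the first pattern would capture (kept in [@metadata], so not emitted by the outputs):

"[@metadata][customOptions]" => "AnsibleIdentPoolDesc:asdf123, AnsibleIdentPoolCount:50, TrackingUseCase:Customer Demo/Training, AnsiblePoolName:asdf123"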
Now we can parse [@metadata][processConfig], which is a key/value string. Again we keep the parsed values in [@metadata], then copy over only the values we want.
kv {
source => "[@metadata][processConfig]"
target => "[@metadata][processConfigValues]"
field_split_pattern => ", "
value_split => ":"
add_field => {
"username" => "%{[@metadata][processConfigValues][username]}"
"timeCategory" => "%{[@metadata][processConfigValues][timerCategory]}"
}
}
This results in the event having fields such as
"username" => "user@company.com",
"timeCategory" => "TEST: 2. Enterprise - Create Identity Pool"
Here is another answer based on grok (though I agree it is somewhat harder to maintain, and harder to read as well). The idea is to:
- extract the customOptions field with an appropriate (rather long) grok expression
- process only that specific field with another filter (kv), placing the result in a customOptionsSplitter field (to avoid clobbering existing fields).
The code below implements this:
filter{
grok {
match => { "message" => "%{DATE:date}_%{TIME:time} %{CISCO_REASON} \[refId\:%{INT:refId}, lockTimeout:%{INT:lockTimeout}, lockTtl:%{INT:lockTtl}, jobType:%{NOTSPACE:jobType}, lockId:%{NOTSPACE:lockId}, jobTemplateId:%{INT:jobTemplateId}, jobDate:%{INT:jobDate}, userId:%{INT:userId}, customConfig:(\{%{GREEDYDATA:customConfig}\}|null), jobTemplateExecutionId:%{INT:jobTemplateExecutionId}, customInputs:\[customOptions:\[%{GREEDYDATA:customOptions}\]\], processConfig:\[%{GREEDYDATA:processConfig}\], processMap:\[%{GREEDYDATA:processMap}\], taskConfig:\[%{GREEDYDATA:taskConfig}\], :%{NOTSPACE:serial}\]"
}
}
kv {
source => "customOptions"
target => "customOptionsSplitter"
field_split_pattern => ", "
value_split => ":"
}
}
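Applied to the second sample message, for instance, the kv filter should yield fields such as

"[customOptionsSplitter][AnsibleIdentPoolDesc]" => "asdf123",
"[customOptionsSplitter][AnsibleIdentPoolCount]" => "50",
"[customOptionsSplitter][TrackingUseCase]" => "Customer Demo/Training",
"[customOptionsSplitter][AnsiblePoolName]" => "asdf123"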