Logstash【聚合过滤器】在事件间传递数据
Logstash [aggregate filter] to pass data between events
我目前正在为日志监控系统开发一个使用 Elastic Stack 的项目。我必须加载的日志采用特定格式,因此我必须编写自己的 logstash 脚本来读取它们。特别是一种类型的日志,我在文件的开头有一个日期,而其他每一行中的时间戳都没有日期,我的目标是从第一行中提取日期并将其添加到所有下一行,经过一些研究,我发现聚合过滤器可以提供帮助,但我无法让它工作,这是我的配置文件:
input
{
file {
path => "F:/ELK/data/testFile.txt"
#path => "F:/ELK/data/*/request/*"
start_position => "beginning"
sincedb_path => "NUL"
}
}
filter
{
mutate {
add_field => { "taskId" => "all" }
}
grok
{
match => {"message" => "-- %{NOTSPACE} %{NOTSPACE}: %{DAY}, %{MONTH:month} %{MONTHDAY:day}, %{YEAR:year}%{GREEDYDATA}"}
tag_on_failure => ["not_date_line"]
}
if "not_date_line" not in [tags]
{
mutate{
replace => {'taskId' => "%{day}/%{month}/%{year}"}
remove_field => ["day","month","year"]
}
aggregate
{
task_id => "%{taskId}"
code => "map['taskId'] = event.get('taskId')"
map_action => "create"
}
}
else
{
dissect
{
mapping => { message => "%{sequence_index} %{time} %{pid} %{puid} %{stack_level} %{operation} %{params} %{op_type} %{form_event} %{op_duration}"}
}
aggregate {
task_id => "%{taskId}"
code => "event.set('taskId', map['taskId'])"
map_action => "update"
timeout => 0
}
mutate
{
strip => ["op_duration"]
replace => {"time" => "%{taskId}-%{time}"}
}
}
mutate
{
remove_field => ['@timestamp','host','@version','path','message','tags']
}
}
output
{
stdout{}
}
脚本可以正确读取日期,但无法替换其他事件中的值:
{
"taskId" => "22/October/2020"
}
{
"pid" => "45",
"sequence_index" => "10853799",
"op_type" => "1",
"time" => "all-16:23:29:629",
"params" => "90",
"stack_level" => "0",
"op_duration" => "",
"operation" => "10",
"form_event" => "0",
"taskId" => "all",
"puid" => "1724"
}
我只使用一名工作人员来确保事件的顺序保持不变,如果您知道任何其他实现此目的的方法,我愿意接受建议,谢谢!
对于具有日期的行,您将 taskId 设置为“%{day}/%{month}/%{year}”,对于其余行,您将其设置为“all”。聚合过滤器不会聚合具有不同任务 ID 的事件。
我建议您使用常量 taskId 并将日期存储在其他字段中,然后在单个聚合过滤器中您可以使用类似
code => '
date = event.get("date")
if date
@date = date
else
event.set("date", @date)
end
'
@date 是一个实例变量,因此它的范围仅限于该聚合过滤器,但它会跨事件保留。它不与其他聚合过滤器共享(需要 class 变量或全局变量)。
请注意,您需要保留事件顺序,因此您应将 pipeline.workers 设置为 1。
感谢@Badger 和他在 elastic 论坛上回答的其他 post,我找到了一个使用单个 ruby 过滤器和实例变量的解决方案,无法使用它聚合过滤器,但这对我来说不是问题。
ruby
{
init => '@date = ""'
code => "
event.set('date',@date) unless @date.empty?
@date = event.get('date') unless event.get('date').empty?
"
}
我目前正在为日志监控系统开发一个使用 Elastic Stack 的项目。我必须加载的日志采用特定格式,因此我必须编写自己的 logstash 脚本来读取它们。特别是一种类型的日志,我在文件的开头有一个日期,而其他每一行中的时间戳都没有日期,我的目标是从第一行中提取日期并将其添加到所有下一行,经过一些研究,我发现聚合过滤器可以提供帮助,但我无法让它工作,这是我的配置文件:
input
{
file {
path => "F:/ELK/data/testFile.txt"
#path => "F:/ELK/data/*/request/*"
start_position => "beginning"
sincedb_path => "NUL"
}
}
filter
{
mutate {
add_field => { "taskId" => "all" }
}
grok
{
match => {"message" => "-- %{NOTSPACE} %{NOTSPACE}: %{DAY}, %{MONTH:month} %{MONTHDAY:day}, %{YEAR:year}%{GREEDYDATA}"}
tag_on_failure => ["not_date_line"]
}
if "not_date_line" not in [tags]
{
mutate{
replace => {'taskId' => "%{day}/%{month}/%{year}"}
remove_field => ["day","month","year"]
}
aggregate
{
task_id => "%{taskId}"
code => "map['taskId'] = event.get('taskId')"
map_action => "create"
}
}
else
{
dissect
{
mapping => { message => "%{sequence_index} %{time} %{pid} %{puid} %{stack_level} %{operation} %{params} %{op_type} %{form_event} %{op_duration}"}
}
aggregate {
task_id => "%{taskId}"
code => "event.set('taskId', map['taskId'])"
map_action => "update"
timeout => 0
}
mutate
{
strip => ["op_duration"]
replace => {"time" => "%{taskId}-%{time}"}
}
}
mutate
{
remove_field => ['@timestamp','host','@version','path','message','tags']
}
}
output
{
stdout{}
}
脚本可以正确读取日期,但无法替换其他事件中的值:
{
"taskId" => "22/October/2020"
}
{
"pid" => "45",
"sequence_index" => "10853799",
"op_type" => "1",
"time" => "all-16:23:29:629",
"params" => "90",
"stack_level" => "0",
"op_duration" => "",
"operation" => "10",
"form_event" => "0",
"taskId" => "all",
"puid" => "1724"
}
我只使用一名工作人员来确保事件的顺序保持不变,如果您知道任何其他实现此目的的方法,我愿意接受建议,谢谢!
对于具有日期的行,您将 taskId 设置为“%{day}/%{month}/%{year}”,对于其余行,您将其设置为“all”。聚合过滤器不会聚合具有不同任务 ID 的事件。
我建议您使用常量 taskId 并将日期存储在其他字段中,然后在单个聚合过滤器中您可以使用类似
code => '
date = event.get("date")
if date
@date = date
else
event.set("date", @date)
end
'
@date 是一个实例变量,因此它的范围仅限于该聚合过滤器,但它会跨事件保留。它不与其他聚合过滤器共享(需要 class 变量或全局变量)。
请注意,您需要保留事件顺序,因此您应将 pipeline.workers 设置为 1。
感谢@Badger 和他在 elastic 论坛上回答的其他 post,我找到了一个使用单个 ruby 过滤器和实例变量的解决方案,无法使用它聚合过滤器,但这对我来说不是问题。
ruby
{
init => '@date = ""'
code => "
event.set('date',@date) unless @date.empty?
@date = event.get('date') unless event.get('date').empty?
"
}