Logstash【聚合过滤器】在事件间传递数据

Logstash [aggregate filter] to pass data between events

我目前正在为日志监控系统开发一个使用 Elastic Stack 的项目。我必须加载的日志采用特定格式,因此我必须编写自己的 logstash 脚本来读取它们。特别是一种类型的日志,我在文件的开头有一个日期,而其他每一行中的时间戳都没有日期,我的目标是从第一行中提取日期并将其添加到所有下一行,经过一些研究,我发现聚合过滤器可以提供帮助,但我无法让它工作,这是我的配置文件:

input
{
    file {
        path => "F:/ELK/data/testFile.txt"
        #path => "F:/ELK/data/*/request/*"
        start_position => "beginning"
        sincedb_path => "NUL"
    }
}
filter
{
    mutate {
        add_field => { "taskId" => "all" }
    }

        grok
        {
            match => {"message" => "-- %{NOTSPACE} %{NOTSPACE}: %{DAY}, %{MONTH:month} %{MONTHDAY:day}, %{YEAR:year}%{GREEDYDATA}"}
            tag_on_failure => ["not_date_line"]
        }

        
    
    if "not_date_line" not in [tags]
    {
        mutate{
            replace => {'taskId' => "%{day}/%{month}/%{year}"}
            remove_field => ["day","month","year"]
        }

        aggregate
        {
            task_id => "%{taskId}"
            code => "map['taskId'] = event.get('taskId')"
            map_action => "create"
        }
    }
    else
    {
        dissect
        {
            mapping => { message => "%{sequence_index}  %{time} %{pid}  %{puid} %{stack_level}  %{operation}    %{params}   %{op_type}  %{form_event}   %{op_duration}"}
        }

        aggregate {
            task_id => "%{taskId}"
            code => "event.set('taskId', map['taskId'])"
            map_action => "update"
            timeout => 0
        }
        mutate
        {
            strip => ["op_duration"]
            replace => {"time" => "%{taskId}-%{time}"}
        }
    }
    
    mutate
    {
        remove_field => ['@timestamp','host','@version','path','message','tags']
    }
}
output 
{
    stdout{}
}

脚本可以正确读取日期,但无法替换其他事件中的值:


{
    "taskId" => "22/October/2020"
}
{
               "pid" => "45",
    "sequence_index" => "10853799",
           "op_type" => "1",
              "time" => "all-16:23:29:629",
            "params" => "90",
       "stack_level" => "0",
       "op_duration" => "",
         "operation" => "10",
        "form_event" => "0",
            "taskId" => "all",
              "puid" => "1724"
}

我只使用一名工作人员来确保事件的顺序保持不变,如果您知道任何其他实现此目的的方法,我愿意接受建议,谢谢!

对于具有日期的行,您将 taskId 设置为“%{day}/%{month}/%{year}”,对于其余行,您将其设置为“all”。聚合过滤器不会聚合具有不同任务 ID 的事件。

我建议您使用常量 taskId 并将日期存储在其他字段中,然后在单个聚合过滤器中您可以使用类似

code => '
    date = event.get("date")
    if date
        @date = date
    else
        event.set("date", @date)
    end
'

@date 是一个实例变量,因此它的范围仅限于该聚合过滤器,但它会跨事件保留。它不与其他聚合过滤器共享(需要 class 变量或全局变量)。

请注意,您需要保留事件顺序,因此您应将 pipeline.workers 设置为 1。

感谢@Badger 和他在 elastic 论坛上回答的其他 post,我找到了一个使用单个 ruby 过滤器和实例变量的解决方案,无法使用它聚合过滤器,但这对我来说不是问题。

ruby
{
    init => '@date = ""'
    code => "
        event.set('date',@date) unless @date.empty?
        @date = event.get('date') unless event.get('date').empty?
    "
}