logstash 与 hdfs 特定持续时间

logstash with hdfs for paritcular duration

嗨,我是新的 logstash,我已经完成了从 tcp 读取数据并写入 hdfs 的工作...这部分是不,但我想将数据写入 hdfs 的 4 个不同文件夹

这里是示例代码

input {
tcp {

host => "X.X.X.X"
port => 5051
codec => json_lines
}

}
filter 
{
  mutate 
{
 remove_field => [ "@version", "path", "host","logger_name","tags","stack_info","level","port","type"]
 }
 mutate {
         add_field => { "count" => "1"} 
 }

 }

 output {
 webhdfs 
    {
    
        host => "127.0.0.1"                
        port => 50070  
        path => "/folder/%{+YYYY-MM-dd_HH-mm}.csv"          
        user => "hduser"                
        codec => line { format => "%{message}"} 
    }

这里的问题是我已经写入了文件夹,但后来我想写入另外 3 个不同的文件夹,例如 folder1、folder2、folder3 一段时间....

有可能,您需要使用一些 mutate 过滤器和一些条件。

首先你需要从事件的 @timestamp 中获取分钟的值并将该值添加到一个新的字段中,你可以使用 [@metadata] 对象,它可以用来过滤,但不会出现在输出事件中。

mutate {
    add_field => { "[@metadata][minute]" => "%{+mm}" }
}

然后您需要指定将哪一分钟保存在哪个文件夹中。

例如,如果您想要这样的东西:

00:00 至 00:59 - 文件夹 1
01:00 到 01:59 - 文件夹 2
02:00 到 02:59 - 文件夹 3
03:00 到 03:59 - 文件夹 4

下一分钟从文件夹 1 开始,04:00 到 04:59,考虑到前 8 分钟,您将需要这样的东西。

if [@metadata][minute] in ["00", "04"] {
    mutate {
        add_field => {"[@metadata][folder]" => "folder1" }
    }
}
if [@metadata][minute] in ["01", "05"] {
    mutate {
        add_field => {"[@metadata][folder]" => "folder2" }
    }
}
if [@metadata][minute] in ["02", "06"] {
    mutate {
       add_field => {"[@metadata][folder]" => "folder3" }
    }
}
if [@metadata][minute] in ["03", "07"] {
    mutate {
        add_field => {"[@metadata][folder]" => "folder4" }
    }
}

然后在您的输出中,您将在您的路径中使用 [@metadata][folder]

path => "/[@metadata][folder]/%{+YYYY-MM-dd_HH-mm}.csv"

您只需要将条件句扩展到该小时的其他分钟即可。