logstash 与 hdfs 特定持续时间
logstash with hdfs for paritcular duration
嗨,我是新的 logstash,我已经完成了从 tcp 读取数据并写入 hdfs 的工作...这部分是不,但我想将数据写入 hdfs 的 4 个不同文件夹
这里是示例代码
input {
tcp {
host => "X.X.X.X"
port => 5051
codec => json_lines
}
}
filter
{
mutate
{
remove_field => [ "@version", "path", "host","logger_name","tags","stack_info","level","port","type"]
}
mutate {
add_field => { "count" => "1"}
}
}
output {
webhdfs
{
host => "127.0.0.1"
port => 50070
path => "/folder/%{+YYYY-MM-dd_HH-mm}.csv"
user => "hduser"
codec => line { format => "%{message}"}
}
这里的问题是我已经写入了文件夹,但后来我想写入另外 3 个不同的文件夹,例如 folder1、folder2、folder3 一段时间....
有可能,您需要使用一些 mutate
过滤器和一些条件。
首先你需要从事件的 @timestamp
中获取分钟的值并将该值添加到一个新的字段中,你可以使用 [@metadata]
对象,它可以用来过滤,但不会出现在输出事件中。
mutate {
add_field => { "[@metadata][minute]" => "%{+mm}" }
}
然后您需要指定将哪一分钟保存在哪个文件夹中。
例如,如果您想要这样的东西:
00:00 至 00:59 - 文件夹 1
01:00 到 01:59 - 文件夹 2
02:00 到 02:59 - 文件夹 3
03:00 到 03:59 - 文件夹 4
下一分钟从文件夹 1 开始,04:00 到 04:59,考虑到前 8 分钟,您将需要这样的东西。
if [@metadata][minute] in ["00", "04"] {
mutate {
add_field => {"[@metadata][folder]" => "folder1" }
}
}
if [@metadata][minute] in ["01", "05"] {
mutate {
add_field => {"[@metadata][folder]" => "folder2" }
}
}
if [@metadata][minute] in ["02", "06"] {
mutate {
add_field => {"[@metadata][folder]" => "folder3" }
}
}
if [@metadata][minute] in ["03", "07"] {
mutate {
add_field => {"[@metadata][folder]" => "folder4" }
}
}
然后在您的输出中,您将在您的路径中使用 [@metadata][folder]
。
path => "/[@metadata][folder]/%{+YYYY-MM-dd_HH-mm}.csv"
您只需要将条件句扩展到该小时的其他分钟即可。
嗨,我是新的 logstash,我已经完成了从 tcp 读取数据并写入 hdfs 的工作...这部分是不,但我想将数据写入 hdfs 的 4 个不同文件夹
这里是示例代码
input {
tcp {
host => "X.X.X.X"
port => 5051
codec => json_lines
}
}
filter
{
mutate
{
remove_field => [ "@version", "path", "host","logger_name","tags","stack_info","level","port","type"]
}
mutate {
add_field => { "count" => "1"}
}
}
output {
webhdfs
{
host => "127.0.0.1"
port => 50070
path => "/folder/%{+YYYY-MM-dd_HH-mm}.csv"
user => "hduser"
codec => line { format => "%{message}"}
}
这里的问题是我已经写入了文件夹,但后来我想写入另外 3 个不同的文件夹,例如 folder1、folder2、folder3 一段时间....
有可能,您需要使用一些 mutate
过滤器和一些条件。
首先你需要从事件的 @timestamp
中获取分钟的值并将该值添加到一个新的字段中,你可以使用 [@metadata]
对象,它可以用来过滤,但不会出现在输出事件中。
mutate {
add_field => { "[@metadata][minute]" => "%{+mm}" }
}
然后您需要指定将哪一分钟保存在哪个文件夹中。
例如,如果您想要这样的东西:
00:00 至 00:59 - 文件夹 1
01:00 到 01:59 - 文件夹 2
02:00 到 02:59 - 文件夹 3
03:00 到 03:59 - 文件夹 4
下一分钟从文件夹 1 开始,04:00 到 04:59,考虑到前 8 分钟,您将需要这样的东西。
if [@metadata][minute] in ["00", "04"] {
mutate {
add_field => {"[@metadata][folder]" => "folder1" }
}
}
if [@metadata][minute] in ["01", "05"] {
mutate {
add_field => {"[@metadata][folder]" => "folder2" }
}
}
if [@metadata][minute] in ["02", "06"] {
mutate {
add_field => {"[@metadata][folder]" => "folder3" }
}
}
if [@metadata][minute] in ["03", "07"] {
mutate {
add_field => {"[@metadata][folder]" => "folder4" }
}
}
然后在您的输出中,您将在您的路径中使用 [@metadata][folder]
。
path => "/[@metadata][folder]/%{+YYYY-MM-dd_HH-mm}.csv"
您只需要将条件句扩展到该小时的其他分钟即可。