使用 Logstash Aggregate Filter 插件处理可能排序或未排序的数据
Using Logstash Aggregate Filter plugin to process data which may or may not be sequenced
大家好!
我正在尝试使用 Logstash v7.7 的聚合过滤器插件来关联和合并来自两个不同 CSV 文件输入的数据,这些文件代表 API 数据调用。这个想法是制作一个显示组合图片的记录。如您所料,数据可能会或可能不会以正确的顺序到达。
举个例子:
/data/incoming/source_1/*.csv
StartTime, AckTime, Operation, RefData1, RefData2, OpSpecificData1
231313232,44343545,Register,ref-data-1a,ref-data-2a,op-specific-data-1
979898999,75758383,Register,ref-data-1b,ref-data-2b,op-specific-data-2
354656466,98554321,Cancel,ref-data-1c,ref-data-2c,op-specific-data-2
/data/incoming/source_1/*.csv
FinishTime,Operation,RefData1, RefData2, FinishSpecificData
67657657575,Cancel,ref-data-1c,ref-data-2c,FinishSpecific-Data-1
68445590877,Register,ref-data-1a,ref-data-2a,FinishSpecific-Data-2
55443444313,Register,ref-data-1a,ref-data-2a,FinishSpecific-Data-2
我有一个接收这两个 CSV 的管道,我能够处理它们并将它们作为单独的记录写入单个索引。然而,想法是将来自两个来源的记录组合成一个记录,每个记录代表一个超集。操作相关信息
不幸的是,尽管进行了多次尝试,我还是无法弄清楚如何通过聚合过滤器插件实现这一点。我的主要问题是这是否适合使用特定插件?如果是这样,欢迎提出任何建议!
目前,我有这个
input {
file {
path => ['/data/incoming/source_1/*.csv']
tags => ["source1"]
}
file {
path => ['/data/incoming/source_2/*.csv']
tags => ["source2"]
}
# use the tags to do some source 1 and 2 related massaging, calculations, etc
aggregate {
task_id = "%{Operation}_%{RefData1}_%{RefData1}"
code => "
map['source_files'] ||= []
map['source_files'] << {'source_file', event.get('path') }
"
push_map_as_event_on_timeout => true
timeout => 600 #assuming this is the most far apart they will arrive
}
...
}
output {
elastic { ...}
}
以及其他此类变体。但是,我一直在将个别记录写入索引,但无法合并。再一次,正如您从数据集中看到的那样,不能保证记录的顺序 - 所以我想知道过滤器是否是完成这项工作的正确工具,首先? :-\
还是只是我没用好! ;-)
无论哪种情况,欢迎任何输入/意见/建议。谢谢!
PS:此消息正在 cross-posted over from Elastic 论坛中。我提供了一个 link 以防万一那里也弹出一些答案。
答案是在upsert模式下使用Elasticsearch。具体请看here..
我建议首先让信息按顺序到达您,以便过滤器可以更好地接受它,其次,您可以在 pipeline.yml 中设置选项:pipeline.workers: 1 和 pipeline.ordered: true,从而保证处理顺序。
大家好!
我正在尝试使用 Logstash v7.7 的聚合过滤器插件来关联和合并来自两个不同 CSV 文件输入的数据,这些文件代表 API 数据调用。这个想法是制作一个显示组合图片的记录。如您所料,数据可能会或可能不会以正确的顺序到达。
举个例子:
/data/incoming/source_1/*.csv
StartTime, AckTime, Operation, RefData1, RefData2, OpSpecificData1 231313232,44343545,Register,ref-data-1a,ref-data-2a,op-specific-data-1 979898999,75758383,Register,ref-data-1b,ref-data-2b,op-specific-data-2 354656466,98554321,Cancel,ref-data-1c,ref-data-2c,op-specific-data-2
/data/incoming/source_1/*.csv
FinishTime,Operation,RefData1, RefData2, FinishSpecificData 67657657575,Cancel,ref-data-1c,ref-data-2c,FinishSpecific-Data-1 68445590877,Register,ref-data-1a,ref-data-2a,FinishSpecific-Data-2 55443444313,Register,ref-data-1a,ref-data-2a,FinishSpecific-Data-2
我有一个接收这两个 CSV 的管道,我能够处理它们并将它们作为单独的记录写入单个索引。然而,想法是将来自两个来源的记录组合成一个记录,每个记录代表一个超集。操作相关信息
不幸的是,尽管进行了多次尝试,我还是无法弄清楚如何通过聚合过滤器插件实现这一点。我的主要问题是这是否适合使用特定插件?如果是这样,欢迎提出任何建议!
目前,我有这个
input {
file {
path => ['/data/incoming/source_1/*.csv']
tags => ["source1"]
}
file {
path => ['/data/incoming/source_2/*.csv']
tags => ["source2"]
}
# use the tags to do some source 1 and 2 related massaging, calculations, etc
aggregate {
task_id = "%{Operation}_%{RefData1}_%{RefData1}"
code => "
map['source_files'] ||= []
map['source_files'] << {'source_file', event.get('path') }
"
push_map_as_event_on_timeout => true
timeout => 600 #assuming this is the most far apart they will arrive
}
...
}
output {
elastic { ...}
}
以及其他此类变体。但是,我一直在将个别记录写入索引,但无法合并。再一次,正如您从数据集中看到的那样,不能保证记录的顺序 - 所以我想知道过滤器是否是完成这项工作的正确工具,首先? :-\
还是只是我没用好! ;-)
无论哪种情况,欢迎任何输入/意见/建议。谢谢!
PS:此消息正在 cross-posted over from Elastic 论坛中。我提供了一个 link 以防万一那里也弹出一些答案。
答案是在upsert模式下使用Elasticsearch。具体请看here..
我建议首先让信息按顺序到达您,以便过滤器可以更好地接受它,其次,您可以在 pipeline.yml 中设置选项:pipeline.workers: 1 和 pipeline.ordered: true,从而保证处理顺序。