使用 Logstash Aggregate Filter 插件处理可能排序或未排序的数据

Using Logstash Aggregate Filter plugin to process data which may or may not be sequenced

大家好!

我正在尝试使用 Logstash v7.7 的聚合过滤器插件来关联和合并来自两个不同 CSV 文件输入的数据,这些文件代表 API 数据调用。这个想法是制作一个显示组合图片的记录。如您所料,数据可能会或可能不会以正确的顺序到达。

举个例子:

/data/incoming/source_1/*.csv

StartTime, AckTime, Operation, RefData1, RefData2, OpSpecificData1 231313232,44343545,Register,ref-data-1a,ref-data-2a,op-specific-data-1 979898999,75758383,Register,ref-data-1b,ref-data-2b,op-specific-data-2 354656466,98554321,Cancel,ref-data-1c,ref-data-2c,op-specific-data-2

/data/incoming/source_1/*.csv

FinishTime,Operation,RefData1, RefData2, FinishSpecificData 67657657575,Cancel,ref-data-1c,ref-data-2c,FinishSpecific-Data-1 68445590877,Register,ref-data-1a,ref-data-2a,FinishSpecific-Data-2 55443444313,Register,ref-data-1a,ref-data-2a,FinishSpecific-Data-2

我有一个接收这两个 CSV 的管道,我能够处理它们并将它们作为单独的记录写入单个索引。然而,想法是将来自两个来源的记录组合成一个记录,每个记录代表一个超集。操作相关信息

不幸的是,尽管进行了多次尝试,我还是无法弄清楚如何通过聚合过滤器插件实现这一点。我的主要问题是这是否适合使用特定插件?如果是这样,欢迎提出任何建议!

目前,我有这个

input {
   file {
      path => ['/data/incoming/source_1/*.csv']
      tags => ["source1"]
   }
   file {
      path => ['/data/incoming/source_2/*.csv']
      tags => ["source2"]
   }
   # use the tags to do some source 1 and 2 related massaging, calculations, etc

   aggregate {
         task_id = "%{Operation}_%{RefData1}_%{RefData1}"
         code => "
             map['source_files'] ||= []
             map['source_files'] << {'source_file', event.get('path') }
         "
         push_map_as_event_on_timeout => true
         timeout => 600 #assuming this is the most far apart they will arrive         
   }
  ...
}
output {
    elastic { ...}
}

以及其他此类变体。但是,我一直在将个别记录写入索引,但无法合并。再一次,正如您从数据集中看到的那样,不能保证记录的顺序 - 所以我想知道过滤器是否是完成这项工作的正确工具,首先? :-\

还是只是我没用好! ;-)

无论哪种情况,欢迎任何输入/意见/建议。谢谢!

PS:此消息正在 cross-posted over from Elastic 论坛中。我提供了一个 link 以防万一那里也弹出一些答案。

答案是在upsert模式下使用Elasticsearch。具体请看here..

我建议首先让信息按顺序到达您,以便过滤器可以更好地接受它,其次,您可以在 pipeline.yml 中设置选项:pipeline.workers: 1 和 pipeline.ordered: true,从而保证处理顺序。