如何根据某些动态字段值制作 Logstash 多行过滤器合并行?

How to make Logstash multiline filter merge lines based on some dynamic field value?

我是 logstash 的新手,很想为其中一个用例设置 ELK。我发现这个问题与我的相关 Why won't Logstash multiline merge lines based on grok'd field? 如果多行过滤器不合并 grok 字段上的行,那么我如何合并以下日志示例中的第 2 行和第 10 行?请帮忙

我使用 grok 模式创建了一个字段 'id',其中包含值 715。

Line1 - 5/08/06 00:10:35.348 [BaseAsyncApi] [qtp19303632-51]: INFO: [714] CMDC flowcxt=[55c2a5fbe4b0201c2be31e35] method=contentdetail uri=http://10.126.44.161:5600/cmdc/content/programid%3A%2F%2F317977349~programid%3A%2F%2F9?lang=eng&catalogueId=30&region=3000~3001&pset=pset_pps header={}   
Line2 - 2015/08/06 00:10:35.348 [BaseAsyncApi] [qtp19303632-53]: INFO: [715] CMDC flowcxt=[55c2a5fbe4b0201c2be31e36] method=contentdetail uri=http://10.126.44.161:5600/cmdc/content/programid%3A%2F%2F1640233758~programid%3A%2F%2F1073741829?lang=eng&catalogueId=30&region=3000~3001&pset=pset_pps header={}   
Line3 - 2015/08/06 00:10:35.349 [TWCAsyncProcessor] [TWC-pool-3-thread-2]: INFO: [714:426] TWC request=MercurySortRequest   
Line4 - 2015/08/06 00:10:35.349 [TWCAsyncProcessor] [TWC-pool-3-thread-1]: INFO: [715:427] TWC request=MercurySortRequest   
Line5 - 2015/08/06 00:10:35.352 [BaseAsyncApi] [qtp19303632-54]: INFO: [716] CMDC flowcxt=[55c2a5fbe4b0201c2be31e37] method=contentdetail uri=http://10.126.44.161:5600/cmdc/content/programid%3A%2F%2F2144942810~programid%3A%2F%2F1953281601?lang=eng&catalogueId=30&region=3000~3001&pset=pset_pps header={}   
Line6 - 2015/08/06 00:10:35.354 [TWCAsyncProcessor] [TWC-pool-3-thread-1]: INFO: [716:428] TWC request=MercurySortRequest   
Line7 - 2015/08/06 00:10:35.359 [BaseAsyncApi] [qtp19303632-49]: INFO: [717] CMDC flowcxt=[55c2a5fbe4b0201c2be31e38] method=contentdetail uri=http://10.126.44.161:5600/cmdc/content/programid%3A%2F%2F2144942448~programid%3A%2F%2F2147355770?lang=eng&catalogueId=30&region=3000~3001&pset=pset_pps header={}   
Line8 - 2015/08/06 00:10:35.360 [TWCAsyncProcessor] [TWC-pool-3-thread-2]: INFO: [717:429] TWC request=MercurySortRequest   
Line9 - 2015/08/06 00:10:35.366 [TWCAsyncProcessor$TWCAsyncProcessorCallback$ReceiveCallback] [CMDC-pool-2-thread-41]: INFO: [715:427] TWC response status=200 hits=1 time=17 internal=10.42   
Line10 - 2015/08/06 00:10:35.367 [BaseAsyncApi] [CMDC-pool-2-thread-41]: INFO: [715] CMDC response status=200 CMDC=19ms TWC=17ms #TWC=1

您需要使用设置了 stream_identitymultiline 过滤器。文档 here 不清楚它的用途,但您的基本策略是这样的:

if (!"multiline" in [tags]) {
  grok { // parse out your identity field }
  multiline { 
    stream_identity => "%{id}"
    pattern => "." // match anything because we're gathering by id field
    what => "previous"
    periodic_flush => true
    max_age => 5 // however many seconds it takes to get all of your lines together
    add_tags => ["multiline" ]
  }
} else {
  // process multiline event that's been flushed
}

自从 1.5 发布以来,我还没有尝试过类似的东西,但文档说它应该可以工作(在 1.4.2 和之前的版本中,刷新机制不起作用,所以你可能会丢失事件)。