Logstash:如果输出失败,则将事件发送到其他地方

Logstash: send event elsewhere if output failed

提供以下 logstash 管道:

input
{
    generator
    {
        lines => [
        '{"name" : "search", "product" : { "module" : "search" , "name" : "api"}, "data" : { "query" : "toto"}}',
        '{"name" : "user_interaction", "product" : { "module" : "search" , "name" : "front"}, "data" : { "query" : "toto"}}',
        '{"name" : "search", "product" : { "module" : "search" , "name" : "api"}, "data" : { "query" : "toto"}}',
        '{"hello": "world"}',
        '{"name" :"wrong data", "data" : "I am wrong !"}',
        '{"name" :"wrong data", "data" : { "hello" : "world" }}'
        ]
        codec => json
        count => 1
    }
}

filter
{
  mutate
  {
    remove_field => ["sequence", "host", "@version"]
  }
}

output
{
   elasticsearch
   {
     hosts => ["elasticsearch:9200"]
     index => "events-dev6-test"
     document_type => "_doc"
     manage_template => false
   }

   stdout
   {
       codec => rubydebug
   }
}

elasticsearch 对此索引有严格的映射,因此,某些事件会给出 400 错误 "mapping set to strict, dynamic introduction of [hello] within [data] is not allowed"(这是正常的)。

如何将失败的事件发送到其他地方(文本日志或其他 elasticsearch 索引)(这样我就不会丢失事件)?

Logstash 6.2 引入了 Dead Letter Queues,可以用来做你想做的事。您需要在 logstash.yml.

中启用 dead_letter_queue.enable: true

然后将其作为输入处理:

input {
  dead_letter_queue {
    path => "/path/to/data/dead_letter_queue" 
    commit_offsets => true 
    pipeline_id => "main" 
  }
}

output {
  file {
    path => ...
       codec => line { format => "%{message}"}
   }    
}

在 6.2 之前,我不相信有办法做你想做的事。