带有 elasticsearch 输出的 Logstash:如何写入不同的索引?

Logstash with elasticsearch output: how to write to different indices?

我希望在这里找到我从昨天开始苦苦挣扎的问题的答案:

我正在使用 rabbitMQ 输入和 elasticsearch 输出配置 Logstash 1.5.6。

消息以批量格式发布在 rabbitMQ 中,我的 logstash 使用它们并将它们全部写入 elasticsearch 默认索引 logstash-YYY.MM.DD 使用此配置:

input {
  rabbitmq {
  host => 'xxx'
  user => 'xxx'
  password => 'xxx'
  queue => 'xxx'
  exchange => "xxx"
  key => 'xxx'
  durable => true
}

output {
  elasticsearch {
  host => "xxx"
  cluster => "elasticsearch"
  flush_size =>10
  bind_port => 9300
  codec => "json"
  protocol => "http"
  }
stdout { codec => rubydebug }
}

现在我要做的是将消息发送到不同的 elasticsearch 索引。

来自 amqp 输入的消息已经具有索引和类型参数(批量格式)。

所以在阅读文档之后: https://www.elastic.co/guide/en/logstash/1.5/event-dependent-configuration.html#logstash-config-field-references

我试试看

input {
  rabbitmq {
  host => 'xxx'
  user => 'xxx'
  password => 'xxx'
  queue => 'xxx'
  exchange => "xxx"
  key => 'xxx'
  durable => true
}

output {
  elasticsearch {
  host => "xxx"
  cluster => "elasticsearch"
  flush_size =>10
  bind_port => 9300
  codec => "json"
  protocol => "http"
  index => "%{[index][_index]}"
  }
stdout { codec => rubydebug }
}

但是 logstash 正在做的是创建索引 %{[index][_index]} 并将所有文档放在那里,而不是读取 _index 参数并将文档发送到那里!

我还尝试了以下方法:

index => %{index}
index => '%{index}'
index => "%{index}"

但是 none 似乎有效。

有帮助吗?

要恢复,这里的主要问题是:如果 rabbitMQ 消息具有这种格式:

{"index":{"_index":"indexA","_type":"typeX","_ttl":2592000000}}
{"@timestamp":"2017-03-09T15:55:54.520Z","@version":"1","@fields":{DATA}}

如何告诉 logstash 在名为 "indexA" 的索引中发送类型为 "typeX" 的输出??

如果您在 RabbitMQ 中的消息已经是批量格式,那么您不需要使用 elasticsearch 输出,但是一个简单的 http 输出到达 _bulk 端点就可以了技巧:

output {
    http {
        http_method => "post"
        url => "http://localhost:9200/_bulk"
        format => "message"
        message => "%{message}"
    }
}

所以大家,在 Val 的帮助下,解决方案是:

  • 正如他所说,因为 RabbitMQ 消息已经是批量格式,不需要使用 elasticsearch 输出,http 输出到 _bulk API 就可以了(傻我)
  • 所以我用这个替换了输出:

    output {
       http {
       http_method => "post"
       url => "http://172.16.1.81:9200/_bulk"
       format => "message"
       message => "%{message}" 
    }
    stdout { codec => json_lines }
    }
    
  • 但是还是不行。我使用的是 Logstash 1.5.6,在升级到 Logstash 2.0.0 (https://www.elastic.co/guide/en/logstash/2.4/_upgrading_using_package_managers.html) 后,它使用相同的配置。

就是这样 :)

如果你在Rabbitmq中存储JSON消息,那么这个问题就可以解决了。 在 JSON 消息中使用索引和类型作为字段并将这些值分配给 Elasticsearch 输出插件。

index => "%{index}"                                                                 //INDEX from JSON body received from Kafka Producer document_type => "%{type}" }               //TYPE from JSON body

使用这种方法,每条消息都可以有自己的索引和类型。