带有 elasticsearch 输出的 Logstash:如何写入不同的索引?
Logstash with elasticsearch output: how to write to different indices?
我希望在这里找到我从昨天开始苦苦挣扎的问题的答案:
我正在使用 rabbitMQ 输入和 elasticsearch 输出配置 Logstash 1.5.6。
消息以批量格式发布在 rabbitMQ 中,我的 logstash 使用它们并将它们全部写入 elasticsearch 默认索引 logstash-YYY.MM.DD 使用此配置:
input {
rabbitmq {
host => 'xxx'
user => 'xxx'
password => 'xxx'
queue => 'xxx'
exchange => "xxx"
key => 'xxx'
durable => true
}
output {
elasticsearch {
host => "xxx"
cluster => "elasticsearch"
flush_size =>10
bind_port => 9300
codec => "json"
protocol => "http"
}
stdout { codec => rubydebug }
}
现在我要做的是将消息发送到不同的 elasticsearch 索引。
来自 amqp 输入的消息已经具有索引和类型参数(批量格式)。
我试试看
input {
rabbitmq {
host => 'xxx'
user => 'xxx'
password => 'xxx'
queue => 'xxx'
exchange => "xxx"
key => 'xxx'
durable => true
}
output {
elasticsearch {
host => "xxx"
cluster => "elasticsearch"
flush_size =>10
bind_port => 9300
codec => "json"
protocol => "http"
index => "%{[index][_index]}"
}
stdout { codec => rubydebug }
}
但是 logstash 正在做的是创建索引 %{[index][_index]} 并将所有文档放在那里,而不是读取 _index 参数并将文档发送到那里!
我还尝试了以下方法:
index => %{index}
index => '%{index}'
index => "%{index}"
但是 none 似乎有效。
有帮助吗?
要恢复,这里的主要问题是:如果 rabbitMQ 消息具有这种格式:
{"index":{"_index":"indexA","_type":"typeX","_ttl":2592000000}}
{"@timestamp":"2017-03-09T15:55:54.520Z","@version":"1","@fields":{DATA}}
如何告诉 logstash 在名为 "indexA" 的索引中发送类型为 "typeX" 的输出??
如果您在 RabbitMQ 中的消息已经是批量格式,那么您不需要使用 elasticsearch
输出,但是一个简单的 http
输出到达 _bulk
端点就可以了技巧:
output {
http {
http_method => "post"
url => "http://localhost:9200/_bulk"
format => "message"
message => "%{message}"
}
}
所以大家,在 Val 的帮助下,解决方案是:
- 正如他所说,因为 RabbitMQ 消息已经是批量格式,不需要使用 elasticsearch 输出,http 输出到 _bulk API 就可以了(傻我)
所以我用这个替换了输出:
output {
http {
http_method => "post"
url => "http://172.16.1.81:9200/_bulk"
format => "message"
message => "%{message}"
}
stdout { codec => json_lines }
}
但是还是不行。我使用的是 Logstash 1.5.6,在升级到 Logstash 2.0.0 (https://www.elastic.co/guide/en/logstash/2.4/_upgrading_using_package_managers.html) 后,它使用相同的配置。
就是这样 :)
如果你在Rabbitmq中存储JSON消息,那么这个问题就可以解决了。
在 JSON 消息中使用索引和类型作为字段并将这些值分配给 Elasticsearch 输出插件。
index =>
"%{index}"
//INDEX from JSON body received from Kafka Producer document_type => "%{type}" } //TYPE from JSON body
使用这种方法,每条消息都可以有自己的索引和类型。
我希望在这里找到我从昨天开始苦苦挣扎的问题的答案:
我正在使用 rabbitMQ 输入和 elasticsearch 输出配置 Logstash 1.5.6。
消息以批量格式发布在 rabbitMQ 中,我的 logstash 使用它们并将它们全部写入 elasticsearch 默认索引 logstash-YYY.MM.DD 使用此配置:
input {
rabbitmq {
host => 'xxx'
user => 'xxx'
password => 'xxx'
queue => 'xxx'
exchange => "xxx"
key => 'xxx'
durable => true
}
output {
elasticsearch {
host => "xxx"
cluster => "elasticsearch"
flush_size =>10
bind_port => 9300
codec => "json"
protocol => "http"
}
stdout { codec => rubydebug }
}
现在我要做的是将消息发送到不同的 elasticsearch 索引。
来自 amqp 输入的消息已经具有索引和类型参数(批量格式)。
我试试看
input {
rabbitmq {
host => 'xxx'
user => 'xxx'
password => 'xxx'
queue => 'xxx'
exchange => "xxx"
key => 'xxx'
durable => true
}
output {
elasticsearch {
host => "xxx"
cluster => "elasticsearch"
flush_size =>10
bind_port => 9300
codec => "json"
protocol => "http"
index => "%{[index][_index]}"
}
stdout { codec => rubydebug }
}
但是 logstash 正在做的是创建索引 %{[index][_index]} 并将所有文档放在那里,而不是读取 _index 参数并将文档发送到那里!
我还尝试了以下方法:
index => %{index}
index => '%{index}'
index => "%{index}"
但是 none 似乎有效。
有帮助吗?
要恢复,这里的主要问题是:如果 rabbitMQ 消息具有这种格式:
{"index":{"_index":"indexA","_type":"typeX","_ttl":2592000000}}
{"@timestamp":"2017-03-09T15:55:54.520Z","@version":"1","@fields":{DATA}}
如何告诉 logstash 在名为 "indexA" 的索引中发送类型为 "typeX" 的输出??
如果您在 RabbitMQ 中的消息已经是批量格式,那么您不需要使用 elasticsearch
输出,但是一个简单的 http
输出到达 _bulk
端点就可以了技巧:
output {
http {
http_method => "post"
url => "http://localhost:9200/_bulk"
format => "message"
message => "%{message}"
}
}
所以大家,在 Val 的帮助下,解决方案是:
- 正如他所说,因为 RabbitMQ 消息已经是批量格式,不需要使用 elasticsearch 输出,http 输出到 _bulk API 就可以了(傻我)
所以我用这个替换了输出:
output { http { http_method => "post" url => "http://172.16.1.81:9200/_bulk" format => "message" message => "%{message}" } stdout { codec => json_lines } }
但是还是不行。我使用的是 Logstash 1.5.6,在升级到 Logstash 2.0.0 (https://www.elastic.co/guide/en/logstash/2.4/_upgrading_using_package_managers.html) 后,它使用相同的配置。
就是这样 :)
如果你在Rabbitmq中存储JSON消息,那么这个问题就可以解决了。 在 JSON 消息中使用索引和类型作为字段并将这些值分配给 Elasticsearch 输出插件。
index => "%{index}" //INDEX from JSON body received from Kafka Producer document_type => "%{type}" } //TYPE from JSON body
使用这种方法,每条消息都可以有自己的索引和类型。