How to reuse added field in output with logstash
My use case is simple.
I have Kafka as input and a set of Elasticsearch indices (topic name === index name), where each index name matches an entity we use in the application, e.g. "buildings", "cars", "bus" (just as examples).
input {
  kafka {
    bootstrap_servers => "kafka:29092"
    topics => ['cars', 'bus']
    decorate_events => true
    codec => 'json'
  }
}
filter {
  if [@metadata][kafka][topic] == 'cars' {
    mutate {
      convert => {
        "car_id" => "integer"
      }
      add_field => {
        'id' => "%{car_id}"
      }
    }
  }
  if [@metadata][kafka][topic] == 'bus' {
    mutate {
      convert => {
        "bus_id" => "integer"
      }
      add_field => {
        'id' => "%{bus_id}"
      }
    }
  }
}
output {
  if [@metadata][kafka][topic] == 'cars' {
    elasticsearch {
      hosts => "elasticsearch:9200"
      user => "${ELASTICSEARCH_USERNAME}"
      password => "${ELASTICSEARCH_PASSWORD}"
      index => "%{[@metadata][kafka][topic]}"
      doc_as_upsert => true
      action => 'update'
      document_id => '%{car_id}'
    }
    if [isDelete] {
      elasticsearch {
        hosts => "elasticsearch:9200"
        user => "${ELASTICSEARCH_USERNAME}"
        password => "${ELASTICSEARCH_PASSWORD}"
        index => "%{[@metadata][kafka][topic]}"
        action => 'delete'
        document_id => '%{car_id}'
      }
    }
  }
  if [@metadata][kafka][topic] == 'bus' {
    elasticsearch {
      hosts => "elasticsearch:9200"
      user => "${ELASTICSEARCH_USERNAME}"
      password => "${ELASTICSEARCH_PASSWORD}"
      index => "%{[@metadata][kafka][topic]}"
      doc_as_upsert => true
      action => 'update'
      document_id => '%{bus_id}'
    }
    if [isDelete] {
      elasticsearch {
        hosts => "elasticsearch:9200"
        user => "${ELASTICSEARCH_USERNAME}"
        password => "${ELASTICSEARCH_PASSWORD}"
        index => "%{[@metadata][kafka][topic]}"
        action => 'delete'
        document_id => '%{bus_id}'
      }
    }
  }
}
To update/delete documents from Logstash I need their IDs. But as you can see, I don't want to end up with 50 conditionals, one per entity; I would rather factor this out.
I would like to reuse the "id" field I build in the filter section as the document_id in the output.
Do you know how I could do that?
You can do it like this:
input {
  kafka {
    bootstrap_servers => "kafka:29092"
    topics => ['cars', 'bus']
    decorate_events => true
    codec => 'json'
  }
}
filter {
  # Map each topic to the name of the field that holds the document ID.
  translate {
    source => "[@metadata][kafka][topic]"
    target => "[@metadata][id_field]"
    dictionary => {
      "cars" => "car_id"
      "bus" => "bus_id"
    }
    fallback => "no_id"
  }
  # Copy the value of that field into a common [id] field.
  ruby {
    code => "event.set('id', event.get(event.get('[@metadata][id_field]')))"
  }
}
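If it helps to see what the translate + ruby pair does, here is a small Ruby sketch; plain Hashes stand in for the Logstash event API, and `ID_FIELDS` / `resolve_id` are illustrative names, not part of Logstash:

```ruby
# Illustrative only: a Hash plays the role of the Logstash event,
# and ID_FIELDS plays the role of the translate dictionary.
ID_FIELDS = { 'cars' => 'car_id', 'bus' => 'bus_id' }.freeze

def resolve_id(event)
  # translate filter: topic -> name of the ID field (with fallback)
  id_field = ID_FIELDS.fetch(event['topic'], 'no_id')
  # ruby filter: copy that field's value into a common 'id' field
  event['id'] = event[id_field]
  event
end

resolve_id({ 'topic' => 'cars', 'car_id' => 42 })['id']  # => 42
resolve_id({ 'topic' => 'bus',  'bus_id' => 7 })['id']   # => 7
```

An event from an unmapped topic ends up with a nil `id`, which you could guard against before it reaches the output.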
output {
  if [isDelete] {
    elasticsearch {
      hosts => "elasticsearch:9200"
      user => "${ELASTICSEARCH_USERNAME}"
      password => "${ELASTICSEARCH_PASSWORD}"
      index => "%{[@metadata][kafka][topic]}"
      action => 'delete'
      document_id => '%{id}'
    }
  } else {
    elasticsearch {
      hosts => "elasticsearch:9200"
      user => "${ELASTICSEARCH_USERNAME}"
      password => "${ELASTICSEARCH_PASSWORD}"
      index => "%{[@metadata][kafka][topic]}"
      doc_as_upsert => true
      action => 'update'
      document_id => '%{id}'
    }
  }
}
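The output section then only branches on the delete flag, sketched in the same spirit (again with a plain Hash as the event, an assumption for illustration):

```ruby
# Illustrative only: pick the Elasticsearch action for an event,
# mirroring the conditional in the output section above.
def action_for(event)
  event['isDelete'] ? 'delete' : 'update'
end

action_for({ 'isDelete' => true, 'id' => 42 })  # => "delete"
action_for({ 'id' => 42 })                      # => "update"
```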