How to reuse added field in output with logstash

My use case is simple. I have Kafka as the input and a set of Elasticsearch indices (topic name === index name), where each index name matches an entity we use in the application, e.g. "buildings", "cars", "bus" (just as examples).

input {
    kafka {
        bootstrap_servers => "kafka:29092"
        topics => ['cars', 'bus']
        decorate_events => true
        codec => 'json'
    }
}
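
Note: decorate_events => true is what makes the topic name available to the filter and output sections as [@metadata][kafka][topic].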

filter {
    if [@metadata][kafka][topic] == 'cars' {
        mutate {
            convert => {
                "car_id" => "integer"
            }
            add_field => {
                'id' => "%{car_id}"
            }
        }
    }

    if [@metadata][kafka][topic] == 'bus' {
        mutate {
            convert => {
                "bus_id" => "integer"
            }
            add_field => {
                'id' => "%{bus_id}"
            }
        }
    }
}

output {
    if [@metadata][kafka][topic] == 'cars' {
        elasticsearch {
            hosts => "elasticsearch:9200"
            user => "${ELASTICSEARCH_USERNAME}"
            password => "${ELASTICSEARCH_PASSWORD}"
            index => "%{[@metadata][kafka][topic]}"
            doc_as_upsert => true
            action => 'update'
            document_id => '%{car_id}'
        }

        if [isDelete] {
            elasticsearch {
                hosts => "elasticsearch:9200"
                user => "${ELASTICSEARCH_USERNAME}"
                password => "${ELASTICSEARCH_PASSWORD}"
                index => "%{[@metadata][kafka][topic]}"
                action => 'delete'
                document_id => '%{car_id}'
            }
        }
    }

    if [@metadata][kafka][topic] == 'bus' {
        elasticsearch {
            hosts => "elasticsearch:9200"
            user => "${ELASTICSEARCH_USERNAME}"
            password => "${ELASTICSEARCH_PASSWORD}"
            index => "%{[@metadata][kafka][topic]}"
            doc_as_upsert => true
            action => 'update'
            document_id => '%{bus_id}'
        }

        if [isDelete] {
            elasticsearch {
                hosts => "elasticsearch:9200"
                user => "${ELASTICSEARCH_USERNAME}"
                password => "${ELASTICSEARCH_PASSWORD}"
                index => "%{[@metadata][kafka][topic]}"
                action => 'delete'
                document_id => '%{bus_id}'
            }
        }
    }
}

To update or delete documents from Logstash I need their IDs. But as you can see, I don't want to end up with 50 conditional blocks, one per entity; I'd rather factor this out.

I would like to reuse the "id" field I build in the filter section as the document_id in the output.

Do you have any idea how I can do that?

You can do it like this:

input {
    kafka {
        bootstrap_servers => "kafka:29092"
        topics => ['cars', 'bus']
        decorate_events => true
        codec => 'json'
    }
}

filter {
    # Map each topic name to the name of the field that carries the entity id.
    translate {
       source => "[@metadata][kafka][topic]"
       target => "[@metadata][id_field]"
       dictionary => {
          "cars" => "car_id"
          "bus" => "bus_id"
       }
       fallback => "no_id"
    }
    # Copy the value of that per-topic id field into a common [id] field.
    ruby {
        code => "event.set('id', event.get(event.get('[@metadata][id_field]')))"
    }
}
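
One caveat: if a message arrives on a topic that is missing from the dictionary, translate writes the fallback value "no_id" into [@metadata][id_field]; since there is presumably no field actually named no_id, the ruby filter cannot resolve a real id and the %{id} reference in the output will not expand to a usable document id. A minimal guard, assuming such events should simply be dropped rather than indexed:

filter {
    # assumed policy: drop events whose topic has no id mapping,
    # so the outputs never see an unresolved %{id} as document_id
    if [@metadata][id_field] == "no_id" {
        drop { }
    }
}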

output {
    # a truthy [isDelete] flag on the event marks it for deletion
    if [isDelete] {
        elasticsearch {
            hosts => "elasticsearch:9200"
            user => "${ELASTICSEARCH_USERNAME}"
            password => "${ELASTICSEARCH_PASSWORD}"
            index => "%{[@metadata][kafka][topic]}"
            action => 'delete'
            document_id => '%{id}'
        }
    } else {
        elasticsearch {
            hosts => "elasticsearch:9200"
            user => "${ELASTICSEARCH_USERNAME}"
            password => "${ELASTICSEARCH_PASSWORD}"
            index => "%{[@metadata][kafka][topic]}"
            doc_as_upsert => true
            action => 'update'
            document_id => '%{id}'
        }
    }
}
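
With this in place, supporting a new entity only requires a new dictionary entry instead of a whole new conditional block. For example, for the "buildings" entity mentioned in the question (assuming its messages carry a building_id field), the only changes would be:

topics => ['cars', 'bus', 'buildings']

in the kafka input, and

dictionary => {
   "cars" => "car_id"
   "bus" => "bus_id"
   "buildings" => "building_id"
}

in the translate filter.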