Logstash csv 导入 - 如果不为空则变异 add_field

Logstash csv import - mutate add_field if not empty

我正在使用 logstash 将数据从 csv 文件导入我们的 elasticsearch。

在导入过程中,我想创建一个新字段,其中包含来自其他两个字段的值。这是我导入的片段:

    input {
          file {
            path => "/data/xyz/*.csv"
            start_position => "beginning"
            ignore_older => 0
            sincedb_path => "/dev/null"
          }
    }

    filter {
        if [path] =~ "csv1" {
            csv {
                  separator => ";"
                  columns =>
                  [
                    "name1",
                    "name2",
                    "name3",
                    "ID"              
                  ]
              }
                mutate {
                  add_field => {
                      "searchfield" => "%{name1} %{name2} %{name3}"
                }
            }
        }

    output {
           if [path] =~ "csv1" {
               elasticsearch {
                  hosts => "localhost"
                  index => "my_index"           
                  document_id => "%{ID}"
               }
           }
    }
}

这按预期工作,但在例如 name3 为空的行上,logstash 将 %{name3} 写入新字段。有没有办法只在不为空的情况下添加值?

我认为除了检查 name3 是否存在并以此为基础构建您的搜索字段之外别无他法。

if [name3] {
  mutate {
    id => "with-name3"
    add_field => { "searchfield" => "%{name1} %{name2} %{name3}" }
  }
} else {
  mutate {
    id => "without-name3"
    add_field => { "searchfield" => "%{name1} %{name2}" }
  }
}

或者,如果我对您的问题的理解正确,您显然希望将此数据发送到 Elasticsearch 并希望拥有一个可搜索字段。为了避免源中的数据重复,您可以使用 copy_to 语句构建搜索字段。您的映射如下所示:

{
  "mappings": {
    "doc": {
      "properties": {
        "name1": {
          "type": "text",
          "copy_to": "searchfield" 
        },
        "name2": {
          "type": "text",
          "copy_to": "searchfield" 
        },
        "name3": {
          "type": "text",
          "copy_to": "searchfield" 
        },
        "searchfield": {
          "type": "text"
        }
      }
    }
  }
}

然后您就可以完美地运行针对该字段进行查询,而不会在源中出现重复项。

更新。基本上你的 logstash.conf 看起来如下:

input {
  file {
    path => "/data/xyz/*.csv"
    start_position => "beginning"
    ignore_older => 0
    sincedb_path => "/dev/null"
  }
}

filter {
  if [path] =~ "csv1" {
    csv {
      separator => ";"
      columns => ["name1", "name2", "name3", "ID"]
    }
  }
}

output {
  if [path] =~ "csv1" {
    elasticsearch {
      hosts => "localhost"
      index => "my_index"
      document_id => "%{ID}"
    }
  }
}

然后使用以下内容创建 elasticsearch 索引:

PUT /my_index/
{
  "mappings": {
    "doc": {
      "properties": {
        "name1": {
          "type": "text",
          "copy_to": "searchfield" 
        },
        "name2": {
          "type": "text",
          "copy_to": "searchfield" 
        },
        "name3": {
          "type": "text",
          "copy_to": "searchfield" 
        },
        "searchfield": {
          "type": "text"
        }
      }
    }
  }
}

然后您可以运行搜索如下:

GET /my_index/_search
{
  "query": {
    "match": {
      "searchfield": {
        "query": "your text"
      }
    }
  }
}