Logstash 大会 |从路径中提取文件名

Logstash Conf | Extracting Filename from Path

我正在尝试设置 Logstash 以提供 Elasticsearch。当然,我创建了以下 conf 文件,它看起来运行良好:

input {
  beats {
    port => 5044
  }
  
  file {
    path => "C:/f1/f2/Logs/f3/LocalHost#base#iway_2022-03-28T10_45_15.log"
  }
}

filter {
  grok {
    match => {
      "message" => [
        ".%{TIMESTAMP_ISO8601:timeStamp}. %{LOGLEVEL:loglevel} .(W.)%{DATA:thread}.%{INT:thread_pool}. %{GREEDYDATA:msgbody}",
        ".%{TIMESTAMP_ISO8601:timeStamp}. %{LOGLEVEL:loglevel} .%{DATA:thread}. %{GREEDYDATA:msgbody}"      
      ]
    }
  }
}

output {
  elasticsearch {
    hosts => ["https://localhost:9200"]
    index => "iway_logs"
    user => "elastic"
    password => "something"
    cacert => "C:\f1\f2\logstash-8.1.3\config\cert\elasticsearch_http_ca.crt"
  }
}

我一直在尝试添加两个新字段,但到目前为止都没有成功。以下是经过多次修改后的 conf 文件的当前版本。

input {
  beats {
    port => 5044
  }
  
  file {
    path => "C:/f1/f2/Logs/f3/LocalHost#base#iway_2022-03-28T10_45_15.log"
  }
}

filter {
  grok {
    match => {
      "message" => [
        ".%{TIMESTAMP_ISO8601:timeStamp}. %{LOGLEVEL:loglevel} .(W.)%{DATA:thread}.%{INT:thread_pool}. %{GREEDYDATA:msgbody}",
        ".%{TIMESTAMP_ISO8601:timeStamp}. %{LOGLEVEL:loglevel} .%{DATA:thread}. %{GREEDYDATA:msgbody}"      
      ]
    }
  }
  grok {
        match => { 
            "path" => "%{GREEDYDATA}/%{GREEDYDATA:filename}\.log"
            }
  }
  mutate {
        split => { "filename" => "#" }
        add_field => { "serverName" => "%{[filename][0]}" }
        add_field => { "configName" => "%{[filename][1]}" }
  }
}

output {
  elasticsearch {
    hosts => ["https://localhost:9200"]
    index => "iway_logs"
    user => "elastic"
    password => "something"
    cacert => "C:\f1\f2\logstash-8.1.3\config\cert\elasticsearch_http_ca.crt"
  }
}

新字段(即 serverName 和 configName)的结果始终报告原始表达式而不是评估输出。有人可以帮忙吗? TIA.

您可能应该为此利用 dissect filter,像这样:

filter {
  if [path] {
    dissect {
      mapping => {
       "path" => "C:/f1/f2/Logs/f3/%{serverName}#%{configName}#%{?ignore}.log"
      }
    }
  }
}

如果您有 ECS compatibility enabledpath 字段称为 [log][file][path],那么您的配置应该是这个:

filter {
  if [log][file][path] {
    dissect {
      mapping => {
       "[log][file][path]" => "C:/f1/f2/Logs/f3/%{serverName}#%{configName}#%{?ignore}.log"
      }
    }
  }
}