将城市名称映射到 GeoPoint 从 Logstash 到 Elasticsearch

Mapping Cityname to GeoPoint from Logstash to Elasticsearch

我得到了一些像这样的日志:

2015-09-25 12:07:55.441 INFO 17328 --- [][][][] 
XXX.YYY.SomeClass : Someone request in CityX!

然后我将其导入 Elasticsearch:

{
    "_index": "logstash-2015.09.25",
    "_type": "redis-input",
    "_id": "AVADGRo7JaVbcBhehzEj",
    "_score": 1,
    "_source": {
        "@timestamp": "2015-09-25T12:21:24.616+08:00",
        "@version": 1,
        "message": "Someone request in CityX!",
        "logger_name": "XXX.YYY.SomeClass",
        "thread_name": "pool-22-thread-1",
        "level": "INFO",
        "level_value": 20000,
        "HOSTNAME": "host",
        "host": "192.168.5.194: 57154",
        "type": "redis-input"
    }
}

我只想将CityX(任何城市出现在我的日志中,并假设我们可以得到每个城市的经纬度)映射到Elasticsearch中的GeoPoint,所以我们可以通过 Kibana 在地图中显示用户请求的计数。我应该怎么做?

整个管道:

logstash(:4560) --> redis(:6379) --> logstash-indexer --> elasticsearch (:9200)

配置:

Logstash -> Redis:

input {
  tcp {
    port => 4560
    codec => json_lines
  }
}

output {

  redis {
    host => "10.0.40.155"
    port => 6379
    data_type => "list"
    key => "key_count"
  }
}

Redis -> Logstash -> Elasticsearch:

input {
  redis {
    host => "127.0.0.1"
    port => 6379
    type => "redis-input"
    data_type => "list"
    key => "key_count"
  }
}

output {
  stdout {}
  elasticsearch {
    host => "10.0.40.156"
    cluster => "elasticsearch"
    codec => "json"
    protocol => "http"
  }
}

我的 Java 程序日志 City,Longitude,Latitude:

我的日志示例:

ChinaUnicom Zhejiang Hangzhou 30.29294,120.10956 REQUEST
ChinaUnicom Zhejiang Hangzhou 30.29294,120.10956 REQUEST
ChinaTelecom Zhejiang Hangzhou 30.29294,120.10956 REQUEST

Zhejiang是中国的一个省,HangzhouZhejiang的一个城市。

我先添加一个grok过滤器来解析日志,然后用add_field转换成Kibana可以识别的geo_point。

input {
  redis {
    host => "127.0.0.1"
    port => 6379
    type => "redis-input"
    data_type => "list"
    key => "key_count"
  }
}

filter {
  grok {
    match => { "message" => "%{WORD:carrier} %{WORD:province} %{WORD:city} %{BASE10NUM:latitude},%{BASE10NUM:longitude} %{WORD:geo_message}"}
    add_field => {"geoip.location" => "%{latitude},%{longitude}"}
  }
}

output {
  stdout {}
  elasticsearch {
    host => "10.0.40.156"
    cluster => "elasticsearch"
    codec => "json"
    protocol => "http"
  }
}