JSON 在 Logstash 中转义

JSON escaping in Logstash

我正在通过 TCP 从 Java 服务器发送一些 JSON 数据到 Logstash(Logstash 将它们发送到 Elasticsearch),这些 JSON 数据似乎在 Elastic 中被转义了。

Java连载:

Map<String, Object> jsonMap = new HashMap<>();
jsonMap.put("age", event.getAge());
for (Entry<String, Serializable> attribute : event.getAttributes().entrySet()) {
    jsonMap.put("attribute_" + attribute.getKey(), attribute.getValue());
}
jsonMap.put("message", event.getMessage());
jsonMap.put("cause", event.getCause());
jsonMap.put("timestamp", event.getTimestamp());
jsonMap.put("eventid", event.getEventId());
jsonMap.put("instanceid", event.getInstanceId());
jsonMap.put("origin", event.getOrigin());
jsonMap.put("severity", event.getSeverity());
jsonMap.put("durability", event.getDurability());
jsonMap.put("detail", event.getDetail());
int i = 0;
for (String tag : event.getTags()) {
    jsonMap.put("tag_" + String.valueOf(i), tag);
    i++;
}

return new JSONObject(jsonMap).toString();

Java 套接字:

try (Socket clientSocket = new Socket(url, port);
    OutputStreamWriter out = new OutputStreamWriter(
        clientSocket.getOutputStream(), "UTF-8")) {
    out.write(content.toString());
    out.flush();
}

Elastic 中的示例数据:

"message": "{\"detail\":null,\"cause\":null,\"attribute_start\":\"Mon Jan 11 16:15:28 CET 2016\",\"durability\":\"MOMENTARY\",\"attribute_login\":\"\",\"origin\":\"fortuna.ws.navipro\",\"severity\":\"ERROR\",\"attribute_type\":null,\"attribute_methodName\":\"Logout\",\"eventid\":\"ws.navipro.call\",\"attribute_call\":\"[57,7256538816272415441,,OK]{0 connections} CZ() Calling method 'Logout' at 1452525329029(Mon Jan 11 16:15:28 CET 2016-Mon Jan 11 16:15:29 CET 2016; roundtrip=36ms):\n\tRequest\n\t\tCLASS com.etnetera.projects.jnp.fortuna.navipro.ws.ClientLogoutRequest\n\t\tkeep: true\n\t\tCLASS com.etnetera.projects.jnp.fortuna.navipro.ws.RequestBody\n\t\tcountry: CZ\n\t\tsessionCC: NULL\n\t\tsessionID:\n\t\tsessionIP:\n\t\tdebug: NULL\n\t\tns: NULL\n\t\tCLASS com.etnetera.projects.jnp.fortuna.navipro.ws.RequestCorpus\n\tResponse\n\t\tCLASS com.etnetera.projects.jnp.fortuna.navipro.ws.ClientLogoutResponse\n\t\tCLASS com.etnetera.projects.jnp.fortuna.navipro.ws.ResponseBody\n\t\tCLASS com.etnetera.projects.jnp.fortuna.navipro.ws.ResponseCorpus\n\t\tmessage: \n\t\t[1] \n\t\t\tCLASS com.etnetera.projects.jnp.fortuna.navipro.ws.Message\n\t\t\tparam: \n\t\t\t[1] \n\t\t\t\tCLASS com.etnetera.projects.jnp.fortuna.navipro.ws.Message$Param\n\t\t\t\tindex: 0\n\t\t\t\ttype: NULL\n\t\t\t\tvalue: 3\n\t\t\tid: 104\n\t\t\tseverity: NOTIFICATION\n\t\t\tlink: NULL\n\t\tentryLink: NULL\n\t\thint: NULL\n\t\thintType: NULL\n\t\tstatus: OK\nEND\",\"timestamp\":1452525329030,\"message\":\"NaviPro method Logoutcalled.\",\"tag_1\":\"NaviPro\",\"attribute_end\":\"Mon Jan 11 16:15:29 CET 2016\",\"attribute_sessionId\":\"\",\"age\":0,\"tag_0\":\"Logout\",\"instanceid\":\"Logout\",\"attribute_address\":\""}"

Logstash 配置:

input {
  syslog {
    port => 1514
  }
  tcp {
    port => 3333
  }
}

filter {
  if [type] == "docker" {
    json {
      source => "message"
    }
    mutate {
      rename => [ "log", "message" ]
    }
    date {
      match => [ "time", "ISO8601" ]
    }
  }
}

output {
  elasticsearch {
    hosts => "elasticsearch:9200"
  }
}

我想将 Elastic 中的数据设为 JSON,这样我就可以在 Kibana 中过滤字段。

编辑:

如果我尝试将配置更改为:

input {
  tcp {
    port => 3333
    codec => json
  }
}

Logstash 拒绝启动并在日志中显示此行:

logstash_1 | {:timestamp=>"2016-01-13T10:13:58.583000+0000", :message=>"SIGTERM received. Shutting down the pipeline.", :level=>:warn}

我创建了一个简单的测试,您需要添加到您的 logstash 配置的是 codec => json。默认值为 "line" 并将转义字符串中的字符。

input {
  tcp {
    port => 3333
    codec => json
  }
}

output {
  stdout { 
    codec => rubydebug 
  }

  elasticsearch {
    hosts => "elasticsearch:9200"
  }
}

问题出在不正确的 Logstash 配置中。 Logstash 运行 但未通过过滤器发送任何内容。这是正确的配置:

input {
  tcp {
    port => 3333
    type => "java"
  }
}

filter {
  if [type] == "java" {
    json {
      source => "message"
    }
  }
}

output {
  elasticsearch {
    hosts => "elasticsearch:9200"
  }
}