JSON escaping in Logstash
I am sending some JSON data from a Java server to Logstash over TCP (Logstash forwards it to Elasticsearch), and the JSON appears to be escaped in Elastic.
Java serialization:
Map<String, Object> jsonMap = new HashMap<>();
jsonMap.put("age", event.getAge());
for (Entry<String, Serializable> attribute : event.getAttributes().entrySet()) {
    jsonMap.put("attribute_" + attribute.getKey(), attribute.getValue());
}
jsonMap.put("message", event.getMessage());
jsonMap.put("cause", event.getCause());
jsonMap.put("timestamp", event.getTimestamp());
jsonMap.put("eventid", event.getEventId());
jsonMap.put("instanceid", event.getInstanceId());
jsonMap.put("origin", event.getOrigin());
jsonMap.put("severity", event.getSeverity());
jsonMap.put("durability", event.getDurability());
jsonMap.put("detail", event.getDetail());
int i = 0;
for (String tag : event.getTags()) {
    jsonMap.put("tag_" + String.valueOf(i), tag);
    i++;
}
return new JSONObject(jsonMap).toString();
Java socket:
try (Socket clientSocket = new Socket(url, port);
     OutputStreamWriter out = new OutputStreamWriter(
         clientSocket.getOutputStream(), "UTF-8")) {
    out.write(content.toString());
    out.flush();
}
Sample data in Elastic:
"message": "{\"detail\":null,\"cause\":null,\"attribute_start\":\"Mon Jan 11 16:15:28 CET 2016\",\"durability\":\"MOMENTARY\",\"attribute_login\":\"\",\"origin\":\"fortuna.ws.navipro\",\"severity\":\"ERROR\",\"attribute_type\":null,\"attribute_methodName\":\"Logout\",\"eventid\":\"ws.navipro.call\",\"attribute_call\":\"[57,7256538816272415441,,OK]{0 connections} CZ() Calling method 'Logout' at 1452525329029(Mon Jan 11 16:15:28 CET 2016-Mon Jan 11 16:15:29 CET 2016; roundtrip=36ms):\n\tRequest\n\t\tCLASS com.etnetera.projects.jnp.fortuna.navipro.ws.ClientLogoutRequest\n\t\tkeep: true\n\t\tCLASS com.etnetera.projects.jnp.fortuna.navipro.ws.RequestBody\n\t\tcountry: CZ\n\t\tsessionCC: NULL\n\t\tsessionID:\n\t\tsessionIP:\n\t\tdebug: NULL\n\t\tns: NULL\n\t\tCLASS com.etnetera.projects.jnp.fortuna.navipro.ws.RequestCorpus\n\tResponse\n\t\tCLASS com.etnetera.projects.jnp.fortuna.navipro.ws.ClientLogoutResponse\n\t\tCLASS com.etnetera.projects.jnp.fortuna.navipro.ws.ResponseBody\n\t\tCLASS com.etnetera.projects.jnp.fortuna.navipro.ws.ResponseCorpus\n\t\tmessage: \n\t\t[1] \n\t\t\tCLASS com.etnetera.projects.jnp.fortuna.navipro.ws.Message\n\t\t\tparam: \n\t\t\t[1] \n\t\t\t\tCLASS com.etnetera.projects.jnp.fortuna.navipro.ws.Message$Param\n\t\t\t\tindex: 0\n\t\t\t\ttype: NULL\n\t\t\t\tvalue: 3\n\t\t\tid: 104\n\t\t\tseverity: NOTIFICATION\n\t\t\tlink: NULL\n\t\tentryLink: NULL\n\t\thint: NULL\n\t\thintType: NULL\n\t\tstatus: OK\nEND\",\"timestamp\":1452525329030,\"message\":\"NaviPro method Logoutcalled.\",\"tag_1\":\"NaviPro\",\"attribute_end\":\"Mon Jan 11 16:15:29 CET 2016\",\"attribute_sessionId\":\"\",\"age\":0,\"tag_0\":\"Logout\",\"instanceid\":\"Logout\",\"attribute_address\":\""}"
Logstash configuration:
input {
  syslog {
    port => 1514
  }
  tcp {
    port => 3333
  }
}
filter {
  if [type] == "docker" {
    json {
      source => "message"
    }
    mutate {
      rename => [ "log", "message" ]
    }
    date {
      match => [ "time", "ISO8601" ]
    }
  }
}
output {
  elasticsearch {
    hosts => "elasticsearch:9200"
  }
}
I want the data in Elastic to be stored as JSON so that I can filter on its fields in Kibana.
Edit:
If I try to change the configuration to:
input {
  tcp {
    port => 3333
    codec => json
  }
}
Logstash refuses to start and shows this line in the log:
logstash_1 | {:timestamp=>"2016-01-13T10:13:58.583000+0000", :message=>"SIGTERM received. Shutting down the pipeline.", :level=>:warn}
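I created a simple test. All you need to add to your Logstash configuration is codec => json. The default codec is "line", which keeps each incoming line as a plain string in the message field, so the JSON shows up escaped.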
input {
  tcp {
    port => 3333
    codec => json
  }
}
output {
  stdout {
    codec => rubydebug
  }
  elasticsearch {
    hosts => "elasticsearch:9200"
  }
}
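To illustrate why the data looks escaped with a plain-text codec, here is a minimal Java sketch. It is not Logstash code; the class and method names (EscapedJsonDemo, toJsonStringLiteral) are made up for this example. It only shows what happens when a JSON document is stored as a single string value inside another JSON document: every quote in the payload has to be backslash-escaped.

```java
class EscapedJsonDemo {
    /** Encodes an arbitrary string as a JSON string literal (hypothetical helper). */
    static String toJsonStringLiteral(String raw) {
        StringBuilder sb = new StringBuilder("\"");
        for (int i = 0; i < raw.length(); i++) {
            char c = raw.charAt(i);
            switch (c) {
                case '"':  sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n"); break;
                case '\r': sb.append("\\r"); break;
                case '\t': sb.append("\\t"); break;
                default:
                    if (c < 0x20) {
                        // Remaining control characters use the four-digit hex form.
                        sb.append(String.format("\\u%04x", (int) c));
                    } else {
                        sb.append(c);
                    }
            }
        }
        return sb.append('"').toString();
    }

    public static void main(String[] args) {
        // A small JSON document, like the one the Java server sends.
        String payload = "{\"severity\":\"ERROR\",\"age\":0}";
        // Treating the payload as plain text means it ends up as ONE string field.
        String asStringField = "{\"message\":" + toJsonStringLiteral(payload) + "}";
        System.out.println(asStringField);
        // prints {"message":"{\"severity\":\"ERROR\",\"age\":0}"}
    }
}
```

The printed line has the same shape as the sample data in the question. Decoding the payload, whether with a JSON codec on the input or a json filter on the message field, turns it back into separate fields.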
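The problem was an incorrect Logstash configuration. Logstash was running, but nothing was passing through the filter: the json filter was guarded by [type] == "docker", and the events from the tcp input did not carry that type. This is the correct configuration: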
input {
  tcp {
    port => 3333
    type => "java"
  }
}
filter {
  if [type] == "java" {
    json {
      source => "message"
    }
  }
}
output {
  elasticsearch {
    hosts => "elasticsearch:9200"
  }
}
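On the sending side, the "line" codec splits the TCP stream on newlines, so one common convention is to end every JSON document with '\n'. The sketch below assumes that convention. The names NewlineDelimitedSender, writeEvent and send are hypothetical and are not part of the original code. It also rejects payloads containing a raw line break, because such a payload would be split into two events.

```java
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

class NewlineDelimitedSender {
    /** Writes one JSON document followed by '\n' so a line-oriented reader sees one event. */
    static void writeEvent(Writer out, String json) throws IOException {
        if (json.indexOf('\n') >= 0 || json.indexOf('\r') >= 0) {
            // A serialized JSON document escapes line breaks inside strings;
            // a raw one here would break the one-document-per-line framing.
            throw new IllegalArgumentException("raw line break inside JSON document");
        }
        out.write(json);
        out.write('\n');
        out.flush();
    }

    /** Opens a connection, sends a single event, and closes the connection again. */
    static void send(String host, int port, String json) throws IOException {
        try (Socket socket = new Socket(host, port);
             Writer out = new OutputStreamWriter(
                 socket.getOutputStream(), StandardCharsets.UTF_8)) {
            writeEvent(out, json);
        }
    }
}
```

With the configuration above, a call such as NewlineDelimitedSender.send("localhost", 3333, json) would deliver one event per call. The host name is an assumption; the port matches the tcp input shown earlier.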