Log analysis: querying logs based on a log message
I have a Java application that writes logs in the following format:
timestamp UUID1 some info
timestamp UUID1 more info
timestamp UUID1 x = 1
timestamp UUID2 some info
timestamp UUID2 more info
timestamp UUID2 x = 2
timestamp UUID3 some info
timestamp UUID3 more info
timestamp UUID3 x = 1
I want to implement a log analysis framework using Elasticsearch, Logstash, and Kibana. Is it possible to fetch logs based only on the value of X?
For example:
If I query for X = 1, I should get only the following logs:
timestamp UUID1 some info
timestamp UUID1 more info
timestamp UUID1 x = 1
timestamp UUID3 some info
timestamp UUID3 more info
timestamp UUID3 x = 1
If I query for X = 2, I should get only the following logs:
timestamp UUID2 some info
timestamp UUID2 more info
timestamp UUID2 x = 2
The log message format is under my control. If it cannot be queried directly, I can change the message format as well.
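For illustration, if only the "x = 1" line needed to be searchable, a Logstash grok filter along these lines could pull the variable name and value out into their own fields so Kibana can filter on them directly. This is only a sketch: the field names (log_uuid, var_name, var_value) and the assumption of an ISO8601 timestamp are mine, not part of the original setup.
filter {
  grok {
    # Parse lines of the form "timestamp UUID x = 1"; any other line is tagged and left alone.
    match => [ "message", "%{TIMESTAMP_ISO8601:log_time} %{UUID:log_uuid} %{WORD:var_name} = %{NUMBER:var_value}" ]
    tag_on_failure => [ "_not_a_value_line" ]
  }
}
A Kibana query such as var_name: X AND var_value: 1 would then match the "x = 1" events, although the related "some info" / "more info" lines for the same UUID would still need to be joined, which is what the update below is about.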
Update 1:
Let me be more specific.
Here are my logging statements.
// Tag every log statement on this thread with a per-transaction UUID via the MDC.
MDC.put("uuid", UUID.randomUUID().toString());
logger.info("Assigning value to the variable : {}", name);
this.setVal(value.getVal());
logger.info("{} = {}", name, value.getVal());
logger.info("Assigned value {} to the variable : {}", value.getVal(), name);
// Remove the MDC entries once the transaction is done.
MDC.clear();
I receive the log statements in Logstash over UDP. I get messages like this:
{
"@timestamp" => "2015-04-01T10:23:37.846+05:30",
"@version" => 1,
"message" => "Assigning value to the variable : X",
"logger_name" => "com.example.logstash.Variable",
"thread_name" => "pool-1-thread-1",
"level" => "INFO",
"level_value" => 20000,
"HOSTNAME" => "pnibinkj-W7-1",
"uuid" => "ab17b842-8348-4474-98e4-8bc2b8dd6781",
"host" => "127.0.0.1"
}
{
"@timestamp" => "2015-04-01T10:23:37.846+05:30",
"@version" => 1,
"message" => "Assigning value to the variable : Y",
"logger_name" => "com.example.logstash.Variable",
"thread_name" => "pool-1-thread-2",
"level" => "INFO",
"level_value" => 20000,
"HOSTNAME" => "pnibinkj-W7-1",
"uuid" => "d5513e4c-de3b-4144-87e4-87b077ac8056",
"host" => "127.0.0.1"
}
{
"@timestamp" => "2015-04-01T10:23:37.862+05:30",
"@version" => 1,
"message" => "Y = 1",
"logger_name" => "com.example.logstash.Variable",
"thread_name" => "pool-1-thread-2",
"level" => "INFO",
"level_value" => 20000,
"HOSTNAME" => "pnibinkj-W7-1",
"uuid" => "d5513e4c-de3b-4144-87e4-87b077ac8056",
"host" => "127.0.0.1"
}
{
"@timestamp" => "2015-04-01T10:23:37.863+05:30",
"@version" => 1,
"message" => "X = 1",
"logger_name" => "com.example.logstash.Variable",
"thread_name" => "pool-1-thread-1",
"level" => "INFO",
"level_value" => 20000,
"HOSTNAME" => "pnibinkj-W7-1",
"uuid" => "ab17b842-8348-4474-98e4-8bc2b8dd6781",
"host" => "127.0.0.1"
}
{
"@timestamp" => "2015-04-01T10:23:37.863+05:30",
"@version" => 1,
"message" => "Assigned value 1 to the variable : X",
"logger_name" => "com.example.logstash.Variable",
"thread_name" => "pool-1-thread-1",
"level" => "INFO",
"level_value" => 20000,
"HOSTNAME" => "pnibinkj-W7-1",
"uuid" => "ab17b842-8348-4474-98e4-8bc2b8dd6781",
"host" => "127.0.0.1"
}
{
"@timestamp" => "2015-04-01T10:23:37.863+05:30",
"@version" => 1,
"message" => "Assigned value 1 to the variable : Y",
"logger_name" => "com.example.logstash.Variable",
"thread_name" => "pool-1-thread-2",
"level" => "INFO",
"level_value" => 20000,
"HOSTNAME" => "pnibinkj-W7-1",
"uuid" => "d5513e4c-de3b-4144-87e4-87b077ac8056",
"host" => "127.0.0.1"
}
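For reference, events like the ones above can be received on the Logstash side with a plain udp input and a json codec. This is only a sketch; the port number is an assumption, not something stated in my setup.
input {
  udp {
    # Use whatever port the application's UDP appender actually sends to.
    port => 5000
    codec => json
  }
}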
There are 2 UUIDs:
"d5513e4c-de3b-4144-87e4-87b077ac8056" for "Y = 1"
"ab17b842-8348-4474-98e4-8bc2b8dd6781" for "X = 1"
There are two other messages for each UUID. I want to combine them into a single event.
I am not sure how to write the multiline filter for this case.
filter {
  multiline {
    pattern => "."
    what => "previous"
    stream_identity => "%{uuid}"
  }
}
"pattern" 和 "what" 似乎是必填字段。我应该为这些字段提供什么。如何使用 Stream Identity?
请指出正确的方向。
谢谢,
保罗
If X is a unique value, this should be possible with a Kibana filter, but for logs in the format shown you will need the multiline filter to join the related entries together.
With that in place, you could probably use a query like
message: "X=1"
You need to combine your messages (see the multiline{} filter, which supports stream_identity); a regular query will then return the appropriate messages.
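As a sketch of how that could look for the events above, assuming the last message for each UUID always starts with "Assigned value": lines that do not match that pattern are buffered and merged with the following line of the same stream, and keying stream_identity on the uuid field keeps interleaved threads apart.
filter {
  multiline {
    # An event whose message does NOT start with "Assigned value" belongs
    # with the events that follow it in the same stream.
    pattern => "^Assigned value"
    negate => true
    what => "next"
    # Buffer per transaction so concurrent threads do not get mixed together.
    stream_identity => "%{uuid}"
  }
}
After the merge, all three messages for one UUID end up in a single event, so a query such as message: "X = 1" returns only the combined events for that value.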