日志分析,根据一条日志消息查询日志

Log analysis to query logs based on a log message

我有一个 Java 应用程序,它以

格式输出日志

时间戳UUID1一些信息
时间戳 UUID1 更多信息
时间戳 UUID1 x = 1

timestamp UUID2 一些信息
时间戳 UUID2 更多信息
时间戳 UUID2 x = 2

timestamp UUID3 一些信息
时间戳 UUID3 更多信息
时间戳 UUID3 x = 1

我想使用 Elsatic Search、LogStash 和 Kibana 实现一个日志分析框架。是否可以只根据X值获取日志?

例如:-

如果我查询 X = 1,我应该只会得到以下日志。

时间戳UUID1一些信息
时间戳 UUID1 更多信息
时间戳 UUID1 x = 1
timestamp UUID3 一些信息
时间戳 UUID3 更多信息
时间戳 UUID3 x = 1

如果我查询 X = 2,我应该只会得到以下日志。

时间戳UUID2一些信息
时间戳 UUID2 更多信息
时间戳 UUID2 x = 2

日志消息格式由我控制。如果不能直接查询,我也可以更改消息格式。

更新 1:

我再具体一点。

以下是我的日志语句。

    MDC.put("uuid", UUID.randomUUID().toString());
    logger.info("Assigning value to the variable : {}", name);
    this.setVal(value.getVal());
    logger.info("{} = {}", name, value.getVal());
    logger.info("Assigned value {} to the variable : {}", value.getVal(),
            name);
    MDC.clear();

我使用 UDP 在 Logstash 中接收日志语句。我收到了这样的消息。

{
     "@timestamp" => "2015-04-01T10:23:37.846+05:30",
       "@version" => 1,
        "message" => "Assigning value to the variable : X",
    "logger_name" => "com.example.logstash.Variable",
    "thread_name" => "pool-1-thread-1",
          "level" => "INFO",
    "level_value" => 20000,
       "HOSTNAME" => "pnibinkj-W7-1",
           "uuid" => "ab17b842-8348-4474-98e4-8bc2b8dd6781",
           "host" => "127.0.0.1"
}
{
     "@timestamp" => "2015-04-01T10:23:37.846+05:30",
       "@version" => 1,
        "message" => "Assigning value to the variable : Y",
    "logger_name" => "com.example.logstash.Variable",
    "thread_name" => "pool-1-thread-2",
          "level" => "INFO",
    "level_value" => 20000,
       "HOSTNAME" => "pnibinkj-W7-1",
           "uuid" => "d5513e4c-de3b-4144-87e4-87b077ac8056",
           "host" => "127.0.0.1"
}
{
     "@timestamp" => "2015-04-01T10:23:37.862+05:30",
       "@version" => 1,
        "message" => "Y = 1",
    "logger_name" => "com.example.logstash.Variable",
    "thread_name" => "pool-1-thread-2",
          "level" => "INFO",
    "level_value" => 20000,
       "HOSTNAME" => "pnibinkj-W7-1",
           "uuid" => "d5513e4c-de3b-4144-87e4-87b077ac8056",
           "host" => "127.0.0.1"
}
{
     "@timestamp" => "2015-04-01T10:23:37.863+05:30",
       "@version" => 1,
        "message" => "X = 1",
    "logger_name" => "com.example.logstash.Variable",
    "thread_name" => "pool-1-thread-1",
          "level" => "INFO",
    "level_value" => 20000,
       "HOSTNAME" => "pnibinkj-W7-1",
           "uuid" => "ab17b842-8348-4474-98e4-8bc2b8dd6781",
           "host" => "127.0.0.1"
}
{
     "@timestamp" => "2015-04-01T10:23:37.863+05:30",
       "@version" => 1,
        "message" => "Assigned value 1 to the variable : X",
    "logger_name" => "com.example.logstash.Variable",
    "thread_name" => "pool-1-thread-1",
          "level" => "INFO",
    "level_value" => 20000,
       "HOSTNAME" => "pnibinkj-W7-1",
           "uuid" => "ab17b842-8348-4474-98e4-8bc2b8dd6781",
           "host" => "127.0.0.1"
}
{
     "@timestamp" => "2015-04-01T10:23:37.863+05:30",
       "@version" => 1,
        "message" => "Assigned value 1 to the variable : Y",
    "logger_name" => "com.example.logstash.Variable",
    "thread_name" => "pool-1-thread-2",
          "level" => "INFO",
    "level_value" => 20000,
       "HOSTNAME" => "pnibinkj-W7-1",
           "uuid" => "d5513e4c-de3b-4144-87e4-87b077ac8056",
           "host" => "127.0.0.1"
}

有2个UUID

"d5513e4c-de3b-4144-87e4-87b077ac8056" 对于 "Y = 1"
"ab17b842-8348-4474-98e4-8bc2b8dd6781" 对于 "X = 1"

每个 UUID 还有另外两条消息。我想把它们合并成一个事件。

我不确定如何为这种情况编写多行过滤器。

filter {
  multiline {
    pattern => "."
    what => "previous"
    stream_identity => "%{uuid}"
  }
}

"pattern" 和 "what" 似乎是必填字段。我应该为这些字段提供什么。如何使用 Stream Identity?

请指出正确的方向。

谢谢, 保罗

如果 X 是某个唯一值,使用 kibana 过滤器应该是可能的,但是对于所示格式的日志,您需要使用多行过滤器将条目连接在一起。

有了它,您可能可以使用类似

的查询
message: "X=1"

您需要合并您的消息(请参阅支持 stream_identity 的多行{}过滤器),然后常规查询会 return 适当的消息。