Kibana 连接两个独立的事件
Kibana linking two independent events
我将 ELK 配置为离线收集数据,日志文件如下所示:
Info 2015-08-15 09:33:37,522 User 3 connected
Info 2015-08-15 10:03:57,592 User 99 connected
Info 2015-08-15 11:42:37,522 User 99 disconnected
Info 2015-08-15 11:49:12,108 User 3 disconnected
我要找的是时间线上的平均连接时间。
我无法在消息中添加更多信息,特别是我无法在断开消息中添加连接时间。
Elasticsearch 的缺点之一是它不是关系数据库 - 因此交叉引用受到更多限制。有一个很好的博客 post 关于它:Managing Relations inside Elasticsearch
但总而言之 - 没有办法直接查询此类内容。每个事件都是索引中的一个独立文档,没有任何类型的交叉引用。
所以你必须努力做到这一点。在最简单的层面上 - 即查询所有连接事件、查询所有断开连接事件,并使用脚本语言将它们关联起来。
您可以通过 pre-filtering 您的日志使用 grok
过滤器向您的数据库添加字段,从而使这稍微容易一些。
if [type] == "syslog" and [message] =~ /connected/ {
grok {
match => [ "message", "User %{POSINT:userid} %{WORD:conn}" ]
}
}
这将添加一个 userid
和 conn
字段(包含 "connected" 或 "disconnected")。
但是您仍然需要使用您最喜欢的脚本语言手动将查询与数据库提取相关联(因此可以在脚本中执行 'search and filter')。
如果您使用 Logstash 加载 ES,您可以使用 aggregate
过滤器来 assemble 相关的离散日志行。这个想法是注意 long-lasting 事件何时开始(即用户连接),然后在同一用户的 disconnected
事件飞过时结束它:(请注意,您的 grok 模式可能不同,但原则是一样的)
filter {
grok {
match => [ "message", "%{LOGLEVEL:loglevel} %{TIMESTAMP_ISO8601:timestamp} %{WORD:entity} %{INT:userid} %{WORD:status}" ]
}
if [status] == "connected" {
aggregate {
task_id => "%{userid}"
code => "map['started'] = event['timestamp']"
map_action => "create"
}
}
if [status] == "disconnected" {
aggregate {
task_id => "%{userid}"
code => "event['duration'] = event['timestamp'] - map['started']"
map_action => "update"
end_of_task => true
timeout => 86400000
}
}
}
您最终会得到一个名为 duration
(以毫秒为单位)的附加字段,然后您可以使用它在 Kibana 上绘图以显示平均连接时间。
另请注意,我给出了一天的任意超时,这可能适合也可能不适合您的情况。随意玩耍。
我将 ELK 配置为离线收集数据,日志文件如下所示:
Info 2015-08-15 09:33:37,522 User 3 connected
Info 2015-08-15 10:03:57,592 User 99 connected
Info 2015-08-15 11:42:37,522 User 99 disconnected
Info 2015-08-15 11:49:12,108 User 3 disconnected
我要找的是时间线上的平均连接时间。
我无法在消息中添加更多信息,特别是我无法在断开消息中添加连接时间。
Elasticsearch 的缺点之一是它不是关系数据库 - 因此交叉引用受到更多限制。有一个很好的博客 post 关于它:Managing Relations inside Elasticsearch
但总而言之 - 没有办法直接查询此类内容。每个事件都是索引中的一个独立文档,没有任何类型的交叉引用。
所以你必须努力做到这一点。在最简单的层面上 - 即查询所有连接事件、查询所有断开连接事件,并使用脚本语言将它们关联起来。
您可以通过 pre-filtering 您的日志使用 grok
过滤器向您的数据库添加字段,从而使这稍微容易一些。
if [type] == "syslog" and [message] =~ /connected/ {
grok {
match => [ "message", "User %{POSINT:userid} %{WORD:conn}" ]
}
}
这将添加一个 userid
和 conn
字段(包含 "connected" 或 "disconnected")。
但是您仍然需要使用您最喜欢的脚本语言手动将查询与数据库提取相关联(因此可以在脚本中执行 'search and filter')。
如果您使用 Logstash 加载 ES,您可以使用 aggregate
过滤器来 assemble 相关的离散日志行。这个想法是注意 long-lasting 事件何时开始(即用户连接),然后在同一用户的 disconnected
事件飞过时结束它:(请注意,您的 grok 模式可能不同,但原则是一样的)
filter {
grok {
match => [ "message", "%{LOGLEVEL:loglevel} %{TIMESTAMP_ISO8601:timestamp} %{WORD:entity} %{INT:userid} %{WORD:status}" ]
}
if [status] == "connected" {
aggregate {
task_id => "%{userid}"
code => "map['started'] = event['timestamp']"
map_action => "create"
}
}
if [status] == "disconnected" {
aggregate {
task_id => "%{userid}"
code => "event['duration'] = event['timestamp'] - map['started']"
map_action => "update"
end_of_task => true
timeout => 86400000
}
}
}
您最终会得到一个名为 duration
(以毫秒为单位)的附加字段,然后您可以使用它在 Kibana 上绘图以显示平均连接时间。
另请注意,我给出了一天的任意超时,这可能适合也可能不适合您的情况。随意玩耍。