如何创建时间戳增量的直方图?
How can I create a histogram of time stamp deltas?
我们正在 ES 中存储小文档,这些文档表示对象的一系列事件。每个事件都有一个 date/time 标记。我们需要分析一段时间内所有对象的事件之间的时间。
例如,假设这些事件 json 文档:
{ "object":"one", "event":"start", "datetime":"2016-02-09 11:23:01" }
{ "object":"one", "event":"stop", "datetime":"2016-02-09 11:25:01" }
{ "object":"two", "event":"start", "datetime":"2016-01-02 11:23:01" }
{ "object":"two", "event":"stop", "datetime":"2016-01-02 11:24:01" }
我们想要从中得到的是绘制两个结果时间戳增量(从开始到停止)的直方图:对象一为 2 分钟/120 秒,对象二为 1 分钟/60 秒。
最终我们想要监控开始和停止事件之间的时间,但这需要我们计算这些事件之间的时间然后聚合它们或将它们提供给 Kibana UI 进行聚合/绘制。理想情况下,我们希望将结果直接提供给 Kibana,这样我们就可以避免创建任何自定义 UI.
提前感谢您的任何想法或建议。
既然您愿意使用 Logstash,那么有一种方法可以使用 aggregate
filter
请注意,这是一个需要先安装的社区插件。 (即默认情况下它不随 Logstash 一起提供)
aggregate
过滤器的主要思想是合并两个 "related" 日志行。您可以配置插件,使其知道 "related" 的含义。在您的情况下,"related" 意味着两个事件必须共享相同的 object
名称(即 one
或 two
),然后第一个事件有其 event
具有 start
值的字段,第二个事件的 event
字段具有 stop
值。
当过滤器遇到 start
事件时,它会将该事件的 datetime
字段存储在内部映射中。当它遇到 stop
事件时,它会计算两个日期时间之间的时间差,并将持续时间(以秒为单位)存储在新的 duration
字段中。
input {
...
}
filter {
...other filters
if [event] == "start" {
aggregate {
task_id => "%{object}"
code => "map['start'] = event['datetime']"
map_action => "create"
}
} else if [event] == "stop" {
aggregate {
task_id => "%{object}"
code => "map['duration'] = event['datetime'] - map['start']"
end_of_task => true
timeout => 120
}
}
}
output {
elasticsearch {
...
}
}
请注意,您可以调整 timeout
值(此处为 120 秒)以更好地满足您的需求。当超时已经过去并且还没有发生停止事件时,现有的开始事件将被丢弃。
我们正在 ES 中存储小文档,这些文档表示对象的一系列事件。每个事件都有一个 date/time 标记。我们需要分析一段时间内所有对象的事件之间的时间。
例如,假设这些事件 json 文档:
{ "object":"one", "event":"start", "datetime":"2016-02-09 11:23:01" }
{ "object":"one", "event":"stop", "datetime":"2016-02-09 11:25:01" }
{ "object":"two", "event":"start", "datetime":"2016-01-02 11:23:01" }
{ "object":"two", "event":"stop", "datetime":"2016-01-02 11:24:01" }
我们想要从中得到的是绘制两个结果时间戳增量(从开始到停止)的直方图:对象一为 2 分钟/120 秒,对象二为 1 分钟/60 秒。
最终我们想要监控开始和停止事件之间的时间,但这需要我们计算这些事件之间的时间然后聚合它们或将它们提供给 Kibana UI 进行聚合/绘制。理想情况下,我们希望将结果直接提供给 Kibana,这样我们就可以避免创建任何自定义 UI.
提前感谢您的任何想法或建议。
既然您愿意使用 Logstash,那么有一种方法可以使用 aggregate
filter
请注意,这是一个需要先安装的社区插件。 (即默认情况下它不随 Logstash 一起提供)
aggregate
过滤器的主要思想是合并两个 "related" 日志行。您可以配置插件,使其知道 "related" 的含义。在您的情况下,"related" 意味着两个事件必须共享相同的 object
名称(即 one
或 two
),然后第一个事件有其 event
具有 start
值的字段,第二个事件的 event
字段具有 stop
值。
当过滤器遇到 start
事件时,它会将该事件的 datetime
字段存储在内部映射中。当它遇到 stop
事件时,它会计算两个日期时间之间的时间差,并将持续时间(以秒为单位)存储在新的 duration
字段中。
input {
...
}
filter {
...other filters
if [event] == "start" {
aggregate {
task_id => "%{object}"
code => "map['start'] = event['datetime']"
map_action => "create"
}
} else if [event] == "stop" {
aggregate {
task_id => "%{object}"
code => "map['duration'] = event['datetime'] - map['start']"
end_of_task => true
timeout => 120
}
}
}
output {
elasticsearch {
...
}
}
请注意,您可以调整 timeout
值(此处为 120 秒)以更好地满足您的需求。当超时已经过去并且还没有发生停止事件时,现有的开始事件将被丢弃。