elasticsearch中scripted metrics的指标如何计算和绘制
How to calculate & draw metrics of scripted metrics in elasticsearch
问题描述
我们将来自不同设备的日志文件逐行解析到我们的弹性搜索数据库中。日志文件构建为环形缓冲区,因此它们的大小始终固定为 1000 行。它们可以在需要时手动导出。在弹性搜索中导入和解析后,每个文档代表日志文件的一行,其中包含以下信息:
DeviceID: 12345
FileType: ErrorLog
FileTimestamp: 2022-05-10 01:23:45
LogTimestamp: 2022-05-05 01:23:45
LogMessage: something very important here
现在我想要统计通常由固定数量的行覆盖的时间跨度。因为,根据设备的使用强度,会生成不同数量的日志条目,文件可能涵盖几天到几个月……但由于日志文件被分成单独的行,所以它不是那个微不足道的(我想)。
我的目标是制作一个图表,显示不同日志文件时间跨度的“直方图”...
第一次尝试:可视化库 > 数据 table
我首先在 Visualize library
中创建了一个 Data table
,在那里我能够按如下方式汇总数据:
我添加了 3 个 Buckets --> 所以我的所有行都由它们的原始文件存储:
- 拆分行 DeviceID.keyword
- 拆分行 FileType.keyword
- 拆分行文件时间戳
... and 2 Metrics --> 显示日志文件时间跨度(我找不到创建 max-min
指标的方法,所以我从最大值和最小值的单独指标开始:
- 公制最小 LogTimeStamp
- 指标最大 LogTimeStamp
这导致以下查询:
{
"aggs": {
"2": {
"terms": {
"field": "DeviceID.keyword",
"order": {
"_key": "desc"
},
"size": 100
},
"aggs": {
"3": {
"terms": {
"field": "FileType.keyword",
"order": {
"_key": "desc"
},
"size": 5
},
"aggs": {
"4": {
"terms": {
"field": "FileTimestamp",
"order": {
"_key": "desc"
},
"size": 100
},
"aggs": {
"1": {
"min": {
"field": "LogTimeStamp"
}
},
"5": {
"max": {
"field": "LogTimeStamp"
}
}
}
}
}
}
}
}
},
"size": 0,
...
}
... 这个输出:
DeviceID FileType FileTimestamp Min LogTimestamp Max LogTimestamp
---------------------------------------------------------------------------------------------
12345 ErrorLog 2022-05-10 01:23:45 2022-04-10 01:23:45 2022-05-10 01:23:45
...
到目前为止看起来不错!对于这个例子,预期结果正好是 1 个月。
但我的研究表明,无法在此处添加所需的指标,因此我需要尝试其他方法...
第二次尝试:Vizualize 库 > 自定义可视化 (Vega-Lite)
所以我开始了更多的研究,发现 vega
可能是可行的。我已经能够从那里的第一次尝试转移桶部分,我还添加了一个 scripted metric
自动 calculate the timespan
(而不是最小值和最大值),到目前为止,非常好。请求正文如下所示:
body: {
"aggs": {
"DeviceID": {
"terms": { "field": "DeviceID.keyword" },
"aggs": {
"FileType": {
"terms": { "field": "FileType.keyword" } ,
"aggs": {
"FileTimestamp": {
"terms": { "field": "FileTimestamp" } ,
"aggs": {
"timespan": {
"scripted_metric": {
"init_script": "state.values = [];",
"map_script": "state.values.add(doc['@timestamp'].value);",
"combine_script": "long min = Long.MAX_VALUE; long max = 0; for (t in state.values) { long tms = t.toInstant().toEpochMilli(); if(tms > max) max = tms; if(tms < min) min = tms; } return [max,min];",
"reduce_script": "long min = Long.MAX_VALUE; long max = 0; for (a in states) { if(a[0] > max) max = a[0]; if(a[1] < min) min = a[1]; } return max-min;"
}
}
}
}
}
}
}
}
},
"size": 0,
}
...使用此响应(删除不必要的信息以降低复杂性):
{
"took": 12245,
"timed_out": false,
"_shards": { ... },
"hits": { ... },
"aggregations": {
"DeviceID": {
"buckets": [
{
"key": "12345",
"FileType": {
"buckets": [
{
"key": "ErrorLog",
"FileTimeStamp": {
"buckets": [
{
"key": 1638447972000,
"key_as_string": "2021-12-02T12:26:12.000Z",
"doc_count": 1000,
"timespan": {
"value": 31339243240
}
},
{
"key": 1636023881000,
"key_as_string": "2021-11-04T11:04:41.000Z",
"doc_count": 1000,
"timespan": {
"value": 31339243240
}
}
]
}
},
{
"key": "InfoLog",
"FileTimeStamp": {
"buckets": [
{
"key": 1635773438000,
"key_as_string": "2021-11-01T13:30:38.000Z",
"doc_count": 1000,
"timespan": {
"value": 2793365000
}
},
{
"key": 1636023881000,
"key_as_string": "2021-11-04T11:04:41.000Z",
"doc_count": 1000,
"timespan": {
"value": 2643772000
}
}
]
}
}
]
}
},
{
"key": "12346",
"FileType": {
...
}
},
...
]
}
}
}
是啊,好像有用!现在我有了每个原始日志文件的时间跨度。
问题
现在我被困在:
- 我想平均每个原始日志文件的时间跨度(通过 DeviceID + FileType + FileTimeStamp 的组合识别)以防止导入多个日志文件的设备具有更高的时间跨度重量,比只导入 1 个日志文件的设备。我试图
add another aggregation
的平均值,但我不知道放在哪里,以便使用 scripted_metric
的结果。我最接近的尝试是在 FileTimeStamp 存储桶之后放置一个 avg_bucket
:
要求:
body: {
"aggs": {
"DeviceID": {
"terms": { "field": "DeviceID.keyword" },
"aggs": {
"FileType": {
"terms": { "field": "FileType.keyword" } ,
"aggs": {
"FileTimestamp": {
"terms": { "field": "FileTimestamp" } ,
"aggs": {
"timespan": {
"scripted_metric": {
"init_script": "state.values = [];",
"map_script": "state.values.add(doc['FileTimestamp'].value);",
"combine_script": "long min = Long.MAX_VALUE; long max = 0; for (t in state.values) { long tms = t.toInstant().toEpochMilli(); if(tms > max) max = tms; if(tms < min) min = tms; } return [max,min];",
"reduce_script": "long min = Long.MAX_VALUE; long max = 0; for (a in states) { if(a[0] > max) max = a[0]; if(a[1] < min) min = a[1]; } return max-min;"
}
}
}
},
// new part - start
"avg_timespan": {
"avg_bucket": {
"buckets_path": "FileTimestamp>timespan"
}
}
// new part - end
}
}
}
}
},
"size": 0,
}
但是我收到以下错误:
EsError: buckets_path must reference either a number value or a single value numeric metric aggregation, got: [InternalScriptedMetric] at aggregation [timespan]
这个位置对吗? (但不适用于脚本指标)还是我走错了路?
- 我需要绘制所有这些,但我找不到通过所有桶等的方法。
我阅读了关于扁平化的内容(这可能是个好主意,所以(如果由服务器完成)结果不会那么复杂),但不知道在哪里以及如何进行扁平化转换。
我想象得到的图表是这样的:
- x 轴 = 日志文件时间跨度,其中时间跨度根据给定步长(例如 1 天)“分箱”,因此每个分箱只有条形(1 = 0-1 天,2 = 1-2 天,3 = 2-3 天,等等)而不是日志文件的所有不同时间跨度
- y 轴 = 设备数
- 类型:线条或竖线,按文件类型拆分
例如像这样:
非常感谢任何帮助!提前致谢!
如果你有权限创建一个transform, then the elastic painless example Getting duration by using bucket script可以做你想做的事。它会创建一个新索引,其中所有文档都根据您的需要进行分组。
创建转换:
- 转到堆栈管理 > 转换 > + 创建转换
- select 为 Pivot 配置对象编辑 JSON 配置
- 粘贴并应用下面的 JSON
- 在转换预览
中检查结果是否符合预期
- 填写其余的转换细节 + 保存转换
JSON 配置
{
"group_by": {
"DeviceID": {
"terms": {
"field": "DeviceID.keyword"
}
},
"FileType": {
"terms": {
"field": "FileType.keyword"
}
},
"FileTimestamp": {
"terms": {
"field": "FileTimestamp"
}
}
},
"aggregations": {
"TimeStampStats": {
"stats": {
"field": "@timestamp"
}
},
"TimeSpan": {
"bucket_script": {
"buckets_path": {
"first": "TimeStampStats.min",
"last": "TimeStampStats.max"
},
"script": "params.last - params.first"
}
}
}
}
现在您可以从新索引创建图表,例如使用这些设置:
- 竖线
- 指标:
- Y-axis = "计数"
- 桶:
- X-axis = "时间跨度"
- 拆分系列 =“文件类型”
问题描述
我们将来自不同设备的日志文件逐行解析到我们的弹性搜索数据库中。日志文件构建为环形缓冲区,因此它们的大小始终固定为 1000 行。它们可以在需要时手动导出。在弹性搜索中导入和解析后,每个文档代表日志文件的一行,其中包含以下信息:
DeviceID: 12345
FileType: ErrorLog
FileTimestamp: 2022-05-10 01:23:45
LogTimestamp: 2022-05-05 01:23:45
LogMessage: something very important here
现在我想要统计通常由固定数量的行覆盖的时间跨度。因为,根据设备的使用强度,会生成不同数量的日志条目,文件可能涵盖几天到几个月……但由于日志文件被分成单独的行,所以它不是那个微不足道的(我想)。
我的目标是制作一个图表,显示不同日志文件时间跨度的“直方图”...
第一次尝试:可视化库 > 数据 table
我首先在 Visualize library
中创建了一个 Data table
,在那里我能够按如下方式汇总数据:
我添加了 3 个 Buckets --> 所以我的所有行都由它们的原始文件存储:
- 拆分行 DeviceID.keyword
- 拆分行 FileType.keyword
- 拆分行文件时间戳
... and 2 Metrics --> 显示日志文件时间跨度(我找不到创建 max-min
指标的方法,所以我从最大值和最小值的单独指标开始:
- 公制最小 LogTimeStamp
- 指标最大 LogTimeStamp
这导致以下查询:
{
"aggs": {
"2": {
"terms": {
"field": "DeviceID.keyword",
"order": {
"_key": "desc"
},
"size": 100
},
"aggs": {
"3": {
"terms": {
"field": "FileType.keyword",
"order": {
"_key": "desc"
},
"size": 5
},
"aggs": {
"4": {
"terms": {
"field": "FileTimestamp",
"order": {
"_key": "desc"
},
"size": 100
},
"aggs": {
"1": {
"min": {
"field": "LogTimeStamp"
}
},
"5": {
"max": {
"field": "LogTimeStamp"
}
}
}
}
}
}
}
}
},
"size": 0,
...
}
... 这个输出:
DeviceID FileType FileTimestamp Min LogTimestamp Max LogTimestamp
---------------------------------------------------------------------------------------------
12345 ErrorLog 2022-05-10 01:23:45 2022-04-10 01:23:45 2022-05-10 01:23:45
...
到目前为止看起来不错!对于这个例子,预期结果正好是 1 个月。
但我的研究表明,无法在此处添加所需的指标,因此我需要尝试其他方法...
第二次尝试:Vizualize 库 > 自定义可视化 (Vega-Lite)
所以我开始了更多的研究,发现 vega
可能是可行的。我已经能够从那里的第一次尝试转移桶部分,我还添加了一个 scripted metric
自动 calculate the timespan
(而不是最小值和最大值),到目前为止,非常好。请求正文如下所示:
body: {
"aggs": {
"DeviceID": {
"terms": { "field": "DeviceID.keyword" },
"aggs": {
"FileType": {
"terms": { "field": "FileType.keyword" } ,
"aggs": {
"FileTimestamp": {
"terms": { "field": "FileTimestamp" } ,
"aggs": {
"timespan": {
"scripted_metric": {
"init_script": "state.values = [];",
"map_script": "state.values.add(doc['@timestamp'].value);",
"combine_script": "long min = Long.MAX_VALUE; long max = 0; for (t in state.values) { long tms = t.toInstant().toEpochMilli(); if(tms > max) max = tms; if(tms < min) min = tms; } return [max,min];",
"reduce_script": "long min = Long.MAX_VALUE; long max = 0; for (a in states) { if(a[0] > max) max = a[0]; if(a[1] < min) min = a[1]; } return max-min;"
}
}
}
}
}
}
}
}
},
"size": 0,
}
...使用此响应(删除不必要的信息以降低复杂性):
{
"took": 12245,
"timed_out": false,
"_shards": { ... },
"hits": { ... },
"aggregations": {
"DeviceID": {
"buckets": [
{
"key": "12345",
"FileType": {
"buckets": [
{
"key": "ErrorLog",
"FileTimeStamp": {
"buckets": [
{
"key": 1638447972000,
"key_as_string": "2021-12-02T12:26:12.000Z",
"doc_count": 1000,
"timespan": {
"value": 31339243240
}
},
{
"key": 1636023881000,
"key_as_string": "2021-11-04T11:04:41.000Z",
"doc_count": 1000,
"timespan": {
"value": 31339243240
}
}
]
}
},
{
"key": "InfoLog",
"FileTimeStamp": {
"buckets": [
{
"key": 1635773438000,
"key_as_string": "2021-11-01T13:30:38.000Z",
"doc_count": 1000,
"timespan": {
"value": 2793365000
}
},
{
"key": 1636023881000,
"key_as_string": "2021-11-04T11:04:41.000Z",
"doc_count": 1000,
"timespan": {
"value": 2643772000
}
}
]
}
}
]
}
},
{
"key": "12346",
"FileType": {
...
}
},
...
]
}
}
}
是啊,好像有用!现在我有了每个原始日志文件的时间跨度。
问题
现在我被困在:
- 我想平均每个原始日志文件的时间跨度(通过 DeviceID + FileType + FileTimeStamp 的组合识别)以防止导入多个日志文件的设备具有更高的时间跨度重量,比只导入 1 个日志文件的设备。我试图
add another aggregation
的平均值,但我不知道放在哪里,以便使用scripted_metric
的结果。我最接近的尝试是在 FileTimeStamp 存储桶之后放置一个avg_bucket
:
要求:
body: {
"aggs": {
"DeviceID": {
"terms": { "field": "DeviceID.keyword" },
"aggs": {
"FileType": {
"terms": { "field": "FileType.keyword" } ,
"aggs": {
"FileTimestamp": {
"terms": { "field": "FileTimestamp" } ,
"aggs": {
"timespan": {
"scripted_metric": {
"init_script": "state.values = [];",
"map_script": "state.values.add(doc['FileTimestamp'].value);",
"combine_script": "long min = Long.MAX_VALUE; long max = 0; for (t in state.values) { long tms = t.toInstant().toEpochMilli(); if(tms > max) max = tms; if(tms < min) min = tms; } return [max,min];",
"reduce_script": "long min = Long.MAX_VALUE; long max = 0; for (a in states) { if(a[0] > max) max = a[0]; if(a[1] < min) min = a[1]; } return max-min;"
}
}
}
},
// new part - start
"avg_timespan": {
"avg_bucket": {
"buckets_path": "FileTimestamp>timespan"
}
}
// new part - end
}
}
}
}
},
"size": 0,
}
但是我收到以下错误:
EsError: buckets_path must reference either a number value or a single value numeric metric aggregation, got: [InternalScriptedMetric] at aggregation [timespan]
这个位置对吗? (但不适用于脚本指标)还是我走错了路?
- 我需要绘制所有这些,但我找不到通过所有桶等的方法。 我阅读了关于扁平化的内容(这可能是个好主意,所以(如果由服务器完成)结果不会那么复杂),但不知道在哪里以及如何进行扁平化转换。
我想象得到的图表是这样的:
- x 轴 = 日志文件时间跨度,其中时间跨度根据给定步长(例如 1 天)“分箱”,因此每个分箱只有条形(1 = 0-1 天,2 = 1-2 天,3 = 2-3 天,等等)而不是日志文件的所有不同时间跨度
- y 轴 = 设备数
- 类型:线条或竖线,按文件类型拆分
例如像这样:
非常感谢任何帮助!提前致谢!
如果你有权限创建一个transform, then the elastic painless example Getting duration by using bucket script可以做你想做的事。它会创建一个新索引,其中所有文档都根据您的需要进行分组。
创建转换:
- 转到堆栈管理 > 转换 > + 创建转换
- select 为 Pivot 配置对象编辑 JSON 配置
- 粘贴并应用下面的 JSON
- 在转换预览 中检查结果是否符合预期
- 填写其余的转换细节 + 保存转换
JSON 配置
{
"group_by": {
"DeviceID": {
"terms": {
"field": "DeviceID.keyword"
}
},
"FileType": {
"terms": {
"field": "FileType.keyword"
}
},
"FileTimestamp": {
"terms": {
"field": "FileTimestamp"
}
}
},
"aggregations": {
"TimeStampStats": {
"stats": {
"field": "@timestamp"
}
},
"TimeSpan": {
"bucket_script": {
"buckets_path": {
"first": "TimeStampStats.min",
"last": "TimeStampStats.max"
},
"script": "params.last - params.first"
}
}
}
}
现在您可以从新索引创建图表,例如使用这些设置:
- 竖线
- 指标:
- Y-axis = "计数"
- 桶:
- X-axis = "时间跨度"
- 拆分系列 =“文件类型”