Elasticsearch - Calculate Delay between Timestamps
How can I calculate the delay between timestamps, without Logstash, using script_fields?
For example, for these documents:
{
"_source": {
"name": "test1",
"timestamp": "2021-12-30 12:30:00"
}
}
{
"_source": {
"name": "test1",
"timestamp": "2021-12-30 12:30:01"
}
}
{
"_source": {
"name": "test1",
"timestamp": "2021-12-30 12:30:03"
}
}
I want a new field called "time_taken", so the expected documents should look like this:
{
"_source": {
"name": "test1",
"timestamp": "2021-12-30 12:30:00"
}
}
{
"_source": {
"name": "test1",
"timestamp": "2021-12-30 12:30:01",
"time_taken": "1"
}
}
{
"_source": {
"name": "test1",
"timestamp": "2021-12-30 12:30:03",
"time_taken": "2"
}
}
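For reference, the per-document delay being asked for can be sketched in plain Python (outside Elasticsearch); this is only an illustration of the expected result, not something Elasticsearch runs:

```python
from datetime import datetime

def add_time_taken(docs):
    """Sort docs by (name, timestamp) and add the delay in seconds
    relative to the previous document with the same name."""
    fmt = "%Y-%m-%d %H:%M:%S"
    docs = sorted(docs, key=lambda d: (d["name"], d["timestamp"]))
    prev = {}  # name -> timestamp of the previous document
    for d in docs:
        ts = datetime.strptime(d["timestamp"], fmt)
        if d["name"] in prev:
            delta = (ts - prev[d["name"]]).total_seconds()
            d["time_taken"] = str(int(delta))
        prev[d["name"]] = ts
    return docs

docs = [
    {"name": "test1", "timestamp": "2021-12-30 12:30:00"},
    {"name": "test1", "timestamp": "2021-12-30 12:30:01"},
    {"name": "test1", "timestamp": "2021-12-30 12:30:03"},
]
result = add_time_taken(docs)
# The first document gets no "time_taken"; the others get "1" and "2".
```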
The answer below is inspired by the Painless example in Transforms.
The solution uses the Transform API, which has some limitations; I recommend checking them to see whether they are acceptable for your use case: Transform limitations.
I first created a mapping for the provided example:
PUT myindex
{
"mappings": {
"properties": {
"name": {
"type": "text",
"fields": {
"keywords": {
"type": "keyword"
}
}
},
"timestamp": {
"type": "date"
}
}
}
}
And inserted some documents:
POST myindex/_doc
{
"name": "test1",
"timestamp":"2022-01-27T19:48:11Z"
}
POST myindex/_doc
{
"name": "test1",
"timestamp":"2022-01-27T19:50:11Z"
}
POST myindex/_doc
{
"name": "test1",
"timestamp":"2022-01-27T19:53:11Z"
}
POST myindex/_doc
{
"name": "test2",
"timestamp":"2022-01-27T19:35:11Z"
}
POST myindex/_doc
{
"name": "test2",
"timestamp":"2022-01-27T19:36:11Z"
}
Using the Transform API, we can calculate the time length for each term:
POST _transform/_preview
{
"source": {
"index": "myindex"
},
"dest": {
"index": "destindex"
},
"pivot": {
"group_by": {
"name": {
"terms": {
"field": "name.keywords"
}
}
},
"aggregations": {
"latest_value": {
"scripted_metric": {
"init_script": "state.timestamp_latest = 0L;",
"map_script": """
def current_date = doc['timestamp'].getValue().toInstant().toEpochMilli();
if (current_date > state.timestamp_latest)
{state.timestamp_latest = current_date;}
""",
"combine_script": "return state",
"reduce_script": """
def last_doc = '';
def timestamp_latest = 0L;
for (s in states) {if (s.timestamp_latest > (timestamp_latest))
{timestamp_latest = s.timestamp_latest;}}
return timestamp_latest
"""
}
},
"first_value": {
"scripted_metric": {
"init_script": "state.timestamp_first = 999999999999999L;",
"map_script": """
def current_date = doc['timestamp'].getValue().toInstant().toEpochMilli();
if (current_date < state.timestamp_first)
{state.timestamp_first = current_date;}
""",
"combine_script": "return state",
"reduce_script": """
def last_doc = '';
def timestamp_first = 999999999999999L;
for (s in states) {if (s.timestamp_first < (timestamp_first))
{timestamp_first = s.timestamp_first;}}
return timestamp_first
"""
}
},
"time_length": {
"bucket_script": {
"buckets_path": {
"min": "first_value.value",
"max": "latest_value.value"
},
"script": "(params.max - params.min)/1000"
}
}
}
}
}
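To make the transform's arithmetic concrete, here is a minimal Python analogy of what it computes per "name" bucket: the earliest and latest timestamp in epoch millis, and their difference in seconds. The documents mirror the sample data indexed above; this is only a sketch of the logic, not Elasticsearch code:

```python
from datetime import datetime, timezone

docs = [
    {"name": "test1", "timestamp": "2022-01-27T19:48:11Z"},
    {"name": "test1", "timestamp": "2022-01-27T19:50:11Z"},
    {"name": "test1", "timestamp": "2022-01-27T19:53:11Z"},
    {"name": "test2", "timestamp": "2022-01-27T19:35:11Z"},
    {"name": "test2", "timestamp": "2022-01-27T19:36:11Z"},
]

def to_epoch_millis(ts):
    dt = datetime.strptime(ts, "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=timezone.utc)
    return int(dt.timestamp() * 1000)

# Track (first, latest) epoch millis per bucket, like the two
# scripted_metric aggregations do.
buckets = {}
for d in docs:
    millis = to_epoch_millis(d["timestamp"])
    first, latest = buckets.get(d["name"], (millis, millis))
    buckets[d["name"]] = (min(first, millis), max(latest, millis))

# The bucket_script step: difference in millis divided by 1000.
preview = {
    name: {"first_value": first, "latest_value": latest,
           "time_length": (latest - first) / 1000}
    for name, (first, latest) in buckets.items()
}
```

Running this reproduces the `time_length` values of 300.0 and 60.0 seconds shown in the preview output below.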
The output looks like this:
{
"preview" : [
{
"time_length" : 300.0,
"name" : "test1",
"first_value" : 1643312891000,
"latest_value" : 1643313191000
},
{
"time_length" : 60.0,
"name" : "test2",
"first_value" : 1643312111000,
"latest_value" : 1643312171000
}
],
"generated_dest_index" : {
"mappings" : {
"_meta" : {
"_transform" : {
"transform" : "transform-preview",
"version" : {
"created" : "7.15.1"
},
"creation_date_in_millis" : 1643400080594
},
"created_by" : "transform"
},
"properties" : {
"name" : {
"type" : "keyword"
}
}
},
"settings" : {
"index" : {
"number_of_shards" : "1",
"auto_expand_replicas" : "0-1"
}
},
"aliases" : { }
}
}
What is the script doing?
As you can see, we create a terms aggregation on the field name.keywords. We use a scripted metric aggregation with 4 steps:
- init_script: initializes a state, a place to declare the variables shared by the following steps on each shard
- map_script: executed once per document, meaning you can iterate over the documents or do complex computations, much like in a high-level programming language such as Python or Java (avoid heavy computation, as it slows the aggregation down)
- combine_script: here we tell Elasticsearch to return the state of each shard
- reduce_script: the final step, where we iterate over the per-shard results of the previous step (the combine script) to compute the latest (or earliest) timestamp for each bucket.
Finally, in the bucket script we compute the difference between first_value and latest_value and divide it by 1000, since the timestamp field is stored in epoch millis. The time_length unit is therefore seconds.
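The bucket_script arithmetic for the "test1" bucket, using the epoch-millis values from the preview output above:

```python
# (params.max - params.min) / 1000: a 300000 ms span becomes 300.0 seconds.
params = {"min": 1643312891000, "max": 1643313191000}
time_length = (params["max"] - params["min"]) / 1000
```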
For more information about scripted metric aggregations, see Scripted metrics.