Elasticsearch - Calculate Delay between Timestamps

How can I calculate the delay between timestamps without Logstash, but using script_fields?

For example, for these documents:

{
 "_source": {
  "name": "test1",
  "timestamp": "2021-12-30 12:30:00"
 }
}
{
 "_source": {
  "name": "test1",
  "timestamp": "2021-12-30 12:30:01"
 }
}
{
 "_source": {
  "name": "test1",
  "timestamp": "2021-12-30 12:30:03"
 }
}

I want a new field called "time_taken", so the expected documents should look like this:

{
 "_source": {
  "name": "test1",
  "timestamp": "2021-12-30 12:30:00"
 }
}
{
 "_source": {
  "name": "test1",
  "timestamp": "2021-12-30 12:30:01",
  "time_taken": "1"
 }
}
{
 "_source": {
  "name": "test1",
  "timestamp": "2021-12-30 12:30:03",
  "time_taken": "2"
 }
}

The answer provided here is inspired by the Painless example in Transforms.

The solution uses the Transform API. It has some limitations, so I recommend checking them to see whether it is fine for your use case: Transform limitations.

I first created a mapping for the provided example:

PUT myindex
{
  "mappings": {
    "properties": {
      "name": {
        "type": "text",
        "fields": {
          "keywords": {
            "type": "keyword"
          }
        }
      },
      "timestamp": {
        "type": "date"
      }
    }
  }
}

And inserted some documents:

POST myindex/_doc
{
  "name": "test1",
  "timestamp":"2022-01-27T19:48:11Z"
}
POST myindex/_doc
{
  "name": "test1",
  "timestamp":"2022-01-27T19:50:11Z"
}
POST myindex/_doc
{
  "name": "test1",
  "timestamp":"2022-01-27T19:53:11Z"
}
POST myindex/_doc
{
  "name": "test2",
  "timestamp":"2022-01-27T19:35:11Z"
}
POST myindex/_doc
{
  "name": "test2",
  "timestamp":"2022-01-27T19:36:11Z"
}

Using the Transform API, we can calculate the time length for each name term:

POST _transform/_preview
{
  "source": {
    "index": "myindex"
  },
  "dest": {
    "index": "destindex"
  },
  "pivot": {
    "group_by": {
      "name": {
        "terms": {
          "field": "name.keywords"
        }
      }
    },
    "aggregations": {
      "latest_value": {
        "scripted_metric": {
          "init_script": "state.timestamp_latest = 0L;",
          "map_script": """
          def current_date = doc['timestamp'].getValue().toInstant().toEpochMilli();
          if (current_date > state.timestamp_latest)
          {state.timestamp_latest = current_date;}
        """,
          "combine_script": "return state",
          "reduce_script": """
          def last_doc = '';
          def timestamp_latest = 0L;
          for (s in states) {if (s.timestamp_latest > (timestamp_latest))
          {timestamp_latest = s.timestamp_latest;}}
          return timestamp_latest
        """
        }
      },
      "first_value": {
         "scripted_metric": {
          "init_script": "state.timestamp_first = 999999999999999L;",
          "map_script": """
          def current_date = doc['timestamp'].getValue().toInstant().toEpochMilli();
          if (current_date < state.timestamp_first)
          {state.timestamp_first = current_date;}
        """,
          "combine_script": "return state",
          "reduce_script": """
          def last_doc = '';
          def timestamp_first = 999999999999999L;
          for (s in states) {if (s.timestamp_first < (timestamp_first))
          {timestamp_first = s.timestamp_first;}}
          return timestamp_first
        """
        }
      },
      "time_length": {
        "bucket_script": {
          "buckets_path": {
            "min": "first_value.value",
            "max": "latest_value.value"
          },
          "script": "(params.max - params.min)/1000"
        }
      }
    }
  }
}

The output looks like this:

{
  "preview" : [
    {
      "time_length" : 300.0,
      "name" : "test1",
      "first_value" : 1643312891000,
      "latest_value" : 1643313191000
    },
    {
      "time_length" : 60.0,
      "name" : "test2",
      "first_value" : 1643312111000,
      "latest_value" : 1643312171000
    }
  ],
  "generated_dest_index" : {
    "mappings" : {
      "_meta" : {
        "_transform" : {
          "transform" : "transform-preview",
          "version" : {
            "created" : "7.15.1"
          },
          "creation_date_in_millis" : 1643400080594
        },
        "created_by" : "transform"
      },
      "properties" : {
        "name" : {
          "type" : "keyword"
        }
      }
    },
    "settings" : {
      "index" : {
        "number_of_shards" : "1",
        "auto_expand_replicas" : "0-1"
      }
    },
    "aliases" : { }
  }
}
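
The _preview endpoint only shows what the transform would produce; nothing is written to destindex yet. To materialize the result, you can create the transform and then start it. A minimal sketch, where delay_transform is just an arbitrary id I chose; the source, dest, and pivot blocks are the same as in the _preview request above:

PUT _transform/delay_transform
{
  "source": { "index": "myindex" },
  "dest": { "index": "destindex" },
  "pivot": {
    "group_by": {
      "name": { "terms": { "field": "name.keywords" } }
    },
    "aggregations": {
      "latest_value": {
        "scripted_metric": {
          "init_script": "state.timestamp_latest = 0L;",
          "map_script": "def current_date = doc['timestamp'].getValue().toInstant().toEpochMilli(); if (current_date > state.timestamp_latest) { state.timestamp_latest = current_date; }",
          "combine_script": "return state",
          "reduce_script": "def timestamp_latest = 0L; for (s in states) { if (s.timestamp_latest > timestamp_latest) { timestamp_latest = s.timestamp_latest; } } return timestamp_latest"
        }
      },
      "first_value": {
        "scripted_metric": {
          "init_script": "state.timestamp_first = 999999999999999L;",
          "map_script": "def current_date = doc['timestamp'].getValue().toInstant().toEpochMilli(); if (current_date < state.timestamp_first) { state.timestamp_first = current_date; }",
          "combine_script": "return state",
          "reduce_script": "def timestamp_first = 999999999999999L; for (s in states) { if (s.timestamp_first < timestamp_first) { timestamp_first = s.timestamp_first; } } return timestamp_first"
        }
      },
      "time_length": {
        "bucket_script": {
          "buckets_path": {
            "min": "first_value.value",
            "max": "latest_value.value"
          },
          "script": "(params.max - params.min)/1000"
        }
      }
    }
  }
}

POST _transform/delay_transform/_start

Since no sync settings are defined, this is a batch transform: it runs once and stops. Once it has completed, the computed documents can be read back with GET destindex/_search.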

What is the script doing?

As you can see, we create a terms aggregation on the field name.keywords. We then use a scripted metric aggregation, which consists of 4 steps (a standalone sketch follows the list):

  • init_script: creates a state; this is a space to initialize variables. Each shard gets its own copy of the state.
  • map_script: this step executes the code once per document, meaning you can iterate over documents or run complex calculations much like in a high-level programming language such as Python or Java (avoid heavy computation here, or it will slow the aggregation down).
  • combine_script: here we tell Elasticsearch to return the state of each shard.
  • reduce_script: this is the final step, where we iterate over the per-shard results of the previous step (a.k.a. the combine script) to calculate the first and latest timestamps of each bucket.
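
To see the four phases in isolation, the same scripted metric can also be run as a plain search aggregation, outside of a transform. A minimal sketch reusing the latest-timestamp logic from above against myindex:

GET myindex/_search
{
  "size": 0,
  "aggs": {
    "latest_value": {
      "scripted_metric": {
        "init_script": "state.timestamp_latest = 0L;",
        "map_script": """
          // per document, per shard
          def current_date = doc['timestamp'].getValue().toInstant().toEpochMilli();
          if (current_date > state.timestamp_latest) { state.timestamp_latest = current_date; }
        """,
        "combine_script": "return state",
        "reduce_script": """
          // once, over all shard states
          def timestamp_latest = 0L;
          for (s in states) { if (s.timestamp_latest > timestamp_latest) { timestamp_latest = s.timestamp_latest; } }
          return timestamp_latest
        """
      }
    }
  }
}

Note that without a terms grouping this returns a single value: the latest timestamp across all documents in the index.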

Finally, in the bucket script, we calculate the difference between the given first_value and latest_value, and we divide by 1000 because the timestamp field is stored in epoch millis. The unit of time_length is therefore seconds.
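
For example, for the test1 bucket in the preview output above:

(1643313191000 - 1643312891000) / 1000 = 300000 / 1000 = 300 seconds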

For more information about scripted metric aggregations: Scripted metrics