Elasticsearch: Time Range aggregation is not working as expected

I am new to elasticsearch. I am learning it and trying it out to see whether it fits my needs.

Right now I am learning aggregations in elasticsearch, and I wrote the following python script to ingest some time-series data into elasticsearch.

Every 5 seconds I create a new message, which contains:

  1. Timestamp (ISO8601 format)
  2. Counter
  3. A random number between 0 and 100

For each new day I create a new index, using logs_Y-m-D as the index name.

I index every message using the message Counter as the _id. The counter is reset for each new index (i.e. every day).


import csv
import time
import random
from datetime import datetime
from elasticsearch import Elasticsearch


class ElasticSearchDB:
    def __init__(self):
        self.es = Elasticsearch()

    def run(self):
        print("Started: {}".format(datetime.now().isoformat()))
        print("<Ctrl + c> for exit!")

        with open("..\out\logs.csv", "w", newline='') as f:
            writer = csv.writer(f)
            counter = 0
            try:
                while True:
                    i_name = "logs_" + time.strftime("%Y-%m-%d")
                    if not self.es.indices.exists([i_name]):
                        self.es.indices.create(i_name, ignore=400)
                        print("New index created: {}".format(i_name))
                        counter = 0

                    message = {"counter": counter, "@timestamp": datetime.now().isoformat(), "value": random.randint(0, 100)}
                    # Write to file
                    writer.writerow(message.values())
                    # Write to elasticsearch index
                    self.es.index(index=i_name, doc_type="logs", id=counter, body=message)
                    # Waste some time
                    time.sleep(5)
                    counter += 1

            except KeyboardInterrupt:
                print("Stopped: {}".format(datetime.now().isoformat()))


test_es = ElasticSearchDB()
test_es.run()

I ran this script for 30 minutes. Next, using Sense, I queried elasticsearch with the following aggregation queries.

Query #1: Get everything

Query #2: Aggregate the logs from the last 1 hour and generate stats for them. This shows the correct results.

Query #3: Aggregate the logs from the last 1 minute and generate stats for them. The number of documents aggregated is the same as in the 1-hour aggregation; ideally it should aggregate only 12-13 logs.

Query #4: Aggregate the logs from the last 15 seconds and generate stats for them. The number of documents aggregated is the same as in the 1-hour aggregation; ideally it should aggregate only 3-4 logs.

My questions:

  1. Why does elasticsearch not understand the 1-minute and 15-second ranges?
  2. I understand what mappings are, but I don't know how to write one, so I didn't write one. Is that what is causing this problem?

Please help!


Query #1: Get everything

GET /_search

Output:

{
   "took": 3,
   "timed_out": false,
   "_shards": {
      "total": 5,
      "successful": 5,
      "failed": 0
   },
   "hits": {
      "total": 314,
      "max_score": 1,
      "hits": [
         {
            "_index": "logs_2016-11-03",
            "_type": "logs",
            "_id": "19",
            "_score": 1,
            "_source": {
               "counter": 19,
               "value": 62,
               "@timestamp": "2016-11-03T07:40:35.981395"
            }
         },
         {
            "_index": "logs_2016-11-03",
            "_type": "logs",
            "_id": "22",
            "_score": 1,
            "_source": {
               "counter": 22,
               "value": 95,
               "@timestamp": "2016-11-03T07:40:51.066395"
            }
         },
         {
            "_index": "logs_2016-11-03",
            "_type": "logs",
            "_id": "25",
            "_score": 1,
            "_source": {
               "counter": 25,
               "value": 18,
               "@timestamp": "2016-11-03T07:41:06.140395"
            }
         },
         {
            "_index": "logs_2016-11-03",
            "_type": "logs",
            "_id": "26",
            "_score": 1,
            "_source": {
               "counter": 26,
               "value": 58,
               "@timestamp": "2016-11-03T07:41:11.164395"
            }
         },
         {
            "_index": "logs_2016-11-03",
            "_type": "logs",
            "_id": "29",
            "_score": 1,
            "_source": {
               "counter": 29,
               "value": 73,
               "@timestamp": "2016-11-03T07:41:26.214395"
            }
         },
         {
            "_index": "logs_2016-11-03",
            "_type": "logs",
            "_id": "41",
            "_score": 1,
            "_source": {
               "counter": 41,
               "value": 59,
               "@timestamp": "2016-11-03T07:42:26.517395"
            }
         },
         {
            "_index": "logs_2016-11-03",
            "_type": "logs",
            "_id": "14",
            "_score": 1,
            "_source": {
               "counter": 14,
               "value": 9,
               "@timestamp": "2016-11-03T07:40:10.857395"
            }
         },
         {
            "_index": "logs_2016-11-03",
            "_type": "logs",
            "_id": "40",
            "_score": 1,
            "_source": {
               "counter": 40,
               "value": 9,
               "@timestamp": "2016-11-03T07:42:21.498395"
            }
         },
         {
            "_index": "logs_2016-11-03",
            "_type": "logs",
            "_id": "24",
            "_score": 1,
            "_source": {
               "counter": 24,
               "value": 41,
               "@timestamp": "2016-11-03T07:41:01.115395"
            }
         },
         {
            "_index": "logs_2016-11-03",
            "_type": "logs",
            "_id": "0",
            "_score": 1,
            "_source": {
               "counter": 0,
               "value": 79,
               "@timestamp": "2016-11-03T07:39:00.302395"
            }
         }
      ]
   }
}

Query #2: Get stats for the last 1 hour.

GET /logs_2016-11-03/logs/_search?search_type=count
{
    "aggs": {
        "time_range": {
            "filter": {
                "range": {
                    "@timestamp": {
                        "from": "now-1h"
                    }
                }
            },
            "aggs": {
                "just_stats": {
                    "stats": {
                        "field": "value"
                    }
                }
            }
        }
    }
}

Output:

{
   "took": 5,
   "timed_out": false,
   "_shards": {
      "total": 5,
      "successful": 5,
      "failed": 0
   },
   "hits": {
      "total": 366,
      "max_score": 0,
      "hits": []
   },
   "aggregations": {
      "time_range": {
         "doc_count": 366,
         "just_stats": {
            "count": 366,
            "min": 0,
            "max": 100,
            "avg": 53.17213114754098,
            "sum": 19461
         }
      }
   }
}

I get 366 entries, which is correct.

Query #3: Get stats for the last 1 minute.

GET /logs_2016-11-03/logs/_search?search_type=count
{
    "aggs": {
        "time_range": {
            "filter": {
                "range": {
                    "@timestamp": {
                        "from": "now-1m"
                    }
                }
            },
            "aggs": {
                "just_stats": {
                    "stats": {
                        "field": "value"
                    }
                }
            }
        }
    }
}

Output:

{
   "took": 15,
   "timed_out": false,
   "_shards": {
      "total": 5,
      "successful": 5,
      "failed": 0
   },
   "hits": {
      "total": 407,
      "max_score": 0,
      "hits": []
   },
   "aggregations": {
      "time_range": {
         "doc_count": 407,
         "just_stats": {
            "count": 407,
            "min": 0,
            "max": 100,
            "avg": 53.152334152334156,
            "sum": 21633
         }
      }
   }
}

This is wrong; there cannot be 407 entries within the last 1 minute. It should be just 12-13 logs.

Query #4: Get stats for the last 15 seconds.

GET /logs_2016-11-03/logs/_search?search_type=count
{
    "aggs": {
        "time_range": {
            "filter": {
                "range": {
                    "@timestamp": {
                        "from": "now-15s"
                    }
                }
            },
            "aggs": {
                "just_stats": {
                    "stats": {
                        "field": "value"
                    }
                }
            }
        }
    }
}

Output:

{
   "took": 15,
   "timed_out": false,
   "_shards": {
      "total": 5,
      "successful": 5,
      "failed": 0
   },
   "hits": {
      "total": 407,
      "max_score": 0,
      "hits": []
   },
   "aggregations": {
      "time_range": {
         "doc_count": 407,
         "just_stats": {
            "count": 407,
            "min": 0,
            "max": 100,
            "avg": 53.152334152334156,
            "sum": 21633
         }
      }
   }
}

This is also wrong; there cannot be 407 entries within the last 15 seconds. It should be only 3-4 logs.

Your queries are correct, but ES stores dates in UTC, which is why you are getting everything back. From the documentation:

In JSON documents, dates are represented as strings. Elasticsearch uses a set of preconfigured formats to recognize and parse these strings into a long value representing milliseconds-since-the-epoch in UTC.

You could use the pytz module and store dates in UTC in ES. See this SO question.
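On Python 3 the standard library alone is enough; a minimal sketch of the difference between what the script above stores (naive local time, which ES then misreads as UTC) and a timezone-aware UTC timestamp:

```python
from datetime import datetime, timezone

# Naive local time: what the script above stores. Elasticsearch assumes
# such a timestamp is already UTC, so every document ends up shifted by
# the local UTC offset.
naive = datetime.now().isoformat()

# Timezone-aware UTC time: matches Elasticsearch's assumption, and the
# ISO string carries an explicit +00:00 offset.
aware = datetime.now(timezone.utc).isoformat()

print(naive)
print(aware)
```

Storing the aware UTC timestamp from the start makes the `now-1m` / `now-15s` range math line up with the stored values.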

You can also use the time_zone parameter in the range query. Moreover, it is better to aggregate over filtered results rather than fetch everything and then filter all of it.

GET /logs_2016-11-03/logs/_search
{
  "query": {
    "bool": {
      "filter": {
        "range": {
          "@timestamp": {
            "gte": "2016-11-03T07:15:35",         <----- You would need absolute value
            "time_zone": "-01:00"              <---- timezone setting
          }
        }
      }
    }
  },
  "aggs": {
    "just_stats": {
      "stats": {
        "field": "value"
      }
    }
  },
  "size": 0
}

You would have to convert the desired time (now-1m, now-15s) into the format yyyy-MM-dd'T'HH:mm:ss for the time_zone parameter to take effect, since now is not affected by time_zone. So the best option is to convert your dates to UTC and store them that way.
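As a sketch of that conversion (the helper name is hypothetical, not an ES API), turning a relative offset such as now-1m into the absolute UTC string the range query's gte field expects:

```python
from datetime import datetime, timedelta, timezone

def absolute_gte(seconds_ago):
    """Hypothetical helper: convert a relative offset (now-1m => 60,
    now-15s => 15) into an absolute yyyy-MM-dd'T'HH:mm:ss string in UTC,
    suitable for the gte field of a range query used with time_zone."""
    t = datetime.now(timezone.utc) - timedelta(seconds=seconds_ago)
    return t.strftime("%Y-%m-%dT%H:%M:%S")

print(absolute_gte(60))   # equivalent of now-1m
print(absolute_gte(15))   # equivalent of now-15s
```

The returned string can be dropped straight into the `"gte"` field of the range query shown above.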