ElasticSearch:日期时间格式的平均聚合

ElasticSearch: Avg aggregation for datetime format

我对使用 python

的弹性搜索查询感到困惑

我有这样的数据:

{
  "_index": "user_log",
  "_type": "logs",
  "_id": "gdUJpXIBAoADuwvHTK29",
  "_score": 1,
  "_source": {
    "user_name": "prathameshsalap@gmail.com",
    "working_hours": "2019-10-21 09:00:01",
}

{
  "_index": "user_log",
  "_type": "logs",
  "_id": "gtUJpXIBAoADuwvHTK29",
  "_version": 1,
  "_score": 0,
  "_source": {
    "user_name": "vaishusawant143@gmail.com",
    "working_hours": "2019-10-21 09:15:01",
}

{
  "_index": "user_log",
  "_type": "logs",
  "_id": "g9UJpXIBAoADuwvHTK29",
  "_version": 1,
  "_score": 0,
  "_source": {
    "user_name": "prathameshsalap@gmail.com",
    "working_hours": "2019-10-22 07:50:00",
}

{
  "_index": "user_log",
  "_type": "logs",
  "_id": "g8UJpXIBAoADuwvHTK29",
  "_version": 1,
  "_score": 0,
  "_source": {
    "user_name": "vaishusawant143@gmail.com",
    "working_hours": "2019-10-22 04:15:01",
}

在这里,为每个用户提供不同日期(21 和 22)的工作时间。我想取每个用户的工作时间的平均值。

{
    "size": 0,
    "query" : {"match_all": {}},
     "aggs": {
      "users": {
          "terms": {
              "field": "user_name"
          },
          "aggs": {
              "avg_hours": {
                  "avg": {
                      "field": "working_hours"
                  }
              }
          }
      }
  }
}

此查询无效。如何找到每个用户在所有日期的平均工作时间?而且,我还想 运行 使用 python-elastic search 运行 这个查询。

已更新 当我使用摄取管道作为@Val 提及时。我收到一个错误:

{
  "error" : {
    "root_cause" : [
      {
        "type" : "script_exception",
        "reason" : "compile error",
        "processor_type" : "script",
        "script_stack" : [
          "\n        def workDate = /\s+/.split(ctx.working_h ...",
          "                        ^---- HERE"
        ],
        "script" : "\n        def workDate = /\s+/.split(ctx.working_hours);\n        def workHours = /:/.split(workDate[1]);\n        ctx.working_minutes = (Integer.parseInt(workHours[0]) * 60) + Integer.parseInt(workHours[1]);\n        ",
        "lang" : "painless",
        "position" : {
          "offset" : 24,
          "start" : 0,
          "end" : 49
        }
      }
.....

我该如何解决?

问题是您的 working_hours 字段是一个时间点,并不表示持续时间。

对于此用例,最好将工作日和工作时间存储在两个单独的字段中,并以分钟为单位存储工作时间。

所以不要有这样的文档:

{
    "user_name": "prathameshsalap@gmail.com",
    "working_hours": "2019-10-21 09:00:01",
}

像这样创建文档:

{
    "user_name": "prathameshsalap@gmail.com",
    "working_day": "2019-10-21",
    "working_hours": "09:00:01",
    "working_minutes": 540
}

然后您可以在 working_minutes 字段上使用您的查询:

{
    "size": 0,
    "query" : {"match_all": {}},
     "aggs": {
      "users": {
          "terms": {
              "field": "user_name.keyword",
              "order": {
                 "avg_hours": "desc"
              }
          },
          "aggs": {
              "avg_hours": {
                  "avg": {
                      "field": "working_minutes"
                  }
              }
          }
      }
  }
}

如果在您的客户端代码中计算 working_minutes 字段不方便,您可以使用 ingest pipeline 来实现相同的目的。让我们先定义管道:

PUT _ingest/pipeline/working-hours
{
  "processors": [
    {
      "dissect": {
        "field": "working_hours",
        "pattern": "%{?date} %{tmp_hours}:%{tmp_minutes}:%{?seconds}"
      }
    },
    {
      "convert": {
        "field": "tmp_hours",
        "type": "integer"
      }
    },
    {
      "convert": {
        "field": "tmp_minutes",
        "type": "integer"
      }
    },
    {
      "script": {
        "source": """
        ctx.working_minutes = (ctx.tmp_hours * 60) + ctx.tmp_minutes;
        """
      }
    },
    {
      "remove": {
        "field": [
          "tmp_hours",
          "tmp_minutes"
        ]
      }
    }
  ]
}

然后您需要更新 Python 客户端代码以使用将为您创建 working_hours 字段的新管道:

helpers.bulk(es, reader, index='user_log', doc_type='logs', pipeline='working-hours')