Elasticsearch 平均日期直方图桶

Elasticsearch average over date histogram buckets

我在 ElasticSearch 中索引了一堆文档,我需要获取以下数据:

对于每个月,获取每月每个工作日的平均文档数(或者如果不可能,则使用 20 天作为默认值)。

我已经使用 date histogram 聚合将我的数据聚合到月桶中。我试图嵌套一个 stats 存储桶,但此聚合使用从文档字段中提取的数据,而不是从父存储桶中提取的数据。

到目前为止,这是我的查询:

{
    "query": {
        "match_all": {}
    },
    "aggs": {
        "docs_per_month": {
            "date_histogram": {
                "field": "created_date",
                "interval": "month",
                "min_doc_count": 0
            }
            "aggs": {
                '???': '???'
            }
        }
    }
}

编辑

为了让我的问题更清楚,我需要的是:


您想在周六和周日排除带有时间戳的文档,因此您可以使用脚本在查询中排除这些文档

{
  "query": {
    "filtered": {
      "filter": {
        "script": {
          "script": "doc['@timestamp'].date.dayOfWeek != 7 && doc['@timestamp'].date.dayOfWeek != 6"
        }
      }
    }
  },
  "aggs": {
    "docs_per_month": {
      "date_histogram": {
        "field": "created_date",
        "interval": "month",
        "min_doc_count": 0
      },
      "aggs": {
        "docs_per_day": {
          "date_histogram": {
            "field": "created_date",
            "interval": "day",
            "min_doc_count": 0
          }
        },
        "aggs": {
          "docs_count": {
            "avg": {
              "field": ""
            }
          }
        }
      }
    }
  }
}

您可能不需要按月进行第一次汇总,因为您已经使用天间隔获得此信息

顺便说一句,您需要通过将此添加到您的 elasticsearch.yml 配置来确保启用动态脚本

script.disable_dynamic: false

或在 /config/scripts 下添加一个 groovy 脚本,并在 filter

中使用带有脚本的过滤查询

你基本上需要的是这样的东西(它不起作用,因为它不是可用的功能):

{
  "query": {
    "match_all": {}
  },
  "aggs": {
    "docs_per_month": {
      "date_histogram": {
        "field": "date",
        "interval": "month",
        "min_doc_count": 0
      },
      "aggs": {
        "average": {
          "avg": {
            "script": "doc_count / 20"
          }
        }
      }
    }
  }
}

它不起作用,因为无法从 "parent" 聚合访问 doc_count

但是,这将在 Elasticsearch 的 2.x 分支中成为可能,目前正在积极开发中:https://github.com/elastic/elasticsearch/issues/8110 这个新功能将在聚合的结果(桶)上添加第二层操作,它不仅是您的用例,还有许多其他用例。

除非您想尝试 some ideas out there 或在您的应用中执行自己的计算,否则您需要等待此功能。

使用以下 scripted_metric aggregation.

有一个非常复杂的解决方案并且不是很高效
{
  "size": 0,
  "query": {
    "match_all": {}
  },
  "aggs": {
    "docs_per_month": {
      "date_histogram": {
        "field": "created_date",
        "interval": "month",
        "min_doc_count": 0
      },
      "aggs": {
        "avg_doc_per_biz_day": {
          "scripted_metric": {
            "init_script": "_agg.bizdays = []; _agg.allbizdays = [:]; start = new DateTime(1970, 1, 1, 0, 0); now = new DateTime(); while (start < now) { def end = start.plusMonths(1); _agg.allbizdays[start.year + '_' + start.monthOfYear] = (start.toDate()..<end.toDate()).sum {(it.day != 6 && it.day != 0) ? 1 : 0 }; start = end; }",
            "map_script": "_agg.bizdays << _agg.allbizdays[doc. created_date.date.year+'_'+doc. created_date.date.monthOfYear]",
            "combine_script": "_agg.allbizdays = null; doc_count = 0; for (d in _agg.bizdays){ doc_count++ }; return doc_count / _agg.bizdays[0]",
            "reduce_script": "res = 0; for (a in _aggs) { res += a }; return res"
          }
        }
      }
    }
  }
}

让我们在下面详细介绍每个脚本。

我在 init_script 中所做的是创建自 1970 年以来每个月的工作日数地图并将其存储在 _agg.allbizdays 地图中。

_agg.bizdays = [];
_agg.allbizdays = [:]; 
start = new DateTime(1970, 1, 1, 0, 0);
now = new DateTime();
while (start < now) { 
    def end = start.plusMonths(1);     
    _agg.allbizdays[start.year + '_' + start.monthOfYear] = (start.toDate()..<end.toDate()).sum {(it.day != 6 && it.day != 0) ? 1 : 0 }; 
    start = end; 
}

map_script 中,我只是检索每个文档所在月份的工作日数;

_agg.bizdays << _agg.allbizdays[doc.created_date.date.year + '_' + doc. created_date.date.monthOfYear];

combine_script 中,我总结了每个分片的平均文档数

_agg.allbizdays = null;
doc_count = 0; 
for (d in _agg.bizdays){ doc_count++ }; 
return doc_count / _agg.bizdays[0];

最后在 reduce_script 中,我总结了每个节点的平均文档数:

res = 0; 
for (a in _aggs) { res += a }; 
return res

我再次认为它非常复杂,正如 Andrei 所说的那样,最好等待 2.0 使其按应有的方式工作,但与此同时,如果需要,您可以使用此解决方案。

对于仍然感兴趣的任何人,您现在可以使用 avg_bucket 聚合。它仍然有点棘手,因为你不能简单地 运行 avg_bucketdate_historgram 聚合结果上,但是使用具有一些唯一值的辅助 value_count 聚合并且它工作正常: )

{
  "size": 0,
  "aggs": {
    "orders_per_day": {
      "date_histogram": {
        "field": "orderedDate",
        "interval": "day"
      },
      "aggs": {
        "amount": {
          "value_count": {
            "field": "dateCreated"
          }
        }
      }
    },
    "avg_daily_order": {
      "avg_bucket": {
        "buckets_path": "orders_per_day>amount"
      }
    }
  }
}