Elasticsearch filter to obtain the last document for each value of a given field

Consider the following set of documents:

{transaction: "A", date: "1999-10-27" }
{transaction: "B", date: "1999-10-26" }
{transaction: "C", date: "1999-10-26" }
{transaction: "A", date: "1999-10-27" }
{transaction: "A", date: "1999-12-25" }
{transaction: "B", date: "2000-10-25" }

I'm trying to add a filter that selects only the last document for each transaction, based on the value of the date field, so that I get the following documents:

{transaction: "C", date: "1999-10-26" }
{transaction: "A", date: "1999-12-25" }
{transaction: "B", date: "2000-10-25" }

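Also, a terms aggregation on its own won't work, because I also need to run a histogram aggregation (by year) on the resulting documents: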

{
    1999: 2
}, {
    2000: 1
}

There are two ways to get what you need.

A. Use the collapse feature, with inner_hits sorted on date to pick the latest document of each transaction

GET test/_search
{
  "_source": false,
  "query": {
    "match_all": {}
  },
  "collapse": {
    "field": "transaction",
    "inner_hits": [
      {
        "name": "latest",      
        "size": 1,
        "sort": [
          {
            "date": {
              "order": "desc"
            }
          }
        ]
      }
    ]
  }
}
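
To read the result of the collapse query, the latest document of each transaction sits under the inner hit named "latest". The following is a minimal Python sketch, assuming the response has already been parsed into a dict; the helper name latest_docs and the hand-built sample_response are hypothetical and only mimic the relevant part of a real response.

```python
def latest_docs(response):
    """Return the newest document of each collapsed transaction group."""
    docs = []
    for hit in response["hits"]["hits"]:
        # "latest" is the inner_hits name chosen in the collapse query
        inner = hit["inner_hits"]["latest"]["hits"]["hits"]
        if inner:
            docs.append(inner[0]["_source"])
    return docs


# Hypothetical, trimmed response: one collapsed hit per transaction value
sample_response = {
    "hits": {"hits": [
        {"fields": {"transaction": ["A"]},
         "inner_hits": {"latest": {"hits": {"hits": [
             {"_source": {"transaction": "A", "date": "1999-12-25"}}]}}}},
        {"fields": {"transaction": ["B"]},
         "inner_hits": {"latest": {"hits": {"hits": [
             {"_source": {"transaction": "B", "date": "2000-10-25"}}]}}}},
        {"fields": {"transaction": ["C"]},
         "inner_hits": {"latest": {"hits": {"hits": [
             {"_source": {"transaction": "C", "date": "1999-10-26"}}]}}}},
    ]}
}

for doc in latest_docs(sample_response):
    print(doc)
```

Keep in mind that a search returns only 10 hits by default, so with more than 10 distinct transactions the request would need a larger "size" or pagination.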

B. Use a terms aggregation on the transaction field, with a top_hits sub-aggregation sorted on date to get the latest document of each transaction

GET test/_search
{
  "query": {
    "match_all": {}
  },
  "aggs": {
    "transactions": {
      "terms": { 
        "field": "transaction" 
      },
      "aggs": {
        "latest": {
          "top_hits": {
            "size": 1,
            "sort": [
              {
                 "date": { 
                    "order": "desc"
                 }
              }
            ]                   
          }
        }
      }
    }
  }
}

With either of the queries above, working out the date histogram in your application logic is straightforward.
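
As an illustration of that application-side step, here is a minimal Python sketch that builds the per-year counts from the terms + top_hits response. It assumes the response is already a dict and that date comes back in _source as a "yyyy-MM-dd" string; the function name year_histogram and the hand-built sample are hypothetical.

```python
from collections import Counter


def year_histogram(response):
    """Count, per year, the transactions whose latest document falls in that year."""
    counts = Counter()
    for bucket in response["aggregations"]["transactions"]["buckets"]:
        # "latest" is the top_hits sub-aggregation; it holds exactly one hit
        latest = bucket["latest"]["hits"]["hits"][0]["_source"]
        counts[latest["date"][:4]] += 1  # assumes an ISO "yyyy-MM-dd" string
    return dict(counts)


# Hypothetical, trimmed response for the six sample documents
sample = {
    "aggregations": {"transactions": {"buckets": [
        {"key": "A", "doc_count": 3, "latest": {"hits": {"hits": [
            {"_source": {"transaction": "A", "date": "1999-12-25"}}]}}},
        {"key": "B", "doc_count": 2, "latest": {"hits": {"hits": [
            {"_source": {"transaction": "B", "date": "2000-10-25"}}]}}},
        {"key": "C", "doc_count": 1, "latest": {"hits": {"hits": [
            {"_source": {"transaction": "C", "date": "1999-10-26"}}]}}},
    ]}}
}

print(year_histogram(sample))
```

A terms aggregation returns only the top 10 buckets unless its "size" is raised, so with many distinct transactions the counts would be incomplete without a larger size.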

UPDATE:

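If you really want ES to build the date histogram for you, you can do it with a scripted_metric aggregation and write the aggregation logic yourself. Note that this solution relies on scripting, which can hurt the performance of your cluster depending on your volume of data.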

POST test/_search
{
  "size": 0,
  "query": {
    "match_all": {}
  },
  "aggs": {
    "years": {
      "scripted_metric": {
        "init_script": "state.latest = [:]",
        "map_script": """
          // 1. record the latest year for each transaction
          def key = doc['transaction.keyword'].value;
          if (!state.latest.containsKey(key)) {
            state.latest[key] = 0;
          }
          def year = doc['date'].value.getYear();
          if (state.latest[key] < year) {
            state.latest[key] = year;
          }
        """,
        "combine_script": """
          return state.latest
        """,
        "reduce_script": """
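          // 2. merge the per-shard maps, keeping the latest year per transaction
          //    (a transaction may have documents on several shards)
          def latest = [:];
          for (shardState in states) {
            if (shardState == null) { continue; }
            for (entry in shardState.entrySet()) {
              def seen = latest.get(entry.getKey());
              if (seen == null || seen < entry.getValue()) {
                latest[entry.getKey()] = entry.getValue();
              }
            }
          }
          // 3. count how many transactions fall in each "latest" year
          def years = [:];
          for (year in latest.values()) {
            def key = year.toString();
            years[key] = years.containsKey(key) ? years[key] + 1 : 1;
          }
          return years;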
        """
      }
    }
  }
}

For the sample documents, the aggregation query above returns the following, which is pretty much what you expect:

  "aggregations" : {
    "years" : {
      "value" : {
        "2000" : 1,
        "1999" : 2
      }
    }
  }
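
To make the map/reduce flow of the scripted metric easier to follow, here is a small Python simulation of the same idea. It is only a sketch: the split of the six sample documents across two shards is invented, and the function names map_shard and reduce_shards are hypothetical. The sketch merges the per-shard maps before counting, which matters when one transaction has documents on several shards; counting each shard map separately would count such a transaction once per shard.

```python
from collections import Counter


def map_shard(docs):
    """Mimic init_script + map_script: latest year per transaction on one shard."""
    latest = {}
    for doc in docs:
        year = int(doc["date"][:4])  # assumes "yyyy-MM-dd" date strings
        key = doc["transaction"]
        if latest.get(key, 0) < year:
            latest[key] = year
    return latest


def reduce_shards(states):
    """Mimic reduce_script: merge shard maps, then count transactions per year."""
    merged = {}
    for state in states:
        for key, year in state.items():
            merged[key] = max(year, merged.get(key, 0))
    return dict(Counter(str(year) for year in merged.values()))


# Invented shard layout: transactions A and B each span both shards
shard_1 = [
    {"transaction": "A", "date": "1999-10-27"},
    {"transaction": "B", "date": "1999-10-26"},
    {"transaction": "C", "date": "1999-10-26"},
]
shard_2 = [
    {"transaction": "A", "date": "1999-10-27"},
    {"transaction": "A", "date": "1999-12-25"},
    {"transaction": "B", "date": "2000-10-25"},
]

result = reduce_shards([map_shard(shard_1), map_shard(shard_2)])
print(result)
```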