Grouping consecutive documents with Elasticsearch

Is there a way to make Elasticsearch take sequence gaps into account when grouping?

Given that the following data is bulk-imported into Elasticsearch:

{ "index": { "_index": "test", "_type": "groupingTest", "_id": "1" } }
{ "sequence": 1, "type": "A" }
{ "index": { "_index": "test", "_type": "groupingTest", "_id": "2" } }
{ "sequence": 2, "type": "A" }
{ "index": { "_index": "test", "_type": "groupingTest", "_id": "3" } }
{ "sequence": 3, "type": "B" }
{ "index": { "_index": "test", "_type": "groupingTest", "_id": "4" } }
{ "sequence": 4, "type": "A" }
{ "index": { "_index": "test", "_type": "groupingTest", "_id": "5" } }
{ "sequence": 5, "type": "A" }

Is there a way to query this data so that the aggregation takes into account the fact that the sequence of type A documents is interrupted by a type B item (or any other non-A item)?

I would like the result buckets to look something like this (the name and value of sequence_group may differ - I am just trying to illustrate the logic):

"buckets": [
    {
       "key": "a",
       "sequence_group": 1,
       "doc_count": 2
    },
    {
       "key": "b",
       "sequence_group": 3,
       "doc_count": 1
    },
    {
       "key": "a",
       "sequence_group": 4,
       "doc_count": 2
    }
]

The problem is nicely described at https://www.simple-talk.com/sql/t-sql-programming/the-sql-of-gaps-and-islands-in-sequences/, along with some SQL workarounds. I am wondering whether there is a solution that works for Elasticsearch as well.

You can always do a terms aggregation and then apply a top_hits sub-aggregation to get this result:

{
  "aggs": {
    "types": {
      "terms": {
        "field": "type"
      },
      "aggs": {
        "groups": {
          "top_hits": {
            "size": 10
          }
        }
      }
    }
  }
}
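
If you go this route, note that the buckets themselves are not split on sequence gaps - a terms aggregation puts every document of a type into a single bucket, so the grouping into runs still has to happen client-side. A small variation that can help with that post-processing (a sketch, assuming the type field is aggregatable, i.e. a keyword field or a text field with fielddata enabled) is to sort the hits inside each bucket by sequence:

{
  "aggs": {
    "types": {
      "terms": {
        "field": "type"
      },
      "aggs": {
        "groups": {
          "top_hits": {
            "size": 10,
            "sort": [
              { "sequence": { "order": "asc" } }
            ]
          }
        }
      }
    }
  }
}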

We can use a Scripted Metric Aggregation here, which works in a map-reduce fashion (refer link). It has different phases such as init, map, combine, and reduce. And the good news is that the result of all of these can also be a list or a map.
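
Before the actual solution, here is a minimal sketch of how the four phases fit together (my own illustration against the test index created below; it only counts documents per type and does not do the grouping):

GET test/_search
{
  "size": 0,
  "aggs": {
    "phase_demo": {
      "scripted_metric": {
        "init_script": """
          // runs once per shard: set up the per-shard state
          state.counts = new HashMap();
        """,
        "map_script": """
          // runs once per matching document on that shard
          def t = doc['type'].value;
          state.counts[t] = (state.counts.containsKey(t) ? state.counts[t] : 0) + 1;
        """,
        "combine_script": """
          // runs once per shard: return that shard's contribution
          return state.counts;
        """,
        "reduce_script": """
          // runs once on the coordinating node; 'states' holds one combine result per shard
          def total = new HashMap();
          for (shardCounts in states) {
            if (shardCounts == null) continue;
            for (entry in shardCounts.entrySet()) {
              total[entry.getKey()] = (total.containsKey(entry.getKey()) ? total[entry.getKey()] : 0) + entry.getValue();
            }
          }
          return total;
        """
      }
    }
  }
}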

I gave this a try.

Elasticsearch version used: 7.1

Creating the index:

PUT test
{
  "mappings": {
    "properties": {
      "sequence": {
        "type": "long"
      },
      "type": {
        "type": "text",
        "fielddata": true
      }
    }
  }
}
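
As a side note, an alternative mapping (not the one used in the rest of this answer) is to make type a keyword field, which is aggregatable and accessible as doc['type'].value in scripts without enabling fielddata. The keys in the result would then keep their original casing ("A"/"B") instead of the lowercased "a"/"b" shown in the output below:

PUT test
{
  "mappings": {
    "properties": {
      "sequence": {
        "type": "long"
      },
      "type": {
        "type": "keyword"
      }
    }
  }
}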

Bulk indexing (note that I removed the mapping type 'groupingTest'):

POST _bulk
{ "index": { "_index": "test", "_id": "1" } }
{ "sequence": 1, "type": "A" }
{ "index": { "_index": "test", "_id": "2" } }
{ "sequence": 2, "type": "A" }
{ "index": { "_index": "test", "_id": "3" } }
{ "sequence": 3, "type": "B" }
{ "index": { "_index": "test", "_id": "4" } }
{ "sequence": 4, "type": "A" }
{ "index": { "_index": "test", "_id": "5" } }
{ "sequence": 5, "type": "A" }

The query:

GET test/_doc/_search
{
  "size": 0,
  "aggs": {
    "scripted_agg": {
      "scripted_metric": {
        "init_script": """ 
          state.seqTypeArr = [];
        """,
        "map_script": """ 
          def seqType = doc.sequence.value + '_' + doc['type'].value;
          state.seqTypeArr.add(seqType);
        """,
        "combine_script": """
          def list = [];
          for(seqType in state.seqTypeArr) {
            list.add(seqType);
          }
          return list;
        """,
        "reduce_script": """ 
          def fullList = [];
          for(agg_value in states) {
            for(x in agg_value) {
              fullList.add(x);
            }
          }
          // sort numerically on the sequence prefix (a plain string sort would put e.g. "10_a" before "2_a")
          fullList.sort((a, b) -> {
            long sa = Long.parseLong(a.substring(0, a.indexOf("_")));
            long sb = Long.parseLong(b.substring(0, b.indexOf("_")));
            return Long.compare(sa, sb);
          });
          def result = [];
          def item = new HashMap();
          for(int i=0; i<fullList.size(); i++) {
            def str = fullList.get(i);
            def index = str.indexOf("_");
            def ch = str.substring(index+1);
            def val = str.substring(0, index);
            if(item["key"] == null) {
              item["key"] = ch;
              item["sequence_group"] = val;
              item["doc_count"] = 1;
            } else if(item["key"] == ch) {
              item["doc_count"] = item["doc_count"] + 1;
            } else {
              result.add(item);
              item = new HashMap();
              item["key"] = ch;
              item["sequence_group"] = val;
              item["doc_count"] = 1;
            }
          }
          result.add(item);
          return result;
        """
      }
    }
  }
}

And finally, the output:

{
  "took" : 21,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 5,
      "relation" : "eq"
    },
    "max_score" : null,
    "hits" : [ ]
  },
  "aggregations" : {
    "scripted_agg" : {
      "value" : [
        {
          "doc_count" : 2,
          "sequence_group" : "1",
          "key" : "a"
        },
        {
          "doc_count" : 1,
          "sequence_group" : "3",
          "key" : "b"
        },
        {
          "doc_count" : 2,
          "sequence_group" : "4",
          "key" : "a"
        }
      ]
    }
  }
}

Please note that scripted aggregations have a significant impact on query performance, so you may notice some slowness if there is a large number of documents.