Grouping consecutive documents with Elasticsearch
Is there a way to make Elasticsearch take gaps in a sequence into account when grouping?
Given the following data bulk-imported into Elasticsearch:
{ "index": { "_index": "test", "_type": "groupingTest", "_id": "1" } }
{ "sequence": 1, "type": "A" }
{ "index": { "_index": "test", "_type": "groupingTest", "_id": "2" } }
{ "sequence": 2, "type": "A" }
{ "index": { "_index": "test", "_type": "groupingTest", "_id": "3" } }
{ "sequence": 3, "type": "B" }
{ "index": { "_index": "test", "_type": "groupingTest", "_id": "4" } }
{ "sequence": 4, "type": "A" }
{ "index": { "_index": "test", "_type": "groupingTest", "_id": "5" } }
{ "sequence": 5, "type": "A" }
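Is there a way to query this data so that
- the documents with sequence numbers 1 and 2 go into one output group,
- the document with sequence number 3 goes into another,
- the documents with sequence numbers 4 and 5 go into a third group?
...taking into account that the run of type A items is interrupted by a type B item (or any other non-A item)?
I'd like the resulting buckets to look something like this (the name and values of sequence_group may differ; I'm just trying to illustrate the logic):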
"buckets": [
{
"key": "a",
"sequence_group": 1,
"doc_count": 2
},
{
"key": "b",
"sequence_group": 3,
"doc_count": 1
},
{
"key": "a",
"sequence_group": 4,
"doc_count": 2
}
]
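The problem is described well at https://www.simple-talk.com/sql/t-sql-programming/the-sql-of-gaps-and-islands-in-sequences/, which also gives some SQL solutions. I'm wondering whether there is a solution for Elasticsearch as well.
You can always run a terms aggregation and then apply a top_hits sub-aggregation to get this result. Note that the terms aggregation puts all type A documents into a single bucket, so the consecutive runs still have to be separated on the client by ordering the returned hits by sequence; also, top_hits returns at most size hits per bucket, so size must cover every document.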
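For reference, here is a minimal Python sketch of one common SQL-style way to solve the islands problem, the row-number difference (not necessarily the exact query from the linked article). Ordered by sequence, each document gets a global rank and a rank within its own type; inside one unbroken run both ranks advance together, so their difference is constant, and any interruption by another type shifts it. The function name islands is hypothetical, and the key is lowercased only to match the illustration above.

```python
from collections import defaultdict


def islands(docs):
    """Group consecutive runs of equal 'type' using the row-number-difference trick.

    docs: iterable of dicts with 'sequence' (int) and 'type' (str).
    Returns bucket dicts shaped like the desired output above.
    """
    ordered = sorted(docs, key=lambda d: d["sequence"])
    per_type_rank = defaultdict(int)
    groups = {}  # (type, island_id) -> bucket; dicts keep insertion order
    for global_rank, doc in enumerate(ordered):
        t = doc["type"]
        # Constant within a run; shifted whenever another type interrupts it.
        island_id = global_rank - per_type_rank[t]
        per_type_rank[t] += 1
        bucket = groups.setdefault(
            (t, island_id),
            {"key": t.lower(), "sequence_group": doc["sequence"], "doc_count": 0},
        )
        bucket["doc_count"] += 1
    return list(groups.values())


docs = [
    {"sequence": 1, "type": "A"},
    {"sequence": 2, "type": "A"},
    {"sequence": 3, "type": "B"},
    {"sequence": 4, "type": "A"},
    {"sequence": 5, "type": "A"},
]
print(islands(docs))
# → [{'key': 'a', 'sequence_group': 1, 'doc_count': 2}, {'key': 'b', 'sequence_group': 3, 'doc_count': 1}, {'key': 'a', 'sequence_group': 4, 'doc_count': 2}]
```

Because it uses the global rank rather than the sequence value itself, the grouping still works when the sequence numbers have gaps.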
{
"aggs": {
"types": {
"terms": {
"field": "type"
},
"aggs": {
"groups": {
"top_hits": {
"size": 10
}
}
}
}
}
}
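A minimal sketch of the client-side step for this approach, assuming the aggregation names types and groups from the query above, that each hit's _source carries sequence and type, and that top_hits size is large enough to return every document. It merges the hits from all buckets, sorts them by sequence and splits them into runs. The function name and the hand-written, abbreviated sample response are hypothetical.

```python
def runs_from_top_hits(response):
    """Merge top_hits from every terms bucket, sort by sequence, split into runs."""
    sources = [
        hit["_source"]
        for bucket in response["aggregations"]["types"]["buckets"]
        for hit in bucket["groups"]["hits"]["hits"]
    ]
    sources.sort(key=lambda s: s["sequence"])
    runs = []
    for src in sources:
        if runs and runs[-1]["key"] == src["type"]:
            runs[-1]["doc_count"] += 1  # same type as the previous doc: extend run
        else:
            runs.append({"key": src["type"], "sequence_group": src["sequence"], "doc_count": 1})
    return runs


def hit(doc_id, seq, typ):
    # Abbreviated hit: a real response also carries _index, _score, etc.
    return {"_id": doc_id, "_source": {"sequence": seq, "type": typ}}


sample_response = {
    "aggregations": {
        "types": {
            "buckets": [
                {"key": "a", "doc_count": 4, "groups": {"hits": {"hits": [
                    hit("1", 1, "A"), hit("2", 2, "A"), hit("4", 4, "A"), hit("5", 5, "A"),
                ]}}},
                {"key": "b", "doc_count": 1, "groups": {"hits": {"hits": [
                    hit("3", 3, "B"),
                ]}}},
            ]
        }
    }
}
print(runs_from_top_hits(sample_response))
```

Splitting per bucket alone would not be enough: within the A bucket you cannot tell whether a jump from 2 to 4 is caused by another type or by a missing number, so the hits from all buckets are merged first.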
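We can use a Scripted Metric Aggregation here, which works in a map-reduce fashion (see link). It has separate phases: init, map, combine and reduce. The good news is that the result of each phase can also be a list or a map.
I gave it a try.
Elasticsearch version used: 7.1
Creating the index: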
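The plain-Python model below (not Elasticsearch code) sketches how the four phases fit together under the assumption of a hypothetical two-shard split: init and map run per shard, combine produces one value per shard, and reduce receives the list of all shard values. Because a shard only sees its own documents, ordering by sequence and detecting runs can only happen in the reduce phase. All function names are hypothetical.

```python
def init_phase():
    # Like init_script: each shard starts with an empty list.
    return {"seqTypeArr": []}


def map_phase(state, doc):
    # Like map_script: runs once per matching document on its shard.
    state["seqTypeArr"].append((doc["sequence"], doc["type"]))


def combine_phase(state):
    # Like combine_script: the per-shard value sent to the coordinating node.
    return list(state["seqTypeArr"])


def reduce_phase(states):
    # Like reduce_script: the only phase that sees every shard's entries.
    full = sorted((entry for shard in states for entry in shard), key=lambda e: e[0])
    result = []
    for seq, typ in full:
        if result and result[-1]["key"] == typ:
            result[-1]["doc_count"] += 1
        else:
            result.append({"key": typ, "sequence_group": seq, "doc_count": 1})
    return result


def run_scripted_metric(shards):
    # Each inner list holds the documents of one (hypothetical) shard.
    combined = []
    for shard_docs in shards:
        state = init_phase()
        for doc in shard_docs:
            map_phase(state, doc)
        combined.append(combine_phase(state))
    return reduce_phase(combined)


shards = [
    [{"sequence": 1, "type": "A"}, {"sequence": 3, "type": "B"}, {"sequence": 5, "type": "A"}],
    [{"sequence": 2, "type": "A"}, {"sequence": 4, "type": "A"}],
]
print(run_scripted_metric(shards))
# → [{'key': 'A', 'sequence_group': 1, 'doc_count': 2}, {'key': 'B', 'sequence_group': 3, 'doc_count': 1}, {'key': 'A', 'sequence_group': 4, 'doc_count': 2}]
```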
PUT test
{
"mappings": {
"properties": {
"sequence": {
"type": "long"
},
"type": {
"type": "text",
"fielddata": true
}
}
}
}
Bulk indexing (note that I removed the mapping type 'groupingTest'):
POST _bulk
{ "index": { "_index": "test", "_id": "1" } }
{ "sequence": 1, "type": "A" }
{ "index": { "_index": "test", "_id": "2" } }
{ "sequence": 2, "type": "A" }
{ "index": { "_index": "test", "_id": "3" } }
{ "sequence": 3, "type": "B" }
{ "index": { "_index": "test", "_id": "4" } }
{ "sequence": 4, "type": "A" }
{ "index": { "_index": "test", "_id": "5" } }
{ "sequence": 5, "type": "A" }
Query:
GET test/_search
{
"size": 0,
"aggs": {
"scripted_agg": {
"scripted_metric": {
"init_script": """
state.seqTypeArr = [];
""",
"map_script": """
def seqType = doc.sequence.value + '_' + doc['type'].value;
state.seqTypeArr.add(seqType);
""",
"combine_script": """
def list = [];
for(seqType in state.seqTypeArr) {
list.add(seqType);
}
return list;
""",
"reduce_script": """
def fullList = [];
for(agg_value in states) {
for(x in agg_value) {
fullList.add(x);
}
}
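// Sort numerically by the sequence prefix; a plain string sort would put "10_a" before "2_a".
fullList.sort((a, b) -> Long.compare(Long.parseLong(a.substring(0, a.indexOf('_'))), Long.parseLong(b.substring(0, b.indexOf('_')))));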
def result = [];
def item = new HashMap();
for(int i=0; i<fullList.size(); i++) {
def str = fullList.get(i);
def index = str.indexOf("_");
def ch = str.substring(index+1);
def val = str.substring(0, index);
if(item["key"] == null) {
item["key"] = ch;
item["sequence_group"] = val;
item["doc_count"] = 1;
} else if(item["key"] == ch) {
item["doc_count"] = item["doc_count"] + 1;
} else {
result.add(item);
item = new HashMap();
item["key"] = ch;
item["sequence_group"] = val;
item["doc_count"] = 1;
}
}
// Skip the empty placeholder when no documents matched.
if (item["key"] != null) {
result.add(item);
}
return result;
"""
}
}
}
}
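The reduce phase has to order the entries by their numeric sequence, not by the concatenated "sequence_type" string the map_script builds. A quick Python illustration of the difference, using hypothetical entries in that same shape:

```python
entries = ["1_a", "2_a", "3_b", "10_a"]

# String order compares character by character; '0' sorts before '_',
# so "10_a" lands ahead of "1_a".
print(sorted(entries))  # → ['10_a', '1_a', '2_a', '3_b']

# Numeric order on the prefix before the first underscore.
print(sorted(entries, key=lambda s: int(s.split("_", 1)[0])))  # → ['1_a', '2_a', '3_b', '10_a']
```

With only five documents the string sort happens to give the right answer, which is why the problem does not show up in the sample output.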
Finally, the output:
{
"took" : 21,
"timed_out" : false,
"_shards" : {
"total" : 5,
"successful" : 5,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : {
"value" : 5,
"relation" : "eq"
},
"max_score" : null,
"hits" : [ ]
},
"aggregations" : {
"scripted_agg" : {
"value" : [
{
"doc_count" : 2,
"sequence_group" : "1",
"key" : "a"
},
{
"doc_count" : 1,
"sequence_group" : "3",
"key" : "b"
},
{
"doc_count" : 2,
"sequence_group" : "4",
"key" : "a"
}
]
}
}
}
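Note that scripted metric aggregations can noticeably hurt query performance: this script collects an entry for every matching document and sorts them all in the reduce phase, so you may notice slowdowns with a large number of documents.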