Elasticsearch中如何聚合聚合的结果?
How to aggregate the result of aggregation in Elasticsearch?
我想使用 Elasticsearch 聚合其他聚合的结果。我已经创建了我需要的第一个聚合:
es.search(index='Whosebug', body = {
"size":0,
"query": {
"bool": {
"filter": {
"match" : {"type": "Posts"}
},
"filter": {
"match" : {"PostTypeId": "1"}
}
}
},
"aggs" : {
"by_user": {
"terms": {
"field": "OwnerUserId"
}
}
}
})
此查询获取类型为 post 的所有文档(PostTypeId = 1)。然后,它通过 OwnerUserId 进行聚合,统计每个用户的问题数 post,得到以下结果:
{'took': 0,
'timed_out': False,
'_shards': {'total': 1, 'successful': 1, 'skipped': 0, 'failed': 0},
'hits': {'total': {'value': 10000, 'relation': 'gte'},
'max_score': None,
'hits': []},
'aggregations': {'by_user': {'doc_count_error_upper_bound': 0,
'sum_other_doc_count': 31053,
'buckets': [{'key': '2230', 'doc_count': 223},
{'key': '', 'doc_count': 177},
{'key': '38304', 'doc_count': 158},
{'key': '5997', 'doc_count': 144},
{'key': '4048', 'doc_count': 130},
{'key': '25813', 'doc_count': 119},
{'key': '27826', 'doc_count': 119},
{'key': '2633', 'doc_count': 115},
{'key': '19919', 'doc_count': 114},
{'key': '13938', 'doc_count': 111}]}}}
现在我想对上一个的结果做另一个聚合:通过doc_count聚合,我的意思是分组并计算相同数量的问题post秒。对于之前的结果,我想要的结果是:
{'buckets': [{'key': '223', 'doc_count': 1},
{'key': '177', 'doc_count': 1},
{'key': '158', 'doc_count': 1},
{'key': '144', 'doc_count': 1},
{'key': '130', 'doc_count': 1},
{'key': '119', 'doc_count': 2},
{'key': '115', 'doc_count': 1},
{'key': '114', 'doc_count': 1},
{'key': '111', 'doc_count': 1}]}
我可以找到一种方法来聚合聚合结果(至少是直接聚合)。正如我在 Elasticsearch 的论坛上看到的那样,这个用例没有被考虑,因为它效率太低了。
为了解决我的用例,我所做的是利用转换 API 将第一个聚合存储在时间索引中,然后对该索引执行第二个聚合。
首先,我创建了一个转换来执行第一个聚合(按 OwnerUserId 分组并计算每个用户 post 提出的问题数量):
url = 'http://localhost:9200/_transform/transform_rq1'
headers = {
'Content-Type': 'application/json'
}
query = {
"source": {
"index": "posts",
"query": {
"bool": {
"filter": {
"match" : {"PostTypeId": "1"}
}
}
}
},
"dest": {
"index": "rq1"
},
"pivot": {
"group_by": {
"OwnerUserId": {
"terms": {
"field": "OwnerUserId"
}
}
},
"aggregations": {
"count": {
"value_count": {
"field": "OwnerUserId"
}
}
}
}
}
response = requests.put(url, headers=headers, data=json.dumps(query))
然后,我开始转换以执行它:
url = 'http://localhost:9200/_transform/transform_rq1/_start'
headers = {
'Content-Type': 'application/json'
}
response = requests.post(url, headers=headers).json()
最后,我对创建的时间索引进行第二次聚合(按每个用户的问题数分组,得到多少用户post多少问题):
response = es.search(index='rq1', body = {
"size":0,
"query": {
"match_all": {}
},
"aggs" : {
"by_num": {
"terms": {
"field": "count",
"order" : { "_key" : "asc" },
"size": 30000
}
}
}
})
print(response)
如您所见,我已经在Python中编写了这段代码。
我想使用 Elasticsearch 聚合其他聚合的结果。我已经创建了我需要的第一个聚合:
es.search(index='Whosebug', body = {
"size":0,
"query": {
"bool": {
"filter": {
"match" : {"type": "Posts"}
},
"filter": {
"match" : {"PostTypeId": "1"}
}
}
},
"aggs" : {
"by_user": {
"terms": {
"field": "OwnerUserId"
}
}
}
})
此查询获取类型为 post 的所有文档(PostTypeId = 1)。然后,它通过 OwnerUserId 进行聚合,统计每个用户的问题数 post,得到以下结果:
{'took': 0,
'timed_out': False,
'_shards': {'total': 1, 'successful': 1, 'skipped': 0, 'failed': 0},
'hits': {'total': {'value': 10000, 'relation': 'gte'},
'max_score': None,
'hits': []},
'aggregations': {'by_user': {'doc_count_error_upper_bound': 0,
'sum_other_doc_count': 31053,
'buckets': [{'key': '2230', 'doc_count': 223},
{'key': '', 'doc_count': 177},
{'key': '38304', 'doc_count': 158},
{'key': '5997', 'doc_count': 144},
{'key': '4048', 'doc_count': 130},
{'key': '25813', 'doc_count': 119},
{'key': '27826', 'doc_count': 119},
{'key': '2633', 'doc_count': 115},
{'key': '19919', 'doc_count': 114},
{'key': '13938', 'doc_count': 111}]}}}
现在我想对上一个的结果做另一个聚合:通过doc_count聚合,我的意思是分组并计算相同数量的问题post秒。对于之前的结果,我想要的结果是:
{'buckets': [{'key': '223', 'doc_count': 1},
{'key': '177', 'doc_count': 1},
{'key': '158', 'doc_count': 1},
{'key': '144', 'doc_count': 1},
{'key': '130', 'doc_count': 1},
{'key': '119', 'doc_count': 2},
{'key': '115', 'doc_count': 1},
{'key': '114', 'doc_count': 1},
{'key': '111', 'doc_count': 1}]}
我可以找到一种方法来聚合聚合结果(至少是直接聚合)。正如我在 Elasticsearch 的论坛上看到的那样,这个用例没有被考虑,因为它效率太低了。
为了解决我的用例,我所做的是利用转换 API 将第一个聚合存储在时间索引中,然后对该索引执行第二个聚合。
首先,我创建了一个转换来执行第一个聚合(按 OwnerUserId 分组并计算每个用户 post 提出的问题数量):
url = 'http://localhost:9200/_transform/transform_rq1'
headers = {
'Content-Type': 'application/json'
}
query = {
"source": {
"index": "posts",
"query": {
"bool": {
"filter": {
"match" : {"PostTypeId": "1"}
}
}
}
},
"dest": {
"index": "rq1"
},
"pivot": {
"group_by": {
"OwnerUserId": {
"terms": {
"field": "OwnerUserId"
}
}
},
"aggregations": {
"count": {
"value_count": {
"field": "OwnerUserId"
}
}
}
}
}
response = requests.put(url, headers=headers, data=json.dumps(query))
然后,我开始转换以执行它:
url = 'http://localhost:9200/_transform/transform_rq1/_start'
headers = {
'Content-Type': 'application/json'
}
response = requests.post(url, headers=headers).json()
最后,我对创建的时间索引进行第二次聚合(按每个用户的问题数分组,得到多少用户post多少问题):
response = es.search(index='rq1', body = {
"size":0,
"query": {
"match_all": {}
},
"aggs" : {
"by_num": {
"terms": {
"field": "count",
"order" : { "_key" : "asc" },
"size": 30000
}
}
}
})
print(response)
如您所见,我已经在Python中编写了这段代码。