如何使用单个 Elastic 搜索查询进行重复数据删除和执行聚合?
How to deduplicate and perform aggregations using single Elastic search query?
我有一个存储员工详细信息数据的索引。
我有每个员工整数值 (0-10) 的反馈字段。
我想获得反馈的数量、反馈的平均评分和反馈的每个员工的平均评分。
这里的问题是:
所以我在一个ES索引中有两个或多个相同的文档(重复)(使用员工id和一个反馈标识符,我们可以区分记录),
我想对只有一个文档的某些字段进行平均和计数,仅考虑使用 ES 查询
PS: 我们无法从索引中删除重复项。
数据:
"hits" : [
{
"_index" : "index22",
"_type" : "_doc",
"_id" : "r_QurHEBvLUX24hJph0B",
"_score" : 1.0,
"_source" : {
"empId" : 1,
"feedbackId" : 1,
"feedback" : 2
}
},
{
"_index" : "index22",
"_type" : "_doc",
"_id" : "sPQurHEBvLUX24hJ0R3x",
"_score" : 1.0,
"_source" : {
"empId" : 1,
"feedbackId" : 1,
"feedback" : 2
}
},
{
"_index" : "index22",
"_type" : "_doc",
"_id" : "sfQurHEBvLUX24hJ5h16",
"_score" : 1.0,
"_source" : {
"empId" : 1,
"feedbackId" : 2,
"feedback" : 6
}
}
]
我已经带走了一名 ID 为 1 的员工和三个反馈(两个重复的 identifierId 1 和一个重复的 identifierId2)。总和为 10,使用不同的总和为 8
查询:
我正在使用 scripted metric aggregation 创建唯一的 feedbackIdentifierI 和反馈值的字典。
- "init_script":
Executed prior to any collection of documents. Allows the aggregation
to set up any initial state.
已声明哈希表交易
- "map_script"
Executed once per document collected
Loop through all document and add unique identifierid and corresponding feedback value to
dictionary
- combine_script
Executed once on each shard after document collection is complete
Return 所有分片的字典
- reduce_script
Executed once on the coordinating node after all shards have returned their results
再次遍历从每个分片返回的所有字典并创建一个唯一的字典。循环遍历字典以获得反馈的总和或计数
{
"size": 0,
"aggs": {
"employee": {
"terms": {
"field": "empId",
"size": 10000
},
"aggs": {
"distinct_sum_feedback": {
"scripted_metric": {
"init_script": "state.transactions =new Hashtable();",
"map_script": "if(state.transactions.get(doc.feedbackId)==null){state.transactions.put(doc.feedbackId, doc.feedback.value)}",
"combine_script": "return state.transactions",
"reduce_script": "def sum=0;def feedbacks=new Hashtable();for(a in states){for(entry in a.entrySet()){if(feedbacks.get(entry.getKey())==null){feedbacks.put(entry.getKey(),entry.getValue());}}}for(entry in feedbacks.entrySet()){sum+=entry.getValue();} return sum;"
}
},
"distinct_count_feedback": {
"cardinality": {
"field": "feedbackId"
}
},
"distinct_avg_feedback": {
"bucket_script": {
"buckets_path": {
"sum": "distinct_sum_feedback.value",
"count": "distinct_count_feedback.value"
},
"script": "params.sum/params.count"
}
}
}
},
"sum_feedback": {
"sum_bucket": {
"buckets_path": "employee>distinct_sum_feedback.value"
}
},
"count_feedback": {
"sum_bucket": {
"buckets_path": "employee>distinct_count_feedback.value"
}
}
}
}
结果:
用户 1 的非重复计数:2
用户 1 的不同总和:8(重复项为 10)
"aggregations" : {
"employee" : {
"doc_count_error_upper_bound" : 0,
"sum_other_doc_count" : 0,
"buckets" : [
{
"key" : 1,
"doc_count" : 3,
"distinct_count_feedback" : {
"value" : 2
},
"distinct_sum_feedback" : {
"value" : 8
},
"distinct_avg_feedback" : {
"value" : 4.0
}
},
{
"key" : 2,
"doc_count" : 1,
"distinct_count_feedback" : {
"value" : 1
},
"distinct_sum_feedback" : {
"value" : 6
},
"distinct_avg_feedback" : {
"value" : 6.0
}
}
]
},
"sum_feedback" : {
"value" : 14.0
},
"count_feedback" : {
"value" : 3.0
}
}
这听起来很像是您应该使用几个桶聚合。我看不出有什么方法可以在一次调用中获取所有这些信息,但我认为肯定有一种方法可以聚合您的数据以获取您需要的信息。
特别是您可以在员工 ID 和反馈标识符上使用 terms aggregation,让每个员工都进入自己的存储桶。从那里,您可以获得每位员工的反馈数量。
您可以仅对反馈标识符执行类似的存储桶查询,以获取所有记录中的反馈计数。
这里是对 Avg_Bucket aggregation 的引用,您可以使用它来计算数据桶(组)的平均值。
请注意,有一个动态集群设置 (search.max_buckets
),最高可达 10,000 个桶。
我认为您可能需要这样的东西,但我没有可用来测试它的索引。也许这可以让您走上正确的轨道,看看如何组合各种聚合:
POST /_search
{
"size": 0,
"aggs": {
"employees": {
"terms": {
"field": "employeeId"
},
"aggs": {
"feedbacks": {
"count": {
"field": "feedbackId"
}
}
}
},
"avg_feedback_score": {
"avg_bucket": {
"buckets_path": "employees>feedbacks"
}
}
}
}
我有一个存储员工详细信息数据的索引。
我有每个员工整数值 (0-10) 的反馈字段。
我想获得反馈的数量、反馈的平均评分和反馈的每个员工的平均评分。
这里的问题是:
所以我在一个ES索引中有两个或多个相同的文档(重复)(使用员工id和一个反馈标识符,我们可以区分记录),
我想对只有一个文档的某些字段进行平均和计数,仅考虑使用 ES 查询
PS: 我们无法从索引中删除重复项。
数据:
"hits" : [
{
"_index" : "index22",
"_type" : "_doc",
"_id" : "r_QurHEBvLUX24hJph0B",
"_score" : 1.0,
"_source" : {
"empId" : 1,
"feedbackId" : 1,
"feedback" : 2
}
},
{
"_index" : "index22",
"_type" : "_doc",
"_id" : "sPQurHEBvLUX24hJ0R3x",
"_score" : 1.0,
"_source" : {
"empId" : 1,
"feedbackId" : 1,
"feedback" : 2
}
},
{
"_index" : "index22",
"_type" : "_doc",
"_id" : "sfQurHEBvLUX24hJ5h16",
"_score" : 1.0,
"_source" : {
"empId" : 1,
"feedbackId" : 2,
"feedback" : 6
}
}
]
我已经带走了一名 ID 为 1 的员工和三个反馈(两个重复的 identifierId 1 和一个重复的 identifierId2)。总和为 10,使用不同的总和为 8
查询:
我正在使用 scripted metric aggregation 创建唯一的 feedbackIdentifierI 和反馈值的字典。
- "init_script":
Executed prior to any collection of documents. Allows the aggregation to set up any initial state.
已声明哈希表交易
- "map_script"
Executed once per document collected Loop through all document and add unique identifierid and corresponding feedback value to dictionary
- combine_script
Executed once on each shard after document collection is complete
Return 所有分片的字典
- reduce_script
Executed once on the coordinating node after all shards have returned their results
再次遍历从每个分片返回的所有字典并创建一个唯一的字典。循环遍历字典以获得反馈的总和或计数
{
"size": 0,
"aggs": {
"employee": {
"terms": {
"field": "empId",
"size": 10000
},
"aggs": {
"distinct_sum_feedback": {
"scripted_metric": {
"init_script": "state.transactions =new Hashtable();",
"map_script": "if(state.transactions.get(doc.feedbackId)==null){state.transactions.put(doc.feedbackId, doc.feedback.value)}",
"combine_script": "return state.transactions",
"reduce_script": "def sum=0;def feedbacks=new Hashtable();for(a in states){for(entry in a.entrySet()){if(feedbacks.get(entry.getKey())==null){feedbacks.put(entry.getKey(),entry.getValue());}}}for(entry in feedbacks.entrySet()){sum+=entry.getValue();} return sum;"
}
},
"distinct_count_feedback": {
"cardinality": {
"field": "feedbackId"
}
},
"distinct_avg_feedback": {
"bucket_script": {
"buckets_path": {
"sum": "distinct_sum_feedback.value",
"count": "distinct_count_feedback.value"
},
"script": "params.sum/params.count"
}
}
}
},
"sum_feedback": {
"sum_bucket": {
"buckets_path": "employee>distinct_sum_feedback.value"
}
},
"count_feedback": {
"sum_bucket": {
"buckets_path": "employee>distinct_count_feedback.value"
}
}
}
}
结果: 用户 1 的非重复计数:2
用户 1 的不同总和:8(重复项为 10)
"aggregations" : {
"employee" : {
"doc_count_error_upper_bound" : 0,
"sum_other_doc_count" : 0,
"buckets" : [
{
"key" : 1,
"doc_count" : 3,
"distinct_count_feedback" : {
"value" : 2
},
"distinct_sum_feedback" : {
"value" : 8
},
"distinct_avg_feedback" : {
"value" : 4.0
}
},
{
"key" : 2,
"doc_count" : 1,
"distinct_count_feedback" : {
"value" : 1
},
"distinct_sum_feedback" : {
"value" : 6
},
"distinct_avg_feedback" : {
"value" : 6.0
}
}
]
},
"sum_feedback" : {
"value" : 14.0
},
"count_feedback" : {
"value" : 3.0
}
}
这听起来很像是您应该使用几个桶聚合。我看不出有什么方法可以在一次调用中获取所有这些信息,但我认为肯定有一种方法可以聚合您的数据以获取您需要的信息。
特别是您可以在员工 ID 和反馈标识符上使用 terms aggregation,让每个员工都进入自己的存储桶。从那里,您可以获得每位员工的反馈数量。
您可以仅对反馈标识符执行类似的存储桶查询,以获取所有记录中的反馈计数。
这里是对 Avg_Bucket aggregation 的引用,您可以使用它来计算数据桶(组)的平均值。
请注意,有一个动态集群设置 (search.max_buckets
),最高可达 10,000 个桶。
我认为您可能需要这样的东西,但我没有可用来测试它的索引。也许这可以让您走上正确的轨道,看看如何组合各种聚合:
POST /_search
{
"size": 0,
"aggs": {
"employees": {
"terms": {
"field": "employeeId"
},
"aggs": {
"feedbacks": {
"count": {
"field": "feedbackId"
}
}
}
},
"avg_feedback_score": {
"avg_bucket": {
"buckets_path": "employees>feedbacks"
}
}
}
}