如何使用单个 Elastic 搜索查询进行重复数据删除和执行聚合?

How to deduplicate and perform aggregations using single Elastic search query?

我有一个存储员工详细信息数据的索引。

我有每个员工整数值 (0-10) 的反馈字段。

我想获得反馈的数量、反馈的平均评分和反馈的每个员工的平均评分。

这里的问题是:

所以我在一个ES索引中有两个或多个相同的文档(重复)(使用员工id和一个反馈标识符,我们可以区分记录),

我想对只有一个文档的某些字段进行平均和计数,仅考虑使用 ES 查询

PS: 我们无法从索引中删除重复项。

数据:

"hits" : [
      {
        "_index" : "index22",
        "_type" : "_doc",
        "_id" : "r_QurHEBvLUX24hJph0B",
        "_score" : 1.0,
        "_source" : {
          "empId" : 1,
          "feedbackId" : 1,
          "feedback" : 2
        }
      },
      {
        "_index" : "index22",
        "_type" : "_doc",
        "_id" : "sPQurHEBvLUX24hJ0R3x",
        "_score" : 1.0,
        "_source" : {
          "empId" : 1,
          "feedbackId" : 1,
          "feedback" : 2
        }
      },
      {
        "_index" : "index22",
        "_type" : "_doc",
        "_id" : "sfQurHEBvLUX24hJ5h16",
        "_score" : 1.0,
        "_source" : {
          "empId" : 1,
          "feedbackId" : 2,
          "feedback" : 6
        }
      }
    ]

我已经带走了一名 ID 为 1 的员工和三个反馈(两个重复的 identifierId 1 和一个重复的 identifierId2)。总和为 10,使用不同的总和为 8

查询:

我正在使用 scripted metric aggregation 创建唯一的 feedbackIdentifierI 和反馈值的字典。

  1. "init_script":

Executed prior to any collection of documents. Allows the aggregation to set up any initial state.

已声明哈希表交易

  1. "map_script"

Executed once per document collected Loop through all document and add unique identifierid and corresponding feedback value to dictionary

  1. combine_script

Executed once on each shard after document collection is complete

Return 所有分片的字典

  1. reduce_script

Executed once on the coordinating node after all shards have returned their results

再次遍历从每个分片返回的所有字典并创建一个唯一的字典。循环遍历字典以获得反馈的总和或计数

{
  "size": 0,
  "aggs": {
    "employee": {
      "terms": {
        "field": "empId",
        "size": 10000
      },
      "aggs": {
        "distinct_sum_feedback": {
          "scripted_metric": {
            "init_script": "state.transactions =new Hashtable();",
            "map_script": "if(state.transactions.get(doc.feedbackId)==null){state.transactions.put(doc.feedbackId, doc.feedback.value)}",
            "combine_script": "return state.transactions",
            "reduce_script": "def sum=0;def feedbacks=new Hashtable();for(a in states){for(entry in a.entrySet()){if(feedbacks.get(entry.getKey())==null){feedbacks.put(entry.getKey(),entry.getValue());}}}for(entry in feedbacks.entrySet()){sum+=entry.getValue();}    return sum;"
          }
        },
        "distinct_count_feedback": {
          "cardinality": {
            "field": "feedbackId"
          }
        },
        "distinct_avg_feedback": {
          "bucket_script": {
            "buckets_path": {
              "sum": "distinct_sum_feedback.value",
              "count": "distinct_count_feedback.value"
            },
            "script": "params.sum/params.count"
          }
        }
      }
    },
    "sum_feedback": {
      "sum_bucket": {
        "buckets_path": "employee>distinct_sum_feedback.value"
      }
    },
    "count_feedback": {
      "sum_bucket": {
        "buckets_path": "employee>distinct_count_feedback.value"
      }
    }
  }
}

结果: 用户 1 的非重复计数:2

用户 1 的不同总和:8(重复项为 10)

"aggregations" : {
    "employee" : {
      "doc_count_error_upper_bound" : 0,
      "sum_other_doc_count" : 0,
      "buckets" : [
        {
          "key" : 1,
          "doc_count" : 3,
          "distinct_count_feedback" : {
            "value" : 2
          },
          "distinct_sum_feedback" : {
            "value" : 8
          },
          "distinct_avg_feedback" : {
            "value" : 4.0
          }
        },
        {
          "key" : 2,
          "doc_count" : 1,
          "distinct_count_feedback" : {
            "value" : 1
          },
          "distinct_sum_feedback" : {
            "value" : 6
          },
          "distinct_avg_feedback" : {
            "value" : 6.0
          }
        }
      ]
    },
    "sum_feedback" : {
      "value" : 14.0
    },
    "count_feedback" : {
      "value" : 3.0
    }
  }

这听起来很像是您应该使用几个桶聚合。我看不出有什么方法可以在一次调用中获取所有这些信息,但我认为肯定有一种方法可以聚合您的数据以获取您需要的信息。

关于bucket aggregations

特别是您可以在员工 ID 和反馈标识符上使用 terms aggregation,让每个员工都进入自己的存储桶。从那里,您可以获得每位员工的反馈数量。

您可以仅对反馈标识符执行类似的存储桶查询,以获取所有记录中的反馈计数。

这里是对 Avg_Bucket aggregation 的引用,您可以使用它来计算数据桶(组)的平均值。

请注意,有一个动态集群设置 (search.max_buckets),最高可达 10,000 个桶。

我认为您可能需要这样的东西,但我没有可用来测试它的索引。也许这可以让您走上正确的轨道,看看如何组合各种聚合:

POST /_search
{
  "size": 0,
  "aggs": {
    "employees": {
      "terms": {
        "field": "employeeId"
      },
      "aggs": {
        "feedbacks": {
          "count": {
            "field": "feedbackId"
          }
        }
      }
    },
    "avg_feedback_score": {
      "avg_bucket": {
        "buckets_path": "employees>feedbacks" 
      }
    }
  }
}