根据 Elastic Search 中过滤器的最后一个条目分组

Group by based on last entry of filter in Elastic Search

我有一个类似下面的场景:

包含商店购买商品的索引,其中每个商品都有一个 order_id。 我只需要按每个订单的 last 项的颜色进行分组。

数据结构:

{
    "order_id": 1,
    "product_id":235233
    "color": "Blue",
    "purchase_date": "2020-08-21T05:53:43.362Z"
},
{
    "order_id": 1,
    "product_id":2352662
    "color": "Black",
    "purchase_date": "2020-08-23T05:53:43.362Z"
},
{
    "order_id": 2,
    "product_id":855477
    "color": "Blue",
    "purchase_date": "2020-08-22T05:53:43.362Z"
},
{
    "order_id": 2,
    "product_id":322352
    "color": "Red",
    "purchase_date": "2020-08-24T05:53:43.362Z"
},
{
    "order_id": 3,
    "product_id":3225235
    "color": "Red",
    "purchase_date": "2020-08-25T05:53:43.362Z"
}

预期结果

黑色:1(order_id1最后一个产品的颜色)

Red:2(order_id2、3的最后一个产品的颜色)

基于,我可以将每个订单的最后一件商品作为整件商品获取,但我正在寻找的是直接获取每种颜色的商品数量

POST /items/_search?search_type=count
{
    "aggs": {
        "group": {
            "terms": {
                "field": "order_id"
            },
            "aggs": {
                "group_items": {
                    "top_hits": {
                        "size": 1,
                          "sort": [
                            {
                                "purchase_date": {
                                    "order": "desc"
                                }
                            }
                        ]
                    }
                }
            }
        }
    }
}

下面给出了所有订单项目的每种颜色的项目计数,而不仅仅是每个订单的最后一个项目。

GET /items/_search?search_type=count
{
 "size":0,
  "aggs": {
    "colors": {
       "terms": {
        "field": "color.keyword"
        }
     }
  }
}

您可以按 color 分组并按 purchase_date 的最大值排序,如下所示:

{
  "size": 0,
  "aggs": {
    "group": {
      "terms": {
        "field": "color.keyword",
        "order": {
          "by_latest_purchase": "desc"
        }
      },
      "aggs": {
        "by_latest_purchase": {
          "max": {
            "field": "purchase_date"
          }
        }
      }
    }
  }
}

但您最终还是会得到 blue b/c 它是您文档中存在的一种颜色,我不知道它是否可以被过滤掉。


如有疑问(或所有其他方法都失败),scripted metric aggregations 进行救援:

{
  "size": 0, 
  "aggs": {
    "by_color": {
      "scripted_metric": {
        "init_script": "state.by_order_id = [:]",
        "map_script": """
          def color = doc['color.keyword'].value;
          def date = doc['purchase_date'].value.millis;
          def order_id = doc['order_id'].value;
          
          def current_group = ['color':color, 'date': date];
          
          if (state.by_order_id.containsKey(order_id)) {
            def max_group = state.by_order_id[order_id];
            if (date > max_group.date) {
              // we've found a new maximum
              state.by_order_id[order_id] = current_group
            }
          } else {
            state.by_order_id[order_id] = current_group;
          }
        """,
        "combine_script": """
          def colors_vs_count = [:];
          
          for (def group : state.by_order_id.entrySet()) {
            def order_id = group.getKey();
            def color = group.getValue()['color'];
            if (colors_vs_count.containsKey(color)) {
              colors_vs_count[color]++;
            } else {
              colors_vs_count[color] = 1;
            }
          }
          
          return colors_vs_count;
        """,
        "reduce_script": "return states"
      }
    }
  }
}

产量:

...
"aggregations" : {
  "by_color" : {
    "value" : [
      {
        "Red" : 2,
        "Black" : 1
      }
    ]
  }
}

这是一个 JSON 友好的精简版脚本:

{"size":0,"aggs":{"by_color":{"scripted_metric":{"init_script":"state.by_order_id = [:]","map_script":"          def color = doc['color.keyword'].value;\n          def date = doc['purchase_date'].value.millis;\n          def order_id = doc['order_id'].value;\n          \n          def current_group = ['color':color, 'date': date];\n          \n          if (state.by_order_id.containsKey(order_id)) {\n            def max_group = state.by_order_id[order_id];\n            if (date > max_group.date) {\n              state.by_order_id[order_id] = current_group\n            }\n          } else {\n            state.by_order_id[order_id] = current_group;\n          }","combine_script":"          def colors_vs_count = [:];\n          \n          for (def group : state.by_order_id.entrySet()) {\n            def order_id = group.getKey();\n            def color = group.getValue()['color'];\n            if (colors_vs_count.containsKey(color)) {\n              colors_vs_count[color]++;\n            } else {\n              colors_vs_count[color] = 1;\n            }\n          }\n          \n          return colors_vs_count;","reduce_script":"return states"}}}}

解决该问题的另一种方法是创建和维护一个单独的索引 (latest_by_order),以跟踪每个订单的最新文档。 这可以使用转换 (see docs).

来实现

可以使用以下命令创建此类转换:

PUT _transform/latest_by_order
{
  "source": {
    "index": "items"
  },
  "dest": {
    "index": "latest_by_order"
  },
  "latest": {
    "unique_key": ["order_id"],
    "sort": "purchase_date"
  }
}

然后,可以在新的(转换后的)索引之上进行二次分析。 以下请求:

GET latest_by_order/_search
{
  "size": 0,
  "aggs": {
    "count_by_color": {
      "terms": {
        "field": "color.keyword"
      }
    }
  }
}

将产生以下响应:

{
  "took" : 2,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 3,
      "relation" : "eq"
    },
    "max_score" : null,
    "hits" : [ ]
  },
  "aggregations" : {
    "count_by_color" : {
      "doc_count_error_upper_bound" : 0,
      "sum_other_doc_count" : 0,
      "buckets" : [
        {
          "key" : "Red",
          "doc_count" : 2
        },
        {
          "key" : "Black",
          "doc_count" : 1
        }
      ]
    }
  }
}