Alternative solution to Cumulative Cardinality Aggregation in Elasticsearch

I'm running an Elasticsearch cluster on AWS that doesn't have access to x-packs, but I'd still like to perform a cumulative cardinality aggregation to determine the daily count of new users to my site.

Is there an alternative solution to this problem?

For example, how would I transform:

GET /user_hits/_search
{
  "size": 0,
  "aggs": {
    "users_per_day": {
      "date_histogram": {
        "field": "timestamp",
        "calendar_interval": "day"
      },
      "aggs": {
        "distinct_users": {
          "cardinality": {
            "field": "user_id"
          }
        },
        "total_new_users": {
          "cumulative_cardinality": {
            "buckets_path": "distinct_users" 
          }
        }
      }
    }
  }
}

to produce the same results without `cumulative_cardinality`?

Cumulative cardinality was added for precisely this reason -- it wasn't easy to calculate before...
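To make the target semantics concrete before diving into Painless: a minimal Python sketch of what `cumulative_cardinality` computes over a date histogram. The sample hits mirror the documents indexed below (user 1 on day one; users 1 and 3 on day two); everything else is illustrative.

```python
def cumulative_cardinality(hits):
    """For each day, the running count of distinct users seen so far.

    hits: iterable of (day, user_id) pairs.
    Returns a dict: day -> cumulative distinct-user count.
    """
    # Group user IDs per day, like the date_histogram + cardinality aggs.
    by_day = {}
    for day, user in hits:
        by_day.setdefault(day, set()).add(user)

    # Walk the days in order, accumulating the set of users seen so far.
    seen = set()
    result = {}
    for day in sorted(by_day):
        seen |= by_day[day]
        result[day] = len(seen)
    return result

hits = [("2020-10-01", "1"), ("2020-10-02", "1"), ("2020-10-02", "3")]
print(cumulative_cardinality(hits))
# {'2020-10-01': 1, '2020-10-02': 2}
```

Note that user 1's second visit on 2020-10-02 doesn't bump the total -- only genuinely new user IDs do, which is exactly what the scripted replacement below has to reproduce.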

However, as with almost everything else in Elasticsearch, there's a script to help you out. Here's my take on it.

1. Set up the index:
PUT user_hits
{
  "mappings": {
    "properties": {
      "timestamp": {
        "type": "date",
        "format": "yyyy-MM-dd"
      },
      "user_id": {
        "type": "keyword"
      }
    }
  }
}
2. Add 1 new user on one day and 2 more the next day, one of whom is not strictly 'new':
POST user_hits/_doc
{"user_id":1,"timestamp":"2020-10-01"}

POST user_hits/_doc
{"user_id":1,"timestamp":"2020-10-02"}

POST user_hits/_doc
{"user_id":3,"timestamp":"2020-10-02"}
3. Simulate a date histogram using a parameterized start date + number of days, group the users accordingly, then compare the day-over-day results:
GET /user_hits/_search
{
  "size": 0,
  "query": {
    "range": {
      "timestamp": {
        "gte": "2020-10-01"
      }
    }
  }, 
  "aggs": {
    "new_users_count_vs_prev_day": {
      "scripted_metric": {
        "init_script": """
          state.by_day_map = [:];
          state.start_millis = new SimpleDateFormat("yyyy-MM-dd").parse(params.start_date).getTime();
          state.day_millis = 24 * 60 * 60 * 1000;
          state.dt_formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneOffset.UTC);
        """,
        "map_script": """
          for (def step = 1; step < params.num_of_days + 1; step++) {
            def timestamp = doc.timestamp.value.millis;
            def user_id = doc['user_id'].value;
            def anchor = state.start_millis + (step * state.day_millis);
            // add a `n__` prefix to more easily sort the resulting map later on
            def anchor_pretty = step + '__' + state.dt_formatter.format(Instant.ofEpochMilli(anchor));
            
            if (timestamp <= anchor) {
              if (state.by_day_map.containsKey(anchor_pretty)) {
                state.by_day_map[anchor_pretty].add(user_id);
              } else {
                state.by_day_map[anchor_pretty] = [user_id];
              }
            }
        }
        """,
        "combine_script": """
            List keys=new ArrayList(state.by_day_map.keySet());
            Collections.sort(keys);
          
            def unique_sorted_map = new TreeMap();
            def unique_from_prev_day = [];
            
            for (def key : keys) { 
              def unique_users_per_day = new HashSet(state.by_day_map.get(key));
              
              unique_users_per_day.removeIf(user -> unique_from_prev_day.contains(user));
              
               // remove the `n__` prefix
               unique_sorted_map.put(key.substring(3), unique_users_per_day.size());
               unique_from_prev_day.addAll(unique_users_per_day);
            }
            return unique_sorted_map;
        """,
        "reduce_script": "return states",
        "params": {
          "start_date": "2020-10-01",
          "num_of_days": 5
        }
      }
    }
  }
}

yielding

"aggregations" : {
  "new_users_count_vs_prev_day" : {
    "value" : [
      {
        "2020-10-01" : 1,    <-- 1 new unique user            
        "2020-10-02" : 1,    <-- another new unique user
        "2020-10-03" : 0,
        "2020-10-04" : 0,
        "2020-10-05" : 0
      }
    ]
  }
}

The script is bound to be slow but it has one, potentially very useful, advantage -- you can adjust it to return the full list of new user IDs, not just the counts that you'd get from the cumulative cardinality which, according to its implementation's author, only works in a sequential, cumulative manner by design.
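For illustration, a Python sketch of that adjustment: the same day-over-day dedup the `combine_script` performs, but keeping the new user IDs per day instead of calling `.size()` on the set. The `by_day_map` input mirrors the per-day buckets the `map_script` builds; names here are illustrative, not part of the Elasticsearch API.

```python
def new_users_per_day(by_day_map):
    """Mirror of the combine_script above, returning IDs rather than counts.

    by_day_map: dict of day -> list of user IDs seen up to that day's cutoff.
    Returns a dict: day -> sorted list of user IDs not seen on any prior day.
    """
    seen = set()
    out = {}
    for day in sorted(by_day_map):
        # Keep only the users that haven't appeared on an earlier day,
        # i.e. the equivalent of unique_users_per_day.removeIf(...).
        new = set(by_day_map[day]) - seen
        out[day] = sorted(new)
        seen |= new
    return out

print(new_users_per_day({
    "2020-10-01": ["1"],
    "2020-10-02": ["1", "3"],
}))
# {'2020-10-01': ['1'], '2020-10-02': ['3']}
```

In the Painless version you'd simply `put` the deduplicated set itself (or a list built from it) into `unique_sorted_map` instead of its size.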