Alternative solution to Cumulative Cardinality Aggregation in Elasticsearch
I'm running an Elasticsearch cluster on AWS that doesn't have access to x-packs, but I'd still like to perform a cumulative cardinality aggregation to determine the daily count of new users to my site.
Is there an alternative solution to this problem?
For example, how would I transform:
GET /user_hits/_search
{
"size": 0,
"aggs": {
"users_per_day": {
"date_histogram": {
"field": "timestamp",
"calendar_interval": "day"
},
"aggs": {
"distinct_users": {
"cardinality": {
"field": "user_id"
}
},
"total_new_users": {
"cumulative_cardinality": {
"buckets_path": "distinct_users"
}
}
}
}
}
}
to produce the same result without cumulative_cardinality?
Cumulative cardinality was added for precisely this reason -- it wasn't easy to calculate before...
But as with almost everything in Elasticsearch, there's a script that can get it done for you. Here's my take on it.
- Set up the index
PUT user_hits
{
"mappings": {
"properties": {
"timestamp": {
"type": "date",
"format": "yyyy-MM-dd"
},
"user_id": {
"type": "keyword"
}
}
}
}
- Add 1 new user the first day and 2 more the next day, one of whom isn't strictly 'new'.
POST user_hits/_doc
{"user_id":1,"timestamp":"2020-10-01"}
POST user_hits/_doc
{"user_id":1,"timestamp":"2020-10-02"}
POST user_hits/_doc
{"user_id":3,"timestamp":"2020-10-02"}
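For intuition, here is the result we're after, sketched in plain Python. This can also serve as a client-side fallback if you'd rather just pull the (day, user_id) pairs out of the index and count new users outside of Elasticsearch (the pair format here is illustrative, not an ES API):

```python
from collections import defaultdict

def daily_new_users(hits):
    """Count users seen for the first time on each day.

    `hits` is an iterable of (day, user_id) pairs, e.g. extracted
    from the documents indexed above.
    """
    users_by_day = defaultdict(set)
    for day, user_id in hits:
        users_by_day[day].add(user_id)

    seen = set()           # users already counted on an earlier day
    new_per_day = {}
    for day in sorted(users_by_day):
        new_users = users_by_day[day] - seen
        new_per_day[day] = len(new_users)
        seen |= users_by_day[day]
    return new_per_day

hits = [
    ("2020-10-01", 1),
    ("2020-10-02", 1),   # user 1 returns -- not new
    ("2020-10-02", 3),
]
print(daily_new_users(hits))  # {'2020-10-01': 1, '2020-10-02': 1}
```

User 1 appears on both days but is only counted once, on 2020-10-01; that is exactly the semantics cumulative cardinality gives you.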
- Simulate a date histogram using a parametrized start date + number of days, group the users accordingly, then compare the day-over-day results
GET /user_hits/_search
{
"size": 0,
"query": {
"range": {
"timestamp": {
"gte": "2020-10-01"
}
}
},
"aggs": {
"new_users_count_vs_prev_day": {
"scripted_metric": {
"init_script": """
state.by_day_map = [:];
state.start_millis = new SimpleDateFormat("yyyy-MM-dd").parse(params.start_date).getTime();
state.day_millis = 24 * 60 * 60 * 1000;
state.dt_formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneOffset.UTC);
""",
"map_script": """
for (def step = 1; step < params.num_of_days + 1; step++) {
def timestamp = doc.timestamp.value.millis;
def user_id = doc['user_id'].value;
def anchor = state.start_millis + (step * state.day_millis);
// add a `n__` prefix to more easily sort the resulting map later on
def anchor_pretty = step + '__' + state.dt_formatter.format(Instant.ofEpochMilli(anchor));
if (timestamp <= anchor) {
if (state.by_day_map.containsKey(anchor_pretty)) {
state.by_day_map[anchor_pretty].add(user_id);
} else {
state.by_day_map[anchor_pretty] = [user_id];
}
}
}
""",
"combine_script": """
List keys=new ArrayList(state.by_day_map.keySet());
Collections.sort(keys);
def unique_sorted_map = new TreeMap();
def unique_from_prev_day = [];
for (def key : keys) {
def unique_users_per_day = new HashSet(state.by_day_map.get(key));
unique_users_per_day.removeIf(user -> unique_from_prev_day.contains(user));
// remove the `n__` prefix
unique_sorted_map.put(key.substring(3), unique_users_per_day.size());
unique_from_prev_day.addAll(unique_users_per_day);
}
return unique_sorted_map
""",
"reduce_script": "return states",
"params": {
"start_date": "2020-10-01",
"num_of_days": 5
}
}
}
}
}
yielding
"aggregations" : {
"new_users_count_vs_prev_day" : {
"value" : [
{
"2020-10-01" : 1, <-- 1 new unique user
"2020-10-02" : 1, <-- another new unique user
"2020-10-03" : 0,
"2020-10-04" : 0,
"2020-10-05" : 0
}
]
}
}
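The dedup step in the combine_script above can be sketched in Python. Note that the buckets are cumulative by construction: the map_script adds a user to every anchor at or after their timestamp, so each day's bucket contains all users seen up to that day, and the keys carry the same `n__` step prefix the Painless script uses for sorting:

```python
def combine(by_day_map):
    """Mirror of the combine_script: walk the day buckets in sorted
    order and keep only users not already seen on a previous day."""
    unique_sorted = {}
    seen_prev = set()
    for key in sorted(by_day_map):
        unique_today = set(by_day_map[key]) - seen_prev
        # strip the "n__" step prefix, as the Painless script does
        unique_sorted[key[3:]] = len(unique_today)
        seen_prev |= unique_today
    return unique_sorted

# what state.by_day_map would hold after map_script for the sample docs
state = {
    "1__2020-10-01": [1],
    "2__2020-10-02": [1, 1, 3],
}
print(combine(state))  # {'2020-10-01': 1, '2020-10-02': 1}
```

Returning `sorted(unique_today)` instead of `len(unique_today)` would give you the actual new user IDs per day, which is the adjustment mentioned below.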
This script is guaranteed to be slow, but it has one potentially very useful advantage -- you can adjust it to return the full list of new user IDs, not just the counts you'd get from cumulative cardinality, which, according to its implementation's author, only works in a sequential, cumulative manner by design.