ElasticSearch:如何制作聚合管道?
ElasticSearch: How to make an aggregation pipeline?
想象一下以下用例:
我们在 Stark Airlines 工作,我们的营销团队希望对乘客进行细分,以便为他们提供折扣或礼品卡。他们决定要两组乘客:
- 每周至少飞行 3 次的乘客
- 至少飞行一次但两周未飞行的乘客
有了这个,他们可以为我们的乘客做不同的营销活动!
因此,在弹性搜索中,我们有一个 trip
索引,代表乘客购买的机票:
{
"_index" : "trip",
"_type" : "_doc",
"_id" : "1",
"_score" : 1.0,
"_source" : {
"total_amount" : 300,
"trip_date" : "2020/03/24 13:30:00",
"status" : "completed",
"passenger" : {
"id" : 11,
"name" : "Thiago nunes"
}
}
}
trip
索引包含一个 status
字段,该字段可能具有其他值,例如:pending
或 open
或 canceled
这意味着我们只能考虑具有 completed
状态的行程(意味着乘客确实旅行了)。
所以,考虑到所有这些......我如何通过弹性搜索获得这两组乘客?
我已经尝试了一段时间,但没有成功。
到目前为止我做了什么:
- 我建立了一个查询来获取所有有效行程(状态为
completed
的行程)
GET /trip/_search
{
"query": {
"bool": {
"must": [
{
"term": {
"status": {
"value": "completed"
}
}
}
]
}
},
"aggs": {
"status_viagem": {
"terms": {
"field": "status.keyword"
}
}
}
}
- 此查询returns以下:
{
"took" : 3,
"timed_out" : false,
"_shards" : {
"total" : 1,
"successful" : 1,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : {
"value" : 200,
"relation" : "eq"
},
"max_score" : 0.18232156,
"hits" : [...]
},
"aggregations" : {
"status_viagem" : {
"doc_count_error_upper_bound" : 0,
"sum_other_doc_count" : 0,
"buckets" : [
{
"key" : "completed",
"doc_count" : 200
}
]
}
}
}
但我卡住了,想不出下一步。我知道接下来要做的应该是创建包含乘客的桶,然后将它们过滤到代表我们所需数据集的两个桶中。但是我不知道怎么办。
有人可以帮忙吗?
PS.:
我并不完全需要这是一个单一的查询,只是提示如何构建这样的查询会非常有帮助
输出应该是乘客 ID
的数组
注意:为了简单起见,我缩短了 trip
索引
根据我对你的问题的理解。
我使用 date_histogram 以间隔为周来获取哪一周的乘客集合。只保留那些一周内有三份文件的乘客。这将为您提供一周内旅行三次的所有乘客。
在另一个聚合中,我使用 terms aggregation to get passengers and their last travel date. Using bucket selector 保留了最后一次旅行不超过特定日期的乘客。
映射
{
"index87" : {
"mappings" : {
"properties" : {
"passengerid" : {
"type" : "long"
},
"passengername" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
}
},
"status" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
}
},
"total_amount" : {
"type" : "long"
},
"trip_date" : {
"type" : "date"
}
}
}
}
}
查询
{
"query": {
"bool": {
"must": [
{
"term": {
"status": {
"value": "completed"
}
}
}
]
}
},
"aggs": {
"travel_thrice_week": {
"date_histogram": {
"field": "trip_date",
"interval": "week"
},
"aggs": {
"passenger": {
"terms": {
"field": "passengername.keyword",
"min_doc_count": 3,
"size": 10
}
},
"select_bucket_with_user": {-->to keep weeks which have a pasenger with thrice
--> a day travel
"bucket_selector": {
"buckets_path": {
"passenger": "passenger._bucket_count"
},
"script": "if(params['passenger']>=1) {return true;} else{ return false;} "
}
}
}
},
"not_flown_last_two_week": {
"terms": {
"field": "passengername.keyword",
"size": 10
},
"aggs": {
"last_travel": {
"max": {
"field": "trip_date" --> most recent travel
}
},
"last_travel_before_two_week": {
"bucket_selector": {
"buckets_path": {
"traveldate": "last_travel"
},
"script":{
"source": "if(params['traveldate']< params['date_epoch']) return true; else return false;",
"params": {
"date_epoch":1586408336000 --> unix epoc of cutt off date
}
}
}
}
}
}
}
}
结果:
"aggregations" : {
"not_flown_last_two_week" : {
"doc_count_error_upper_bound" : 0,
"sum_other_doc_count" : 0,
"buckets" : [
{
"key" : "Thiago nunes",
"doc_count" : 3,
"last_travel" : {
"value" : 1.5851808E12,
"value_as_string" : "2020-03-26T00:00:00.000Z"
}
},
{
"key" : "john doe",
"doc_count" : 1,
"last_travel" : {
"value" : 1.5799968E12,
"value_as_string" : "2020-01-26T00:00:00.000Z"
}
}
]
},
"travel_thrice_week" : {
"buckets" : [
{
"key_as_string" : "2020-03-23T00:00:00.000Z",
"key" : 1584921600000,
"doc_count" : 3,
"passenger" : {
"doc_count_error_upper_bound" : 0,
"sum_other_doc_count" : 0,
"buckets" : [
{
"key" : "Thiago nunes",
"doc_count" : 3
}
]
}
}
]
}
}
想象一下以下用例:
我们在 Stark Airlines 工作,我们的营销团队希望对乘客进行细分,以便为他们提供折扣或礼品卡。他们决定要两组乘客:
- 每周至少飞行 3 次的乘客
- 至少飞行一次但两周未飞行的乘客
有了这个,他们可以为我们的乘客做不同的营销活动!
因此,在弹性搜索中,我们有一个 trip
索引,代表乘客购买的机票:
{
"_index" : "trip",
"_type" : "_doc",
"_id" : "1",
"_score" : 1.0,
"_source" : {
"total_amount" : 300,
"trip_date" : "2020/03/24 13:30:00",
"status" : "completed",
"passenger" : {
"id" : 11,
"name" : "Thiago nunes"
}
}
}
trip
索引包含一个 status
字段,该字段可能具有其他值,例如:pending
或 open
或 canceled
这意味着我们只能考虑具有 completed
状态的行程(意味着乘客确实旅行了)。
所以,考虑到所有这些......我如何通过弹性搜索获得这两组乘客?
我已经尝试了一段时间,但没有成功。
到目前为止我做了什么:
- 我建立了一个查询来获取所有有效行程(状态为
completed
的行程)
GET /trip/_search
{
"query": {
"bool": {
"must": [
{
"term": {
"status": {
"value": "completed"
}
}
}
]
}
},
"aggs": {
"status_viagem": {
"terms": {
"field": "status.keyword"
}
}
}
}
- 此查询returns以下:
{
"took" : 3,
"timed_out" : false,
"_shards" : {
"total" : 1,
"successful" : 1,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : {
"value" : 200,
"relation" : "eq"
},
"max_score" : 0.18232156,
"hits" : [...]
},
"aggregations" : {
"status_viagem" : {
"doc_count_error_upper_bound" : 0,
"sum_other_doc_count" : 0,
"buckets" : [
{
"key" : "completed",
"doc_count" : 200
}
]
}
}
}
但我卡住了,想不出下一步。我知道接下来要做的应该是创建包含乘客的桶,然后将它们过滤到代表我们所需数据集的两个桶中。但是我不知道怎么办。
有人可以帮忙吗?
PS.:
我并不完全需要这是一个单一的查询,只是提示如何构建这样的查询会非常有帮助
输出应该是乘客 ID
的数组
注意:为了简单起见,我缩短了
trip
索引
根据我对你的问题的理解。
我使用 date_histogram 以间隔为周来获取哪一周的乘客集合。只保留那些一周内有三份文件的乘客。这将为您提供一周内旅行三次的所有乘客。
在另一个聚合中,我使用 terms aggregation to get passengers and their last travel date. Using bucket selector 保留了最后一次旅行不超过特定日期的乘客。
映射
{
"index87" : {
"mappings" : {
"properties" : {
"passengerid" : {
"type" : "long"
},
"passengername" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
}
},
"status" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
}
},
"total_amount" : {
"type" : "long"
},
"trip_date" : {
"type" : "date"
}
}
}
}
}
查询
{
"query": {
"bool": {
"must": [
{
"term": {
"status": {
"value": "completed"
}
}
}
]
}
},
"aggs": {
"travel_thrice_week": {
"date_histogram": {
"field": "trip_date",
"interval": "week"
},
"aggs": {
"passenger": {
"terms": {
"field": "passengername.keyword",
"min_doc_count": 3,
"size": 10
}
},
"select_bucket_with_user": {-->to keep weeks which have a pasenger with thrice
--> a day travel
"bucket_selector": {
"buckets_path": {
"passenger": "passenger._bucket_count"
},
"script": "if(params['passenger']>=1) {return true;} else{ return false;} "
}
}
}
},
"not_flown_last_two_week": {
"terms": {
"field": "passengername.keyword",
"size": 10
},
"aggs": {
"last_travel": {
"max": {
"field": "trip_date" --> most recent travel
}
},
"last_travel_before_two_week": {
"bucket_selector": {
"buckets_path": {
"traveldate": "last_travel"
},
"script":{
"source": "if(params['traveldate']< params['date_epoch']) return true; else return false;",
"params": {
"date_epoch":1586408336000 --> unix epoc of cutt off date
}
}
}
}
}
}
}
}
结果:
"aggregations" : {
"not_flown_last_two_week" : {
"doc_count_error_upper_bound" : 0,
"sum_other_doc_count" : 0,
"buckets" : [
{
"key" : "Thiago nunes",
"doc_count" : 3,
"last_travel" : {
"value" : 1.5851808E12,
"value_as_string" : "2020-03-26T00:00:00.000Z"
}
},
{
"key" : "john doe",
"doc_count" : 1,
"last_travel" : {
"value" : 1.5799968E12,
"value_as_string" : "2020-01-26T00:00:00.000Z"
}
}
]
},
"travel_thrice_week" : {
"buckets" : [
{
"key_as_string" : "2020-03-23T00:00:00.000Z",
"key" : 1584921600000,
"doc_count" : 3,
"passenger" : {
"doc_count_error_upper_bound" : 0,
"sum_other_doc_count" : 0,
"buckets" : [
{
"key" : "Thiago nunes",
"doc_count" : 3
}
]
}
}
]
}
}