ElasticSearch:如何制作聚合管道?

ElasticSearch: How to make an aggregation pipeline?

想象一下以下用例:

我们在 Stark Airlines 工作,我们的营销团队希望对乘客进行细分,以便为他们提供折扣或礼品卡。他们决定要两组乘客:

  1. 每周至少飞行 3 次的乘客
  2. 至少飞行一次但两周未飞行的乘客

有了这个,他们可以为我们的乘客做不同的营销活动!

因此,在弹性搜索中,我们有一个 trip 索引,代表乘客购买的机票:

{
   "_index" : "trip",
   "_type" : "_doc",
   "_id" : "1",
   "_score" : 1.0,
   "_source" : {
       "total_amount" : 300,
       "trip_date" : "2020/03/24 13:30:00",
       "status" : "completed",
       "passenger" : {
          "id" : 11,
          "name" : "Thiago nunes"
       }
     }
}

trip 索引包含一个 status 字段,该字段可能具有其他值,例如:pendingopencanceled

这意味着我们只能考虑具有 completed 状态的行程(意味着乘客确实旅行了)。

所以,考虑到所有这些......我如何通过弹性搜索获得这两组乘客?

我已经尝试了一段时间,但没有成功。

到目前为止我做了什么:

  1. 我建立了一个查询来获取所有有效行程(状态为 completed 的行程)
GET /trip/_search
{
  "query": {
    "bool": {
      "must": [
        {
          "term": {
            "status": {
              "value": "completed"
            }
          }
        }
      ]
    }
  },
  "aggs": {
    "status_viagem": {
      "terms": {
        "field": "status.keyword"
      }
    }
  }
}
  1. 此查询returns以下:
{
  "took" : 3,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 200,
      "relation" : "eq"
    },
    "max_score" : 0.18232156,
    "hits" : [...]
  },
  "aggregations" : {
    "status_viagem" : {
      "doc_count_error_upper_bound" : 0,
      "sum_other_doc_count" : 0,
      "buckets" : [
        {
          "key" : "completed",
          "doc_count" : 200
        }
      ]
    }
  }
}

但我卡住了,想不出下一步。我知道接下来要做的应该是创建包含乘客的桶,然后将它们过滤到代表我们所需数据集的两个桶中。但是我不知道怎么办。

有人可以帮忙吗?

PS.:

  1. 我并不完全需要这是一个单一的查询,只是提示如何构建这样的查询会非常有帮助

  2. 输出应该是乘客 ID

  3. 的数组
  4. 注意:为了简单起见,我缩短了 trip 索引

根据我对你的问题的理解。

我使用 date_histogram 以间隔为周来获取哪一周的乘客集合。只保留那些一周内有三份文件的乘客。这将为您提供一周内旅行三次的所有乘客。

在另一个聚合中,我使用 terms aggregation to get passengers and their last travel date. Using bucket selector 保留了最后一次旅行不超过特定日期的乘客。

映射

{
  "index87" : {
    "mappings" : {
      "properties" : {
        "passengerid" : {
          "type" : "long"
        },
        "passengername" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "type" : "keyword",
              "ignore_above" : 256
            }
          }
        },
        "status" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "type" : "keyword",
              "ignore_above" : 256
            }
          }
        },
        "total_amount" : {
          "type" : "long"
        },
        "trip_date" : {
          "type" : "date"
        }
      }
    }
  }
}

查询

{
  "query": {
    "bool": {
      "must": [
        {
          "term": {
            "status": {
              "value": "completed"
            }
          }
        }
      ]
    }
  },
  "aggs": {
    "travel_thrice_week": {
      "date_histogram": {
        "field": "trip_date",
        "interval": "week"
      },
      "aggs": {
        "passenger": {
          "terms": {
            "field": "passengername.keyword",
            "min_doc_count": 3,
            "size": 10
          }
        },
        "select_bucket_with_user": {-->to keep weeks which have a pasenger with thrice 
                                    --> a day travel
          "bucket_selector": {
            "buckets_path": {
              "passenger": "passenger._bucket_count"
            },
            "script": "if(params['passenger']>=1) {return true;} else{ return false;} "
          }
        }
      }
    },
    "not_flown_last_two_week": {
      "terms": {
        "field": "passengername.keyword",
        "size": 10
      },
      "aggs": {
        "last_travel": {
          "max": {
            "field": "trip_date" -->  most recent travel
          }
        },
        "last_travel_before_two_week": {
          "bucket_selector": {
            "buckets_path": {
              "traveldate": "last_travel"
            },
            "script":{
              "source": "if(params['traveldate']< params['date_epoch']) return true; else return false;",
              "params": {
                "date_epoch":1586408336000 --> unix epoc of cutt off date
              }
            }

          }
        }
      }
    }
  }
}

结果:

"aggregations" : {
    "not_flown_last_two_week" : {
      "doc_count_error_upper_bound" : 0,
      "sum_other_doc_count" : 0,
      "buckets" : [
        {
          "key" : "Thiago nunes",
          "doc_count" : 3,
          "last_travel" : {
            "value" : 1.5851808E12,
            "value_as_string" : "2020-03-26T00:00:00.000Z"
          }
        },
        {
          "key" : "john doe",
          "doc_count" : 1,
          "last_travel" : {
            "value" : 1.5799968E12,
            "value_as_string" : "2020-01-26T00:00:00.000Z"
          }
        }
      ]
    },
    "travel_thrice_week" : {
      "buckets" : [
        {
          "key_as_string" : "2020-03-23T00:00:00.000Z",
          "key" : 1584921600000,
          "doc_count" : 3,
          "passenger" : {
            "doc_count_error_upper_bound" : 0,
            "sum_other_doc_count" : 0,
            "buckets" : [
              {
                "key" : "Thiago nunes",
                "doc_count" : 3
              }
            ]
          }
        }
      ]
    }
  }