使用 elastic4s 客户端为 elasticsearch 构建动态聚合查询
Building dynamic aggregation queries for elasticsearch using elastic4s client
是否可以使用 elastic4s DSL 动态地构建多级聚合查询?
使用 http 客户端很简单
multiLevelAggregation
Input: Fields[0..N]
Output: Data grouped by field tuple
Steps:
1. Build multilevel elasticsearch aggregation query (JSON)
2. Execute query on elasticsearch server
3. Flatten result and return
但是,如何使用 elastic4s 或 Java 客户端来做到这一点?
在仔细梳理我的问题之后,我找到了解决方案。最初我以为这是 elastic4s 的限制,但事实并非如此:通过 elastic4s 客户端动态构建多字段聚合查询其实很容易。以下是我的解决方案:
</p>
<pre><code>//For building aggregation query
/**
 * Recursively builds a nested terms aggregation, one level per entry of `groups`.
 *
 * Each field in `groups` becomes a terms aggregation named "termAgg" whose
 * sub-aggregation is the aggregation built for the remaining fields. Once the
 * fields are exhausted, the aggregation produced by `leafAggBuilder` is returned.
 *
 * @param groups         field names to group by, outermost level first
 * @param leafAggBuilder factory invoked to create the innermost aggregation
 * @return the root of the aggregation tree; the leaf aggregation itself when
 *         `groups` is empty
 */
def buildAgg(groups: Seq[String])(leafAggBuilder: () => AbstractAggregationDefinition): AbstractAggregationDefinition = {
  groups match {
    // `+:` deconstructs any Seq, whereas `::` matches only List and would raise a
    // MatchError for other non-empty Seq implementations such as Vector.
    case field +: rest =>
      aggregation.terms("termAgg").field(field).aggregations(buildAgg(rest)(leafAggBuilder))
    // Nothing left to group by: emit the leaf aggregation.
    case _ => leafAggBuilder()
  }
}
/**
 * Example leaf aggregation builder.
 *
 * Creates an aggregation named "aggFunc" over the given field. The function name
 * selects the aggregation: "avg", "sum" and "cardinality" map to the DSL builders
 * of the same name; any other value falls back to `aggregation.count`.
 *
 * The parameter is written as a plain tuple type; it is the same type as the
 * `Pair[String, String]` alias, which is deprecated since Scala 2.11.
 *
 * @param aggFuncInfo tuple of (aggregation function name, field name)
 * @return the aggregation definition for the requested function and field
 */
def buildLeafAgg(aggFuncInfo: (String, String))(): AbstractAggregationDefinition = {
  val (funcName, fieldName) = aggFuncInfo
  funcName match {
    case "avg"         => aggregation.avg("aggFunc").field(fieldName)
    case "sum"         => aggregation.sum("aggFunc").field(fieldName)
    case "cardinality" => aggregation.cardinality("aggFunc").field(fieldName)
    // Unknown function names default to a count over the field.
    case _             => aggregation.count("aggFunc").field(fieldName)
  }
}
/**
 * Recursively flattens a nested terms aggregation result.
 *
 * For every remaining entry in `groups`, the current aggregation is read as a
 * terms aggregation; each bucket key is appended to `allGroups` and the bucket's
 * first sub-aggregation is parsed with the remaining groups. Once `groups` is
 * empty, `leafAggParser` receives the accumulated keys and the current aggregation.
 *
 * @param groups        grouping fields still to be descended through
 * @param agg           aggregation result at the current nesting level
 * @param allGroups     bucket keys collected on the way down (outermost first)
 * @param leafAggParser converts the accumulated keys plus the leaf aggregation
 *                      into result values
 * @tparam T            element type produced by `leafAggParser`
 * @return the concatenated leaf results over all bucket paths
 */
def parseAgg[T](groups: Seq[String], agg: Aggregation, allGroups: Seq[String])(leafAggParser: (Seq[String], Aggregation) => Seq[T]): Seq[T] = {
  groups match {
    // `+:` deconstructs any Seq, whereas `::` matches only List and would raise a
    // MatchError for other non-empty Seq implementations such as Vector.
    case _ +: remaining =>
      // NOTE(review): this cast throws ClassCastException when `agg` is not a
      // StringTerms instance — confirm that all grouping fields yield string terms.
      val buckets = agg.asInstanceOf[StringTerms].getBuckets.asScala.toList
      buckets.flatMap { bucket =>
        // Only the first sub-aggregation of each bucket is followed.
        val subAgg = bucket.getAggregations.asList.asScala.head
        parseAgg(remaining, subAgg, allGroups :+ bucket.getKeyAsString)(leafAggParser)
      }
    // No grouping levels left: hand the aggregation to the leaf parser.
    case _ => leafAggParser(allGroups, agg)
  }
}
/**
 * Example leaf aggregation parser.
 *
 * Reads the aggregation as a single-value numeric metric and wraps it in one
 * `GroupStats` whose id is the collected bucket keys joined with ".".
 *
 * @param allGroups bucket keys collected for this leaf
 * @param agg       leaf aggregation; cast to
 *                  `InternalNumericMetricsAggregation.SingleValue`
 * @return a single-element sequence holding the group's statistic
 */
def parseSimpleLeafAgg(allGroups: Seq[String], agg: Aggregation): Seq[GroupStats] = {
  val metricValue = agg.asInstanceOf[InternalNumericMetricsAggregation.SingleValue].value()
  Seq(GroupStats(allGroups.mkString("."), metricValue))
}
/**
 * Usage example: builds the multi-level aggregation query for `groupFields`,
 * executes it against `esIndex`, and flattens the response.
 *
 * The leaf aggregation is a "count" over field "V1"; the search requests no hits
 * (`size(0)`), only aggregations.
 *
 * @param groupFields fields to group by, outermost first
 * @return one `GroupStats` per bucket path in the response
 */
def groupStats(groupFields: Seq[String]): Seq[GroupStats] = {
  val countLeafBuilder = buildLeafAgg(("count", "V1")) _
  val query = search(esIndex).size(0).aggregations(buildAgg(groupFields)(countLeafBuilder))
  val response = client.execute(query).await
  // The first aggregation in the response is the root of the nested tree.
  val rootAgg = response.aggregations.asList().asScala.head
  parseAgg(groupFields, rootAgg, Seq())(parseSimpleLeafAgg _)
}
是否可以使用 elastic4s DSL 动态地构建多级聚合查询?
使用 http 客户端很简单
multiLevelAggregation
Input: Fields[0..N]
Output: Data grouped by field tuple
Steps:
1. Build multilevel elasticsearch aggregation query (JSON)
2. Execute query on elasticsearch server
3. Flatten result and return
但是,如何使用 elastic4s 或 Java 客户端来做到这一点?
在仔细梳理我的问题之后,我找到了解决方案。最初我以为这是 elastic4s 的限制,但事实并非如此:通过 elastic4s 客户端动态构建多字段聚合查询其实很容易。以下是我的解决方案:
</p>
<pre><code>//For building aggregation query
/**
 * Recursively builds a nested terms aggregation, one level per entry of `groups`.
 *
 * Each field in `groups` becomes a terms aggregation named "termAgg" whose
 * sub-aggregation is the aggregation built for the remaining fields. Once the
 * fields are exhausted, the aggregation produced by `leafAggBuilder` is returned.
 *
 * @param groups         field names to group by, outermost level first
 * @param leafAggBuilder factory invoked to create the innermost aggregation
 * @return the root of the aggregation tree; the leaf aggregation itself when
 *         `groups` is empty
 */
def buildAgg(groups: Seq[String])(leafAggBuilder: () => AbstractAggregationDefinition): AbstractAggregationDefinition = {
  groups match {
    // `+:` deconstructs any Seq, whereas `::` matches only List and would raise a
    // MatchError for other non-empty Seq implementations such as Vector.
    case field +: rest =>
      aggregation.terms("termAgg").field(field).aggregations(buildAgg(rest)(leafAggBuilder))
    // Nothing left to group by: emit the leaf aggregation.
    case _ => leafAggBuilder()
  }
}
/**
 * Example leaf aggregation builder.
 *
 * Creates an aggregation named "aggFunc" over the given field. The function name
 * selects the aggregation: "avg", "sum" and "cardinality" map to the DSL builders
 * of the same name; any other value falls back to `aggregation.count`.
 *
 * The parameter is written as a plain tuple type; it is the same type as the
 * `Pair[String, String]` alias, which is deprecated since Scala 2.11.
 *
 * @param aggFuncInfo tuple of (aggregation function name, field name)
 * @return the aggregation definition for the requested function and field
 */
def buildLeafAgg(aggFuncInfo: (String, String))(): AbstractAggregationDefinition = {
  val (funcName, fieldName) = aggFuncInfo
  funcName match {
    case "avg"         => aggregation.avg("aggFunc").field(fieldName)
    case "sum"         => aggregation.sum("aggFunc").field(fieldName)
    case "cardinality" => aggregation.cardinality("aggFunc").field(fieldName)
    // Unknown function names default to a count over the field.
    case _             => aggregation.count("aggFunc").field(fieldName)
  }
}
/**
 * Recursively flattens a nested terms aggregation result.
 *
 * For every remaining entry in `groups`, the current aggregation is read as a
 * terms aggregation; each bucket key is appended to `allGroups` and the bucket's
 * first sub-aggregation is parsed with the remaining groups. Once `groups` is
 * empty, `leafAggParser` receives the accumulated keys and the current aggregation.
 *
 * @param groups        grouping fields still to be descended through
 * @param agg           aggregation result at the current nesting level
 * @param allGroups     bucket keys collected on the way down (outermost first)
 * @param leafAggParser converts the accumulated keys plus the leaf aggregation
 *                      into result values
 * @tparam T            element type produced by `leafAggParser`
 * @return the concatenated leaf results over all bucket paths
 */
def parseAgg[T](groups: Seq[String], agg: Aggregation, allGroups: Seq[String])(leafAggParser: (Seq[String], Aggregation) => Seq[T]): Seq[T] = {
  groups match {
    // `+:` deconstructs any Seq, whereas `::` matches only List and would raise a
    // MatchError for other non-empty Seq implementations such as Vector.
    case _ +: remaining =>
      // NOTE(review): this cast throws ClassCastException when `agg` is not a
      // StringTerms instance — confirm that all grouping fields yield string terms.
      val buckets = agg.asInstanceOf[StringTerms].getBuckets.asScala.toList
      buckets.flatMap { bucket =>
        // Only the first sub-aggregation of each bucket is followed.
        val subAgg = bucket.getAggregations.asList.asScala.head
        parseAgg(remaining, subAgg, allGroups :+ bucket.getKeyAsString)(leafAggParser)
      }
    // No grouping levels left: hand the aggregation to the leaf parser.
    case _ => leafAggParser(allGroups, agg)
  }
}
/**
 * Example leaf aggregation parser.
 *
 * Reads the aggregation as a single-value numeric metric and wraps it in one
 * `GroupStats` whose id is the collected bucket keys joined with ".".
 *
 * @param allGroups bucket keys collected for this leaf
 * @param agg       leaf aggregation; cast to
 *                  `InternalNumericMetricsAggregation.SingleValue`
 * @return a single-element sequence holding the group's statistic
 */
def parseSimpleLeafAgg(allGroups: Seq[String], agg: Aggregation): Seq[GroupStats] = {
  val metricValue = agg.asInstanceOf[InternalNumericMetricsAggregation.SingleValue].value()
  Seq(GroupStats(allGroups.mkString("."), metricValue))
}
/**
 * Usage example: builds the multi-level aggregation query for `groupFields`,
 * executes it against `esIndex`, and flattens the response.
 *
 * The leaf aggregation is a "count" over field "V1"; the search requests no hits
 * (`size(0)`), only aggregations.
 *
 * @param groupFields fields to group by, outermost first
 * @return one `GroupStats` per bucket path in the response
 */
def groupStats(groupFields: Seq[String]): Seq[GroupStats] = {
  val countLeafBuilder = buildLeafAgg(("count", "V1")) _
  val query = search(esIndex).size(0).aggregations(buildAgg(groupFields)(countLeafBuilder))
  val response = client.execute(query).await
  // The first aggregation in the response is the root of the nested tree.
  val rootAgg = response.aggregations.asList().asScala.head
  parseAgg(groupFields, rootAgg, Seq())(parseSimpleLeafAgg _)
}