在 spring data elasticsearch 中,聚合查询不能放在存储库实现中吗?

In spring data elasticsearch, can aggregation queries be put in a respository implementation?

我是第一次使用 spring-boot-elasticsearch。我现在想出了如何使用 elastics java api 来描述我的串行差异管道查询。正如您将在下面看到的,此查询相当大,并且 returns 每个对象的多个存储桶以及每个存储桶之间的序列差异。我在 Spring 数据存储库中看到的搜索示例似乎都在这样的查询注释中拼出了查询的 json 主体:

@Repository
public interface SonarMetricRepository extends ElasticsearchRepository<Article, String> {

    @Query("{\"bool\": {\"must\": {\"match\": {\"authors.name\": \"?0\"}}, \"filter\": {\"term\": {\"tags\": \"?1\" }}}}")
    Page<Article> findByAuthorsNameAndFilteredTagQuery(String name, String tag, Pageable pageable);
}

这对于基本的 CRUD 操作来说似乎很优雅,但是我如何才能将下面的查询放入存储库对象而不需要使用 @Query 的原始查询语法?如果您有一个类似的示例,说明为序列差异查询结果或任何管道聚合构建的模型对象,这也会更有帮助。基本上我想在我的存储库中使用这样的搜索方法

Page<Serial Difference Result Object> getCodeCoverageMetrics(String projectKey, Date start, Date end, String interval, int lag);

我应该提到我想使用这个对象的部分原因是我也会在这里有其他 CRUD 查询,而且我认为它会为我处理分页,所以这很吸引人。

这是我的查询,它显示了 1 周时间段内声纳项目的代码覆盖率之间的序列差异:

        SerialDiffPipelineAggregationBuilder serialDiffPipelineAggregationBuilder =
            PipelineAggregatorBuilders
                    .diff("Percent_Change", "avg_coverage")
                    .lag(1);

    AvgAggregationBuilder averageCoverageAggregationBuilder = AggregationBuilders
            .avg("avg_coverage")
            .field("coverage");

    AggregationBuilder coverageHistoryAggregationBuilder = AggregationBuilders
            .dateHistogram("coverage_history")
            .field("@timestamp")
            .calendarInterval(DateHistogramInterval.WEEK)
            .subAggregation(averageCoverageAggregationBuilder)
            .subAggregation(serialDiffPipelineAggregationBuilder);

    TermsAggregationBuilder sonarProjectKeyAggregationBuilder = AggregationBuilders
            .terms("project_key")
            .field("key.keyword")
            .subAggregation(coverageHistoryAggregationBuilder);

    BoolQueryBuilder searchQuery = new BoolQueryBuilder()
            .filter(matchAllQuery())
            .filter(matchPhraseQuery("name.keyword", "my-sample-sonar-project"))
            .filter(rangeQuery("@timestamp")
                    .format("strict_date_optional_time")
                    .gte("2020-07-08T19:29:12.054Z")
                    .lte("2020-07-15T19:29:12.055Z"));

    // Join query and aggregation together
    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder()
            .query(searchQuery)
            .aggregation(sonarProjectKeyAggregationBuilder);

    SearchRequest searchRequest = new SearchRequest("sonarmetrics").source(searchSourceBuilder);
    SearchResponse response = client.search(searchRequest, RequestOptions.DEFAULT);

好的,如果我没听错,你想向存储库查询添加聚合。这对于 Spring Data Elasticsearch 自动创建的方法是不可能的,但实现起来并不难。

为了向您展示如何做到这一点,我使用了一个更简单的示例,我们在其中定义了一个 Person 实体:

@Document(indexName = "person")
public class Person {

    @Id
    @Nullable
    private Long id;

    @Field(type = FieldType.Text, fielddata = true)
    @Nullable
    private String lastName;

    @Field(type = FieldType.Text, fielddata = true)
    @Nullable
    private String firstName;

    // getter/setter
}

还有对应的仓库:

public interface PersonRepository extends ElasticsearchRepository<Person, Long>{
}

我们现在想扩展这个存储库,以便能够搜索有名字的人,并且 return 为这些人搜索前 10 个姓氏(lastNames 上的术语聚合) ).

首先要做的是定义一个描述你需要的方法的customization repository:

interface PersonCustomRepository {
    SearchPage<Person> findByFirstNameWithLastNameCounts(String firstName, Pageable pageable);
}

我们要传入 Pageable 以便方法 returns 页数据。我们 return SearchPage 对象检查 the documentation on return types 将包含分页信息以及 SearchHits<Person>。这个对象然后有聚合 信息和结果数据。

然后我们更改 PersonRepository 以扩展这个新接口:

public interface PersonRepository extends ElasticsearchRepository<Person, Long>, PersonCustomRepository {
}

当然我们现在需要在一个名为PersonCustomRepositoryImpl的class中提供一个实现(这必须像添加了Impl的接口一样命名):

public class PersonCustomRepositoryImpl implements PersonCustomRepository {

    private final ElasticsearchOperations operations;

    public PersonCustomRepositoryImpl(ElasticsearchOperations operations) { // let Spring inject an operations which we use to do the work
        this.operations = operations;
    }

    @Override
    public SearchPage<Person> findByFirstNameWithLastNameCounts(String firstName, Pageable pageable) {

        Query query = new NativeSearchQueryBuilder()                       // we build a Elasticsearch native query
            .addAggregation(terms("lastNames").field("lastName").size(10)) // add the aggregation
            .withQuery(QueryBuilders.matchQuery("firstName", firstName))   // add the query part
            .withPageable(pageable)                                        // add the requested page
            .build();

        SearchHits<Person> searchHits = operations.search(query, Person.class);  // send it of and get the result

        return SearchHitSupport.searchPageFor(searchHits, pageable);  // convert the result to a SearchPage
    }
}

这就是搜索的实施。现在存储库有这个额外的方法。如何使用?

对于这个演示,我假设我们有一个 REST 控制器,它采用一个名称和 return一对:

  1. 找到的人作为 SearchHit<Person> 个对象的列表
  2. a Map<String, Long> 包含姓氏及其计数

这可以按如下方式实现,注释描述了所做的事情:

@GetMapping("persons/firstNameWithLastNameCounts/{firstName}")
public Pair<List<SearchHit<Person>>, Map<String, Long>> firstNameWithLastNameCounts(@PathVariable("firstName") String firstName) {

    // helper function to get the lastName counts from an Elasticsearch Aggregations
    // Spring Data Elasticsearch does not have functions for that, so we need to know what is coming back
    Function<Aggregations, Map<String, Long>> getLastNameCounts = aggregations -> {
        if (aggregations != null) {
            Aggregation lastNames = aggregations.get("lastNames");
            if (lastNames != null) {
                List<? extends Terms.Bucket> buckets = ((Terms) lastNames).getBuckets();
                if (buckets != null) {
                    return buckets.stream().collect(Collectors.toMap(Terms.Bucket::getKeyAsString, Terms.Bucket::getDocCount));
                }
            }
        }
        return Collections.emptyMap();
    };

    // the parts of the returned object
    Map<String, Long> lastNameCounts = null;
    List<SearchHit<Person>> searchHits = new ArrayList<>();

    // request pages of size 1000
    Pageable pageable = PageRequest.of(0, 1000);
    boolean fetchMore = true;
    while (fetchMore) {
        // call the custom method implementation
        SearchPage<Person> searchPage = personRepository.findByFirstNameWithLastNameCounts(firstName, pageable);

        // get the aggregations on the first call, will be the same on the other pages
        if (lastNameCounts == null) {
            Aggregations aggregations = searchPage.getSearchHits().getAggregations();
            lastNameCounts = getLastNameCounts.apply(aggregations);
        }

        // collect the returned data
        if (searchPage.hasContent()) {
            searchHits.addAll(searchPage.getContent());
        }

        pageable = searchPage.nextPageable();
        fetchMore = searchPage.hasNext();
    }

    // return the collected stuff
    return Pair.of(searchHits, lastNameCounts);
}

我希望这可以让您了解如何实现自定义存储库功能并添加开箱即用未提供的功能。