弹性搜索嵌套。更好的术语聚合及其迭代代码

ElasticSearch Nest. better code for terms aggregation and its iteration

我想获取给定时间段内唯一数字用户 ID 的列表。

假设字段是userId,时间字段是startTime,我成功得到如下结果;

HashSet<int> hashUserIdList= new HashSet<int>(); // guarantees to store unique userIds.

// Step 1. get unique number of userIds
var total = client.Search<Log>(s => s
    .Query(q => q
        .DateRange(c => c.Field(p => p.startTime)
        .GreaterThan(FixedDate)))
        .Aggregations(a => a
            .Cardinality("userId_cardinality", c => c
                .Field("userId"))))
    .Aggs.Cardinality("userId_cardinality");

int totalCount = (int)total.Value;

// Step 2. get unique userId values by Terms aggregation.
var response = client.Search<Log>(s => s
    .Source(source => source.Includes(inc => inc.Field("userId")))
    .Query(q => q
        .DateRange(c => c.Field(p => p.startTime)
        .GreaterThan(FixedDate)))
    .Aggregations(a => a
        .Terms("userId_terms", c => c
            .Field("userId").Size(totalCount))))
    .Aggs.Terms("userId_terms");

// Step 3. store unique userIds to HashSet.
foreach (var element in response.Buckets)
{
    hashUserIdList.Add(int.Parse(element.Key));
}

有效 但似乎效率不高,因为 (1) 它首先获取 totalCount,并且 (2) 它定义了 Size(totalCount) 可以创建 500 个服务器由于桶溢出而导致的错误(如果结果有数千怎么办)。

foreach 的方式进行迭代会很好,但我未能使它们按大小 100 进行迭代。我在这里和那里放了 From/SizeSkip/Take 但返回的值不可靠。

如何正确编码?

这种方法可能适用于某些集合,但有几点观察:

  1. Cardinality Aggregation uses HyperLogLog++算法到近似基数;此近似值对于低基数字段可能完全准确,但对于高基数字段则不太准确。
  2. 对于许多 个术语,术语聚合可能在计算上很昂贵,因为每个桶都需要在内存中构建,然后序列化以响应。

您可以跳过 Cardinality Aggregation 来获取大小,只需将 int.MaxValue 作为 Terms 聚合的大小传递。另一种在速度方面效率较低的替代方法是滚动浏览范围内的所有文档,源过滤器仅 return 您感兴趣的字段。我希望滚动方法减少集群上的压力,但我建议监控您采取的任何方法。

以下是 Stack Overflow 数据集(2016 年 6 月,IIRC)上两种方法的比较,查看 2 年前的今天和一年前的今天之间的独特问题提问者。

词条聚合

void Main()
{
    var pool = new SingleNodeConnectionPool(new Uri("http://localhost:9200"));

    var connectionSettings = new ConnectionSettings(pool)
        .MapDefaultTypeIndices(d => d
            .Add(typeof(Question), NDC.WhosebugIndex)
        );


    var client = new ElasticClient(connectionSettings);

    var twoYearsAgo = DateTime.UtcNow.Date.AddYears(-2);
    var yearAgo = DateTime.UtcNow.Date.AddYears(-1);

    var searchResponse = client.Search<Question>(s => s
        .Size(0)
        .Query(q => q
            .DateRange(c => c.Field(p => p.CreationDate)
                .GreaterThan(twoYearsAgo)
                .LessThan(yearAgo)
            )
        )
        .Aggregations(a => a
            .Terms("unique_users", c => c
                .Field(f => f.OwnerUserId)
                .Size(int.MaxValue)
            )
        )
    );

    var uniqueOwnerUserIds = searchResponse.Aggs.Terms("unique_users").Buckets.Select(b => b.KeyAsString).ToList();

    // 3.83 seconds
    // unique question askers: 795352
    Console.WriteLine($"unique question askers: {uniqueOwnerUserIds.Count}");
}

滚动API

void Main()
{
    var pool = new SingleNodeConnectionPool(new Uri("http://localhost:9200"));

    var connectionSettings = new ConnectionSettings(pool)
        .MapDefaultTypeIndices(d => d
            .Add(typeof(Question), NDC.WhosebugIndex)
        );

    var client = new ElasticClient(connectionSettings);
    var uniqueOwnerUserIds = new HashSet<int>();

    var twoYearsAgo = DateTime.UtcNow.Date.AddYears(-2);
    var yearAgo = DateTime.UtcNow.Date.AddYears(-1);

    var searchResponse = client.Search<Question>(s => s
        .Source(sf => sf
            .Include(ff => ff
                .Field(f => f.OwnerUserId)
            )
        )
        .Size(10000)
        .Scroll("1m")
        .Query(q => q
            .DateRange(c => c
                .Field(p => p.CreationDate)
                .GreaterThan(twoYearsAgo)
                .LessThan(yearAgo)
            )
        )
    );

    while (searchResponse.Documents.Any())
    {
        foreach (var document in searchResponse.Documents)
        {
            if (document.OwnerUserId.HasValue)
                uniqueOwnerUserIds.Add(document.OwnerUserId.Value);
        }

        searchResponse = client.Scroll<Question>("1m", searchResponse.ScrollId);
    }

    client.ClearScroll(c => c.ScrollId(searchResponse.ScrollId));

    // 91.8 seconds
    // unique question askers: 795352
    Console.WriteLine($"unique question askers: {uniqueOwnerUserIds.Count}");
}

术语聚合比滚动 API 方法快约 24 倍。