How to get more than 10K logs/results in Elasticsearch

If I have more than 10K logs/results on the latest version of Elasticsearch (7.13), how can I get all of the logs? I was reading about scroll search results, but right at the start it says:

We no longer recommend using the scroll API for deep pagination. If you need to preserve the index state while paging through more than 10,000 hits, use the search_after parameter with a point in time (PIT).

But with search_after it says you can access more than 10,000 hits, though you would need to use the point in time API to get a PIT (point in time) ID and then pass that ID to the search_after parameter. In the Kibana console, if you enter the command POST /YOUR PATTERN INDEX NAME*/_pit?keep_alive=1m, it will return that PIT ID. But how would you execute that command in NEST, the .NET client?
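For reference, the console command above returns a small JSON body whose id field is the PIT ID to pass along on subsequent searches (the id value below is a shortened placeholder, not a real ID):

```
POST /customer-simulation-es-app-logs*/_pit?keep_alive=1m

{
  "id": "46ToAwEPY3VzdG9tZXItc2ltdWxh..."
}
```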

This only tells you what to do once you already have a PIT ID, but not how to execute the POST command to get the PIT ID in the first place. Is there a way to do it without going to Kibana -> Discover -> Dev Tools and running the command POST /customer-simulation-es-app-logs*/_pit?keep_alive=1m (customer-sim is the name of my index)?
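NEST does expose the PIT endpoints directly, so the Kibana step should not be necessary. A minimal sketch, assuming an `_elasticClient` already configured against the index pattern as in the code below:

```csharp
// Open a PIT for the index pattern; this is the NEST equivalent of
// POST /customer-simulation-es-app-logs*/_pit?keep_alive=1m
var openPit = await _elasticClient.OpenPointInTimeAsync(
    "customer-simulation-es-app-logs*", p => p.KeepAlive("1m"));
string pitId = openPit.Id; // pass this to the search request

// ... run the PIT + search_after queries here ...

// Close the PIT when done so the cluster can free its resources
await _elasticClient.ClosePointInTimeAsync(c => c.Id(pitId));
```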


Before implementing Rob's example, I had the following:

[HttpGet("GetMonthlyLogs")]
        public async Task<List<EsSource>> GetLogsByDate()
        {
    
            string indexName = "customer-simulation-es-app-logs*";
            var connectionSettings = new ConnectionSettings(new Uri("http://localhost:9200"));
            connectionSettings.DefaultIndex(indexName);
            connectionSettings.EnableDebugMode();
            _elasticClient = new ElasticClient(connectionSettings);

            // this will return the number of results in the index based on the criteria below:
            var responseHits = _elasticClient.Count<EsSource>(c => c
                 .Query(q => q
                     .Bool(b => b
                         .Should(
                               m => m
                               .Match(ma => ma
                                   .Field(fa => fa.level)
                                   .Query("Error")),
                               m => m
                               .Match(ma => ma
                                   .Field(fa => fa.level)
                                   .Query("Information")))
                         .Filter(f => f.DateRange(dr => dr
                         .Field("@timestamp")
                             .GreaterThanOrEquals("2021-07-16T12:46:00.227-05:00")
                             .LessThanOrEquals("2021-07-16T12:55:00.227-05:00")))
                         .MinimumShouldMatch(1)))).Count;

            var response = await _elasticClient.SearchAsync<EsSource>(s => s
                  .Size(3000) // must see about this
                  .Source(src => src.Includes(i => i
                                    .Fields(f => f.timestamp,
                                            f => f.level,
                                            f => f.messageTemplate,
                                            f => f.message)))
                  .Index("customer-simulation-es-app-logs*")
                  .Query(q => q
                      .Bool(b => b
                          .Should(
                                m => m
                                .Match(ma => ma
                                    .Field(fa => fa.level)
                                    .Query("Error")),
                                m => m
                                .Match(ma => ma
                                    .Field(fa => fa.level)
                                    .Query("Information")))
                          .Filter(f => f.DateRange(dr => dr
                          .Field("@timestamp")
                             .GreaterThanOrEquals("2021-07-16T12:46:00.227-05:00")
                             .LessThanOrEquals("2021-07-16T12:55:00.227-05:00")))
                          .MinimumShouldMatch(1))));


            return response?.Documents.ToList();
        }

public class EsSource
{
       [Date(Name = "@timestamp")]
        public DateTimeOffset timestamp { get; set; }
        public String level { get; set; }
        public String messageTemplate { get; set; }
        public String message { get; set; }

}

I tried to implement Rob's example; what I did is below. However, my question is: since I don't have an "Id" in EsDocuments, can I use the timestamp instead? Is the foreach required, since that is what groups the results into batches of 1000, right? Also, can I sort by timestamp, or does it strictly have to be the result ID? Since I don't have an ID, I was thinking of making another var searchResponse that uses the search API, then creating a generic variable called EsID so I can loop through the search response hits, e.g. foreach (var item in searchResponse.Hits()) { EsID = item.Id }, and then use it in the foreach with the batches (batches.Select(x => EsID)) and for sorting. But I feel that would be duplicate code... please correct me if I'm wrong?
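On the sort question: you can sort by timestamp; when a search uses a PIT, Elasticsearch adds an implicit _shard_doc tiebreaker, so ties on the timestamp still page deterministically. One way to sketch the loop is to reuse the sort values NEST returns on each hit (hit.Sorts) as the cursor, instead of building one by hand; this is a sketch that assumes `pit` holds an already-opened PIT ID and substitutes MatchAll for the real bool query:

```csharp
object[] searchAfter = null; // null on the first page: SearchAfter is simply omitted

while (true)
{
    var page = await _elasticClient.SearchAsync<EsSource>(s => s
        .Size(1000)
        .PointInTime(pit, d => d.KeepAlive("1m"))
        .Query(q => q.MatchAll()) // replace with the real bool query
        .Sort(srt => srt.Ascending(f => f.timestamp))
        .SearchAfter(searchAfter));

    if (page.Hits.Count == 0)
    {
        break;
    }

    // feed the sort values of the last hit into the next request
    searchAfter = page.Hits.Last().Sorts.ToArray();
}
```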

Please see my implementation here:

private IElasticClient _elasticClient;

[HttpGet("GetMonthlyLogs")]
       public async Task<List<EsSource>> GetLogsByDate()
       {
           
            string indexName = "customer-simulation-es-app-logs*";
            var connectionSettings = new ConnectionSettings(new Uri("http://localhost:9200"));
            connectionSettings.DefaultIndex(indexName);
            connectionSettings.EnableDebugMode();
            _elasticClient = new ElasticClient(connectionSettings);

             // this will return the number of results in the index based on the criteria below:
            var responseHits = _elasticClient.Count<EsSource>(c => c
                 .Query(q => q
                     .Bool(b => b
                         .Should(
                               m => m
                               .Match(ma => ma
                                   .Field(fa => fa.level)
                                   .Query("Error")),
                               m => m
                               .Match(ma => ma
                                   .Field(fa => fa.level)
                                   .Query("Information")))
                         .Filter(f => f.DateRange(dr => dr
                         .Field("@timestamp")
                             .GreaterThanOrEquals("2021-07-16T12:46:00.227-05:00")
                             .LessThanOrEquals("2021-07-16T12:55:00.227-05:00")))
                         .MinimumShouldMatch(1)))).Count;

           

            foreach (var batches in Enumerable.Range(0, (int)responseHits).Batch(1000))
            {
                var bulk = await _elasticClient.IndexManyAsync(batches.Select(x => new EsSource { /* can I use timestamp?? */}));
            }

            await _elasticClient.Indices.RefreshAsync();

            var openPit = await _elasticClient.OpenPointInTimeAsync(indexName, d => d.KeepAlive("1m"));
            var pit = openPit.Id;

            var searchAfter = 0;

            try
            {
                while (true)
                {
                    var response = await _elasticClient.SearchAsync<EsSource>(s => s
                          .TrackTotalHits(false) // disable the tracking of total hits to speed up pagination
                          .Size(1000)
                          // pass pit id & extend lifetime of it by another minute
                          .PointInTime(pit, d => d.KeepAlive("1m"))
                          .Source(src => src.Includes(i => i
                                              .Fields(f => f.timestamp,
                                                      f => f.level,
                                                      f => f.messageTemplate,
                                                      f => f.message)))
                          .Query(q => q
                              .Bool(b => b
                                  .Should(
                                          m => m
                                          .Match(ma => ma
                                              .Field(fa => fa.level)
                                              .Query("Error")),
                                          m => m
                                          .Match(ma => ma
                                              .Field(fa => fa.level)
                                              .Query("Information")))
                                  .Filter(f => f.DateRange(dr => dr
                                  .Field("@timestamp")
                                      .GreaterThanOrEquals("2021-07-14T00:00:00.000-05:00")
                                      .LessThanOrEquals("2021-07-14T23:59:59.999-05:00")))
                                  .MinimumShouldMatch(1)))
                          // can I sort by timestamp or does it have to be the result ID?
                          .Sort(srt => srt.Ascending(f => f.timestamp))
                          .SearchAfter(searchAfter));

                    if (response.Documents.Count == 0)
                    {
                        break;
                    }


                    //searchAfter = response.Documents.LastOrDefault()?.timestamp ?? x;
                }
            }
            finally
            {
                // closing the pit
                var closePit = await _elasticClient.ClosePointInTimeAsync(d => d.Id(pit));
            }

            return // response?.Documents.ToList();
        }


    public class EsSource
    {
        [Date(Name = "@timestamp")]
        public DateTimeOffset timestamp { get; set; }
        public String level { get; set; }
        public String messageTemplate { get; set; }
        public String message { get; set; }        
    }

You need to add a PointInTime instance to your search query, like this:

esQuery.PointInTime = new PointInTime(PointInTimeId,KeepAlive);

On your first request to ES, PointInTimeId will be empty; for more details check the official ES documentation here

I prepared a sample application with comments that demonstrates how to use PIT and search after to retrieve all documents from an index.

class Program
{
    static async Task Main(string[] args)
    {
        string indexName = "test";
        var connectionSettings = new ConnectionSettings(new Uri("http://localhost:9200"));
        connectionSettings.DefaultIndex(indexName);
        connectionSettings.EnableDebugMode();
        var elasticClient = new ElasticClient(connectionSettings);

        await elasticClient.Indices.DeleteAsync(indexName);
        var indexResponse = await elasticClient.Indices.CreateAsync(indexName);

        // index some test data
        // Batch coming from morelinq nuget
        Console.WriteLine($"Index some data into index");
        foreach (var batches in Enumerable.Range(0, 20000).Batch(1000))
        {
            var bulk = await elasticClient.IndexManyAsync(batches.Select(x => new EsDocument {Id = x }));
        }

        await elasticClient.Indices.RefreshAsync();

        var countResponse = await elasticClient.CountAsync<EsDocument>(d => d);
        Console.WriteLine($"Documents in index: {countResponse.Count}");

        Console.WriteLine($"Open new pit");
        var openPit = await elasticClient.OpenPointInTimeAsync(indexName, d => d.KeepAlive("1m"));
        var pit = openPit.Id;

        Console.WriteLine($"Read all docs from index ..");
        // we will start reading docs from the beginning
        var searchAfter = 0;
        try
        {
            while (true)
            {
                var searchResponse = await elasticClient.SearchAsync<EsDocument>(s => s
                    // disable the tracking of total hits to speed up pagination.
                    .TrackTotalHits(false)
                    .Size(1000)
                    // pass pit id and extend lifetime of it by another minute
                    .PointInTime(pit, d => d.KeepAlive("1m"))
                    .Query(q => q.MatchAll())
                    // sort by Id field so we can pass the last retrieved id to the next search
                    .Sort(sort => sort.Ascending(f => f.Id))
                    // pass last id we received from prev. search request so we can keep retrieving more documents
                    .SearchAfter(searchAfter));

                // if we didn't receive any docs just stop processing
                if (searchResponse.Documents.Count == 0)
                {
                    break;
                }

                Console.WriteLine(
                    $"Id [{searchResponse.Documents.FirstOrDefault()?.Id}..{searchResponse.Documents.LastOrDefault()?.Id}]");
                searchAfter = searchResponse.Documents.LastOrDefault()?.Id ?? 0;
            }
        }
        finally
        {
            Console.WriteLine($"Close pit");
            var closePit = await elasticClient.ClosePointInTimeAsync(d => d.Id(pit));
        }
    }

    class EsDocument
    {
        public int Id { get; set; }
    }
}

Prints

Index some data into index
Documents in index: 20000
Open new pit
Read all docs from index ..
Id [1..1000]
Id [1001..2000]
Id [2001..3000]
Id [3001..4000]
Id [4001..5000]
Id [5001..6000]
Id [6001..7000]
Id [7001..8000]
Id [8001..9000]
Id [9001..10000]
Id [10001..11000]
Id [11001..12000]
Id [12001..13000]
Id [13001..14000]
Id [14001..15000]
Id [15001..16000]
Id [16001..17000]
Id [17001..18000]
Id [18001..19000]
Id [19001..19999]
Close pit