如何在 Elasticsearch 中获取超过 10K logs/results
How to get more than 10K logs/results in Elasticsearch
如果我有超过 10K logs/results 的最新版本的 Elasticsearch (7.13),我该如何获取所有日志?我正在阅读 scroll search results 但一开始它显示:
We no longer recommend using the scroll API for deep pagination. If
you need to preserve the index state while paging through more than
10,000 hits, use the search_after parameter with a point in time
(PIT).
但是使用 search_after it says you can access more than 10,000 hits but you would need to use a point in time api 以获得 PIT(时间点)ID,然后将该 ID 传递给 search_after 参数。在 kibana CLI 中,如果您输入 cmd POST /YOUR PATTERN INDEX NAME*/_pit?keep_alive=1m
,它将 return 该 PIT ID。但是您将如何在 .net 客户端的 NEST 中执行该命令?
This 只告诉你如果你已经有 PIT ID 该怎么办,但没有告诉你如何执行 post 命令来获取 PIT ID?有没有办法不用去 Kibana -> Discover -> CLI 和 运行 命令 POST /customer-simulation-es-app-logs*/_pit?keep_alive=1m
(customer-sim 是我索引的名称)
在实施 Rob 的示例之前,我有以下内容:
[HttpGet("GetMonthlyLogs")]
public async Task<List<EsSource>> GetLogsByDate()
{
    // Returns up to 3000 Error/Information log documents within a fixed
    // @timestamp window. NOTE(review): a plain search is capped at 10,000 hits
    // by index.max_result_window — use PIT + search_after to page beyond that.
    string indexName = "customer-simulation-es-app-logs*";
    var connectionSettings = new ConnectionSettings(new Uri("http://localhost:9200"));
    connectionSettings.DefaultIndex(indexName);
    connectionSettings.EnableDebugMode();
    _elasticClient = new ElasticClient(connectionSettings);

    // Count of matching documents. Fixed: the original called the synchronous
    // Count(...) inside an async action, blocking a thread-pool thread for the
    // whole HTTP round-trip; CountAsync keeps the action fully asynchronous.
    var responseHits = (await _elasticClient.CountAsync<EsSource>(c => c
        .Query(q => q
            .Bool(b => b
                .Should(
                    m => m
                        .Match(ma => ma
                            .Field(fa => fa.level)
                            .Query("Error")),
                    m => m
                        .Match(ma => ma
                            .Field(fa => fa.level)
                            .Query("Information")))
                .Filter(f => f.DateRange(dr => dr
                    .Field("@timestamp")
                    .GreaterThanOrEquals("2021-07-16T12:46:00.227-05:00")
                    .LessThanOrEquals("2021-07-16T12:55:00.227-05:00")))
                .MinimumShouldMatch(1))))).Count;

    var response = await _elasticClient.SearchAsync<EsSource>(s => s
        .Size(3000) // must stay <= index.max_result_window (default 10,000)
        .Source(src => src.Includes(i => i
            .Fields(f => f.timestamp,
                    f => f.level,
                    f => f.messageTemplate,
                    f => f.message)))
        .Index(indexName) // reuse the local instead of repeating the literal
        .Query(q => q
            .Bool(b => b
                .Should(
                    m => m
                        .Match(ma => ma
                            .Field(fa => fa.level)
                            .Query("Error")),
                    m => m
                        .Match(ma => ma
                            .Field(fa => fa.level)
                            .Query("Information")))
                .Filter(f => f.DateRange(dr => dr
                    .Field("@timestamp")
                    .GreaterThanOrEquals("2021-07-16T12:46:00.227-05:00")
                    .LessThanOrEquals("2021-07-16T12:55:00.227-05:00")))
                .MinimumShouldMatch(1))));

    return response?.Documents.ToList();
}
// Projection of a log document returned from Elasticsearch (only the fields
// selected by the _source includes in GetLogsByDate).
// NOTE(review): lowercase property names appear deliberate — NEST's default
// field inference camel-cases property names to build field names, so renaming
// to PascalCase could change which index fields these bind to; confirm mapping.
public class EsSource
{
// Bound explicitly to the "@timestamp" field via the NEST Date attribute.
[Date(Name = "@timestamp")]
public DateTimeOffset timestamp { get; set; }
public String level { get; set; }
public String messageTemplate { get; set; }
public String message { get; set; }
}
我试图尝试一下 Rob 的示例实现,我所做的如下。但是,我的问题是,如果我没有 EsDocuments 中的“ID”,我可以使用 timestamp
代替吗? foreach
是必需的,因为这会将结果分组为 1000,对吗?我还可以按时间戳排序还是必须严格按照结果 ID 排序?由于我没有 ID
,我正在考虑制作另一个使用 searchapi 的 var searchResponse
,但随后创建一个名为 EsID 的通用变量,这样我就可以在点击中循环搜索响应,例如 foreach(var item in searchResponse.Hits(){ EsID = item.Id }
然后将其用于具有批处理 (batches.Select(x => EsID)
) 的 foreach
并将其用于排序。但我觉得那会是重复的代码...如果我错了请纠正我?
请在此处查看我的实现:
private IElasticClient _elasticClient;

[HttpGet("GetMonthlyLogs")]
public async Task<List<EsSource>> GetLogsByDate()
{
    // Retrieves ALL Error/Information logs in the date window — even past the
    // 10,000-hit limit — by paging with a point-in-time (PIT) + search_after.
    //
    // Fixes over the draft version:
    //  * removed the IndexManyAsync loop that bulk-indexed empty EsSource
    //    documents into the live index (it was the demo's test-data seeding
    //    step, not part of reading);
    //  * pages are now accumulated and returned (the draft's `return //...`
    //    never compiled);
    //  * search_after advances using the last hit's sort values instead of a
    //    never-updated int.
    string indexName = "customer-simulation-es-app-logs*";
    var connectionSettings = new ConnectionSettings(new Uri("http://localhost:9200"));
    connectionSettings.DefaultIndex(indexName);
    connectionSettings.EnableDebugMode();
    _elasticClient = new ElasticClient(connectionSettings);

    var results = new List<EsSource>();

    // Open the PIT (the NEST equivalent of POST /index*/_pit?keep_alive=1m)
    // so every page reads the same consistent snapshot of the index.
    var openPit = await _elasticClient.OpenPointInTimeAsync(indexName, d => d.KeepAlive("1m"));
    var pit = openPit.Id;

    // Sort values of the last hit on the previous page; null means first page.
    // Sorting by @timestamp is fine here — searches against a PIT get an
    // implicit _shard_doc tiebreaker, so no document id field is required.
    IEnumerable<object> searchAfter = null;

    try
    {
        while (true)
        {
            var response = await _elasticClient.SearchAsync<EsSource>(s => s
                .TrackTotalHits(false) // total count not needed while paging; skipping it is faster
                .Size(1000)
                // pass pit id & extend its lifetime by another minute
                .PointInTime(pit, d => d.KeepAlive("1m"))
                .Source(src => src.Includes(i => i
                    .Fields(f => f.timestamp,
                            f => f.level,
                            f => f.messageTemplate,
                            f => f.message)))
                .Query(q => q
                    .Bool(b => b
                        .Should(
                            m => m
                                .Match(ma => ma
                                    .Field(fa => fa.level)
                                    .Query("Error")),
                            m => m
                                .Match(ma => ma
                                    .Field(fa => fa.level)
                                    .Query("Information")))
                        .Filter(f => f.DateRange(dr => dr
                            .Field("@timestamp")
                            .GreaterThanOrEquals("2021-07-14T00:00:00.000-05:00")
                            .LessThanOrEquals("2021-07-14T23:59:59.999-05:00")))
                        .MinimumShouldMatch(1)))
                .Sort(srt => srt.Ascending(f => f.timestamp))
                .SearchAfter(searchAfter));

            if (response.Hits.Count == 0)
            {
                break; // no more pages
            }

            results.AddRange(response.Documents);
            // Feed the raw sort values of the last HIT (not the document's
            // timestamp property) into the next request so the implicit
            // tiebreaker value is preserved and no hits are skipped/repeated.
            searchAfter = response.Hits.Last().Sorts;
        }
    }
    finally
    {
        // Always release the PIT, even if a page request throws.
        await _elasticClient.ClosePointInTimeAsync(d => d.Id(pit));
    }

    return results;
}
// Projection of a log document returned from Elasticsearch (only the fields
// selected by the _source includes in GetLogsByDate).
// NOTE(review): lowercase property names appear deliberate — NEST's default
// field inference camel-cases property names to build field names, so renaming
// to PascalCase could change which index fields these bind to; confirm mapping.
public class EsSource
{
// Bound explicitly to the "@timestamp" field via the NEST Date attribute.
[Date(Name = "@timestamp")]
public DateTimeOffset timestamp { get; set; }
public String level { get; set; }
public String messageTemplate { get; set; }
public String message { get; set; }
}
您需要将 PointInTime
个实例添加到您的搜索查询中,如下所示:
esQuery.PointInTime = new PointInTime(PointInTimeId,KeepAlive);
在您发给 ES 的第一个请求中,PointInTimeId 为空;有关详细信息,请查看 ES 官方文档 here。
我准备了一个带有注释的示例应用程序,演示了如何使用 PIT 从索引中检索所有文档并在之后进行搜索。
class Program
{
    // Demo: read every document out of an index with a point-in-time (PIT)
    // plus search_after pagination, 1000 documents per page.
    static async Task Main(string[] args)
    {
        const string indexName = "test";

        var settings = new ConnectionSettings(new Uri("http://localhost:9200"));
        settings.DefaultIndex(indexName);
        settings.EnableDebugMode();
        var client = new ElasticClient(settings);

        // Recreate the index from scratch for a clean run.
        await client.Indices.DeleteAsync(indexName);
        var createResponse = await client.Indices.CreateAsync(indexName);

        // Seed 20,000 docs in chunks of 1000 (Batch comes from the morelinq nuget).
        Console.WriteLine("Index some data into index");
        foreach (var chunk in Enumerable.Range(0, 20000).Batch(1000))
        {
            var bulkResponse = await client.IndexManyAsync(chunk.Select(n => new EsDocument { Id = n }));
        }
        await client.Indices.RefreshAsync();

        var countResponse = await client.CountAsync<EsDocument>(d => d);
        Console.WriteLine($"Documents in index: {countResponse.Count}");

        Console.WriteLine("Open new pit");
        var pitResponse = await client.OpenPointInTimeAsync(indexName, p => p.KeepAlive("1m"));
        var pitId = pitResponse.Id;

        Console.WriteLine("Read all docs from index ..");
        // Id of the last document seen on the previous page; 0 starts from the beginning.
        var lastSeenId = 0;
        try
        {
            for (; ; )
            {
                var page = await client.SearchAsync<EsDocument>(s => s
                    .TrackTotalHits(false)                      // total not needed — faster paging
                    .Size(1000)
                    .PointInTime(pitId, p => p.KeepAlive("1m")) // keep the snapshot alive another minute
                    .Query(q => q.MatchAll())
                    .Sort(srt => srt.Ascending(doc => doc.Id))  // stable order so search_after works
                    .SearchAfter(lastSeenId));

                // An empty page means the index is exhausted.
                if (page.Documents.Count == 0)
                {
                    break;
                }

                Console.WriteLine(
                    $"Id [{page.Documents.FirstOrDefault()?.Id}..{page.Documents.LastOrDefault()?.Id}]");
                lastSeenId = page.Documents.LastOrDefault()?.Id ?? 0;
            }
        }
        finally
        {
            Console.WriteLine("Close pit");
            var closeResponse = await client.ClosePointInTimeAsync(p => p.Id(pitId));
        }
    }

    class EsDocument
    {
        public int Id { get; set; }
    }
}
程序输出如下:
Index some data into index
Documents in index: 20000
Open new pit
Read all docs from index ..
Id [1..1000]
Id [1001..2000]
Id [2001..3000]
Id [3001..4000]
Id [4001..5000]
Id [5001..6000]
Id [6001..7000]
Id [7001..8000]
Id [8001..9000]
Id [9001..10000]
Id [10001..11000]
Id [11001..12000]
Id [12001..13000]
Id [13001..14000]
Id [14001..15000]
Id [15001..16000]
Id [16001..17000]
Id [17001..18000]
Id [18001..19000]
Id [19001..19999]
Close pit
如果我有超过 10K logs/results 的最新版本的 Elasticsearch (7.13),我该如何获取所有日志?我正在阅读 scroll search results 但一开始它显示:
We no longer recommend using the scroll API for deep pagination. If you need to preserve the index state while paging through more than 10,000 hits, use the search_after parameter with a point in time (PIT).
但是使用 search_after it says you can access more than 10,000 hits but you would need to use a point in time api 以获得 PIT(时间点)ID,然后将该 ID 传递给 search_after 参数。在 kibana CLI 中,如果您输入 cmd POST /YOUR PATTERN INDEX NAME*/_pit?keep_alive=1m
,它将 return 该 PIT ID。但是您将如何在 .net 客户端的 NEST 中执行该命令?
This 只告诉你如果你已经有 PIT ID 该怎么办,但没有告诉你如何执行 post 命令来获取 PIT ID?有没有办法不用去 Kibana -> Discover -> CLI 和 运行 命令 POST /customer-simulation-es-app-logs*/_pit?keep_alive=1m
(customer-sim 是我索引的名称)
在实施 Rob 的示例之前,我有以下内容:
[HttpGet("GetMonthlyLogs")]
public async Task<List<EsSource>> GetLogsByDate()
{
    // Returns up to 3000 Error/Information log documents within a fixed
    // @timestamp window. NOTE(review): a plain search is capped at 10,000 hits
    // by index.max_result_window — use PIT + search_after to page beyond that.
    string indexName = "customer-simulation-es-app-logs*";
    var connectionSettings = new ConnectionSettings(new Uri("http://localhost:9200"));
    connectionSettings.DefaultIndex(indexName);
    connectionSettings.EnableDebugMode();
    _elasticClient = new ElasticClient(connectionSettings);

    // Count of matching documents. Fixed: the original called the synchronous
    // Count(...) inside an async action, blocking a thread-pool thread for the
    // whole HTTP round-trip; CountAsync keeps the action fully asynchronous.
    var responseHits = (await _elasticClient.CountAsync<EsSource>(c => c
        .Query(q => q
            .Bool(b => b
                .Should(
                    m => m
                        .Match(ma => ma
                            .Field(fa => fa.level)
                            .Query("Error")),
                    m => m
                        .Match(ma => ma
                            .Field(fa => fa.level)
                            .Query("Information")))
                .Filter(f => f.DateRange(dr => dr
                    .Field("@timestamp")
                    .GreaterThanOrEquals("2021-07-16T12:46:00.227-05:00")
                    .LessThanOrEquals("2021-07-16T12:55:00.227-05:00")))
                .MinimumShouldMatch(1))))).Count;

    var response = await _elasticClient.SearchAsync<EsSource>(s => s
        .Size(3000) // must stay <= index.max_result_window (default 10,000)
        .Source(src => src.Includes(i => i
            .Fields(f => f.timestamp,
                    f => f.level,
                    f => f.messageTemplate,
                    f => f.message)))
        .Index(indexName) // reuse the local instead of repeating the literal
        .Query(q => q
            .Bool(b => b
                .Should(
                    m => m
                        .Match(ma => ma
                            .Field(fa => fa.level)
                            .Query("Error")),
                    m => m
                        .Match(ma => ma
                            .Field(fa => fa.level)
                            .Query("Information")))
                .Filter(f => f.DateRange(dr => dr
                    .Field("@timestamp")
                    .GreaterThanOrEquals("2021-07-16T12:46:00.227-05:00")
                    .LessThanOrEquals("2021-07-16T12:55:00.227-05:00")))
                .MinimumShouldMatch(1))));

    return response?.Documents.ToList();
}
// Projection of a log document returned from Elasticsearch (only the fields
// selected by the _source includes in GetLogsByDate).
// NOTE(review): lowercase property names appear deliberate — NEST's default
// field inference camel-cases property names to build field names, so renaming
// to PascalCase could change which index fields these bind to; confirm mapping.
public class EsSource
{
// Bound explicitly to the "@timestamp" field via the NEST Date attribute.
[Date(Name = "@timestamp")]
public DateTimeOffset timestamp { get; set; }
public String level { get; set; }
public String messageTemplate { get; set; }
public String message { get; set; }
}
我试图尝试一下 Rob 的示例实现,我所做的如下。但是,我的问题是,如果我没有 EsDocuments 中的“ID”,我可以使用 timestamp
代替吗? foreach
是必需的,因为这会将结果分组为 1000,对吗?我还可以按时间戳排序还是必须严格按照结果 ID 排序?由于我没有 ID
,我正在考虑制作另一个使用 searchapi 的 var searchResponse
,但随后创建一个名为 EsID 的通用变量,这样我就可以在点击中循环搜索响应,例如 foreach(var item in searchResponse.Hits(){ EsID = item.Id }
然后将其用于具有批处理 (batches.Select(x => EsID)
) 的 foreach
并将其用于排序。但我觉得那会是重复的代码...如果我错了请纠正我?
请在此处查看我的实现:
private IElasticClient _elasticClient;

[HttpGet("GetMonthlyLogs")]
public async Task<List<EsSource>> GetLogsByDate()
{
    // Retrieves ALL Error/Information logs in the date window — even past the
    // 10,000-hit limit — by paging with a point-in-time (PIT) + search_after.
    //
    // Fixes over the draft version:
    //  * removed the IndexManyAsync loop that bulk-indexed empty EsSource
    //    documents into the live index (it was the demo's test-data seeding
    //    step, not part of reading);
    //  * pages are now accumulated and returned (the draft's `return //...`
    //    never compiled);
    //  * search_after advances using the last hit's sort values instead of a
    //    never-updated int.
    string indexName = "customer-simulation-es-app-logs*";
    var connectionSettings = new ConnectionSettings(new Uri("http://localhost:9200"));
    connectionSettings.DefaultIndex(indexName);
    connectionSettings.EnableDebugMode();
    _elasticClient = new ElasticClient(connectionSettings);

    var results = new List<EsSource>();

    // Open the PIT (the NEST equivalent of POST /index*/_pit?keep_alive=1m)
    // so every page reads the same consistent snapshot of the index.
    var openPit = await _elasticClient.OpenPointInTimeAsync(indexName, d => d.KeepAlive("1m"));
    var pit = openPit.Id;

    // Sort values of the last hit on the previous page; null means first page.
    // Sorting by @timestamp is fine here — searches against a PIT get an
    // implicit _shard_doc tiebreaker, so no document id field is required.
    IEnumerable<object> searchAfter = null;

    try
    {
        while (true)
        {
            var response = await _elasticClient.SearchAsync<EsSource>(s => s
                .TrackTotalHits(false) // total count not needed while paging; skipping it is faster
                .Size(1000)
                // pass pit id & extend its lifetime by another minute
                .PointInTime(pit, d => d.KeepAlive("1m"))
                .Source(src => src.Includes(i => i
                    .Fields(f => f.timestamp,
                            f => f.level,
                            f => f.messageTemplate,
                            f => f.message)))
                .Query(q => q
                    .Bool(b => b
                        .Should(
                            m => m
                                .Match(ma => ma
                                    .Field(fa => fa.level)
                                    .Query("Error")),
                            m => m
                                .Match(ma => ma
                                    .Field(fa => fa.level)
                                    .Query("Information")))
                        .Filter(f => f.DateRange(dr => dr
                            .Field("@timestamp")
                            .GreaterThanOrEquals("2021-07-14T00:00:00.000-05:00")
                            .LessThanOrEquals("2021-07-14T23:59:59.999-05:00")))
                        .MinimumShouldMatch(1)))
                .Sort(srt => srt.Ascending(f => f.timestamp))
                .SearchAfter(searchAfter));

            if (response.Hits.Count == 0)
            {
                break; // no more pages
            }

            results.AddRange(response.Documents);
            // Feed the raw sort values of the last HIT (not the document's
            // timestamp property) into the next request so the implicit
            // tiebreaker value is preserved and no hits are skipped/repeated.
            searchAfter = response.Hits.Last().Sorts;
        }
    }
    finally
    {
        // Always release the PIT, even if a page request throws.
        await _elasticClient.ClosePointInTimeAsync(d => d.Id(pit));
    }

    return results;
}
// Projection of a log document returned from Elasticsearch (only the fields
// selected by the _source includes in GetLogsByDate).
// NOTE(review): lowercase property names appear deliberate — NEST's default
// field inference camel-cases property names to build field names, so renaming
// to PascalCase would change which index fields these bind to; confirm mapping.
public class EsSource
{
// Bound explicitly to the "@timestamp" field via the NEST Date attribute.
[Date(Name = "@timestamp")]
public DateTimeOffset timestamp { get; set; }
public String level { get; set; }
public String messageTemplate { get; set; }
public String message { get; set; }
}
您需要将 PointInTime
个实例添加到您的搜索查询中,如下所示:
esQuery.PointInTime = new PointInTime(PointInTimeId,KeepAlive);
在您发给 ES 的第一个请求中,PointInTimeId 为空;有关详细信息,请查看 ES 官方文档 here。
我准备了一个带有注释的示例应用程序,演示了如何使用 PIT 从索引中检索所有文档并在之后进行搜索。
class Program
{
    // Demo: read every document out of an index with a point-in-time (PIT)
    // plus search_after pagination, 1000 documents per page.
    static async Task Main(string[] args)
    {
        const string indexName = "test";

        var settings = new ConnectionSettings(new Uri("http://localhost:9200"));
        settings.DefaultIndex(indexName);
        settings.EnableDebugMode();
        var client = new ElasticClient(settings);

        // Recreate the index from scratch for a clean run.
        await client.Indices.DeleteAsync(indexName);
        var createResponse = await client.Indices.CreateAsync(indexName);

        // Seed 20,000 docs in chunks of 1000 (Batch comes from the morelinq nuget).
        Console.WriteLine("Index some data into index");
        foreach (var chunk in Enumerable.Range(0, 20000).Batch(1000))
        {
            var bulkResponse = await client.IndexManyAsync(chunk.Select(n => new EsDocument { Id = n }));
        }
        await client.Indices.RefreshAsync();

        var countResponse = await client.CountAsync<EsDocument>(d => d);
        Console.WriteLine($"Documents in index: {countResponse.Count}");

        Console.WriteLine("Open new pit");
        var pitResponse = await client.OpenPointInTimeAsync(indexName, p => p.KeepAlive("1m"));
        var pitId = pitResponse.Id;

        Console.WriteLine("Read all docs from index ..");
        // Id of the last document seen on the previous page; 0 starts from the beginning.
        var lastSeenId = 0;
        try
        {
            for (; ; )
            {
                var page = await client.SearchAsync<EsDocument>(s => s
                    .TrackTotalHits(false)                      // total not needed — faster paging
                    .Size(1000)
                    .PointInTime(pitId, p => p.KeepAlive("1m")) // keep the snapshot alive another minute
                    .Query(q => q.MatchAll())
                    .Sort(srt => srt.Ascending(doc => doc.Id))  // stable order so search_after works
                    .SearchAfter(lastSeenId));

                // An empty page means the index is exhausted.
                if (page.Documents.Count == 0)
                {
                    break;
                }

                Console.WriteLine(
                    $"Id [{page.Documents.FirstOrDefault()?.Id}..{page.Documents.LastOrDefault()?.Id}]");
                lastSeenId = page.Documents.LastOrDefault()?.Id ?? 0;
            }
        }
        finally
        {
            Console.WriteLine("Close pit");
            var closeResponse = await client.ClosePointInTimeAsync(p => p.Id(pitId));
        }
    }

    class EsDocument
    {
        public int Id { get; set; }
    }
}
程序输出如下:
Index some data into index
Documents in index: 20000
Open new pit
Read all docs from index ..
Id [1..1000]
Id [1001..2000]
Id [2001..3000]
Id [3001..4000]
Id [4001..5000]
Id [5001..6000]
Id [6001..7000]
Id [7001..8000]
Id [8001..9000]
Id [9001..10000]
Id [10001..11000]
Id [11001..12000]
Id [12001..13000]
Id [13001..14000]
Id [14001..15000]
Id [15001..16000]
Id [16001..17000]
Id [17001..18000]
Id [18001..19000]
Id [19001..19999]
Close pit