使用 NEST 获取 ElasticSearch 批量队列状态
Get ElasticSearch bulk queue status with NEST
我有一个程序可以在 ElasticSearch 集群上执行多个批量索引操作。在某些时候,我开始收到类似这样的错误(截断):
RemoteTransportException[...][indices:data/write/bulk[s]]]; nested: EsRejectedExecutionException[rejected execution (queue capacity 100) ...];
有什么方法可以验证批量上传队列的状态,最好是使用 NEST,这样我就可以在发现服务器上的队列已满时减慢客户端应用程序的速度?
NodesInfo
方法看起来很有趣,但我不知道如何访问我需要的信息:
using Nest;
using System;
class Program {
static void Main(string[] args) {
ElasticClient client = new ElasticClient(new ConnectionSettings(new Uri("http://whatever:9200/")));
var nodesInfoResponse = client.NodesInfo();
if (nodesInfoResponse.IsValid) {
foreach (var n in nodesInfoResponse.Nodes) {
Console.WriteLine($"Node: {n.Key}");
var bulk = n.Value.ThreadPool["bulk"];
// ???
}
}
}
}
您需要使用 NodesStats()
而不是 NodesInfo()
。
var nodesStatsResponse = client.NodesStats();
if (nodesStatsResponse.IsValid)
{
foreach (var node in nodesStatsResponse.Nodes)
{
long bulkThreadPoolQueueSize = node.Value.ThreadPool["bulk"].Queue;
}
}
更新:
上面的查询会带来比要求更多的信息。获取相同信息的高度优化请求是通过使用 _cat/thread_pool
API。见下文:
var catThreadPoolResponse = client.CatThreadPool(d => d.H("host", "bulk.queue"));
if (catThreadPoolResponse.IsValid)
{
foreach (var record in catThreadPoolResponse.Records)
{
string nodeName = record.Host;
long bulkThreadPoolQueueSize = int.Parse(record.Bulk.Queue);
Console.WriteLine($"Node [{nodeName}] : BulkThreadPoolQueueSize [{bulkThreadPoolQueueSize}]");
}
}
我有一个程序可以在 ElasticSearch 集群上执行多个批量索引操作。在某些时候,我开始收到类似这样的错误(截断):
RemoteTransportException[...][indices:data/write/bulk[s]]]; nested: EsRejectedExecutionException[rejected execution (queue capacity 100) ...];
有什么方法可以验证批量上传队列的状态,最好是使用 NEST,这样我就可以在发现服务器上的队列已满时减慢客户端应用程序的速度?
NodesInfo
方法看起来很有趣,但我不知道如何访问我需要的信息:
using Nest;
using System;
class Program {
static void Main(string[] args) {
ElasticClient client = new ElasticClient(new ConnectionSettings(new Uri("http://whatever:9200/")));
var nodesInfoResponse = client.NodesInfo();
if (nodesInfoResponse.IsValid) {
foreach (var n in nodesInfoResponse.Nodes) {
Console.WriteLine($"Node: {n.Key}");
var bulk = n.Value.ThreadPool["bulk"];
// ???
}
}
}
}
您需要使用 NodesStats()
而不是 NodesInfo()
。
var nodesStatsResponse = client.NodesStats();
if (nodesStatsResponse.IsValid)
{
foreach (var node in nodesStatsResponse.Nodes)
{
long bulkThreadPoolQueueSize = node.Value.ThreadPool["bulk"].Queue;
}
}
更新:
上面的查询会带来比要求更多的信息。获取相同信息的高度优化请求是通过使用 _cat/thread_pool
API。见下文:
var catThreadPoolResponse = client.CatThreadPool(d => d.H("host", "bulk.queue"));
if (catThreadPoolResponse.IsValid)
{
foreach (var record in catThreadPoolResponse.Records)
{
string nodeName = record.Host;
long bulkThreadPoolQueueSize = int.Parse(record.Bulk.Queue);
Console.WriteLine($"Node [{nodeName}] : BulkThreadPoolQueueSize [{bulkThreadPoolQueueSize}]");
}
}