Elasticsearch bulk insert with NEST returns es_rejected_execution_exception
I am trying to do a bulk insert in Elasticsearch using the .NET API, and this is the error I get while performing the operation:
Error {Type: es_rejected_execution_exception Reason: "rejected execution of org.elasticsearch.transport.TransportService@604b47a4 on EsThreadPoolExecutor[bulk, queue capacity = 50, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@51f4f734[Running, pool size = 4, active threads = 4, queued tasks = 50, completed tasks = 164]]" CausedBy: ""} Nest.BulkError
Is this because my system is low on space, or is the bulk insert function itself not working? My NEST version is 5.0, and my Elasticsearch version is also 5.0.
Code for the bulk insert logic:
public void bulkInsert(List<BaseData> recordList, List<String> listOfIndexName) {
    BulkDescriptor descriptor = new BulkDescriptor();
    foreach (var j in Enumerable.Range(0, recordList.Count)) {
        descriptor.Index<BaseData>(op => op.Document(recordList[j])
            .Index(listOfIndexName[j]));
    }
    var result = clientConnection.Bulk(descriptor);
}
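As Val said in the comments, you are likely sending more data at one time than your cluster can handle. It looks like you might be trying to send all of your documents in one bulk request, which may not work for a large number of documents or for large documents.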
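With _bulk, you need to send your data to the cluster in several bulk requests and find the optimum number of documents to send in each one, as well as the number of bulk requests you can send to the cluster concurrently.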
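Splitting the input into several bulk requests could be sketched as below. This is a minimal illustration and not part of NEST: `BulkBatching` and `Batch` are hypothetical names, and the helper only groups items, leaving the actual sending to the caller.

```csharp
using System;
using System.Collections.Generic;

public static class BulkBatching
{
    // Splits a sequence into consecutive batches of at most batchSize items.
    // Hypothetical helper for illustration; it does not talk to Elasticsearch.
    public static IEnumerable<List<T>> Batch<T>(IEnumerable<T> source, int batchSize)
    {
        if (source == null) throw new ArgumentNullException(nameof(source));
        if (batchSize <= 0) throw new ArgumentOutOfRangeException(nameof(batchSize));
        return BatchIterator(source, batchSize);
    }

    // Separate iterator so the argument checks above run eagerly.
    private static IEnumerable<List<T>> BatchIterator<T>(IEnumerable<T> source, int batchSize)
    {
        var batch = new List<T>(batchSize);
        foreach (var item in source)
        {
            batch.Add(item);
            if (batch.Count == batchSize)
            {
                yield return batch;
                batch = new List<T>(batchSize);
            }
        }

        // Emit the final, possibly smaller, batch.
        if (batch.Count > 0) yield return batch;
    }
}
```

Applied to the question's bulkInsert, each batch of record indices would get its own BulkDescriptor and its own clientConnection.Bulk call, instead of one descriptor holding every document.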
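There are no hard and fast rules for the optimum size, because it varies with the complexity of your documents, how they are analyzed, the cluster hardware, cluster settings, index settings, and so on.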
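The best thing to do is start with a reasonable number, say 500 documents in one request (or some number that makes sense in your context), and go from there. Calculating the total size in bytes of each bulk request is also a good approach. If performance and throughput are insufficient, increase the number of documents, the request byte size, or the number of concurrent requests until you start to see es_rejected_execution_exception. That is the point at which the bulk thread pool queue is full, as in the error above (queue capacity = 50, queued tasks = 50).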
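Sizing requests by bytes rather than by document count could look roughly like the sketch below. It assumes you can estimate the serialized size of each document yourself; `BulkSizing`, `BatchByBytes`, and the `estimateBytes` delegate are hypothetical names, not NEST APIs.

```csharp
using System;
using System.Collections.Generic;

public static class BulkSizing
{
    // Groups items into batches whose estimated total size stays at or below maxBytes.
    // An item that is larger than maxBytes on its own is emitted as a single-item batch.
    // estimateBytes is supplied by the caller (for example, the UTF-8 length of the
    // serialized JSON); the estimate is an assumption, not an exact request size.
    public static IEnumerable<List<T>> BatchByBytes<T>(
        IEnumerable<T> source, Func<T, long> estimateBytes, long maxBytes)
    {
        var batch = new List<T>();
        long batchBytes = 0;

        foreach (var item in source)
        {
            long size = estimateBytes(item);

            // Close the current batch if adding this item would exceed the limit.
            if (batch.Count > 0 && batchBytes + size > maxBytes)
            {
                yield return batch;
                batch = new List<T>();
                batchBytes = 0;
            }

            batch.Add(item);
            batchBytes += size;
        }

        if (batch.Count > 0) yield return batch;
    }
}
```

The byte limit is a tuning knob like the document count: start with a modest value and raise it while watching for rejections.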
NEST 5.x ships with a handy helper that makes bulk requests much easier, using IObservable<T> and the Observable design pattern:
void Main()
{
    var client = new ElasticClient();

    // can cancel the operation by calling .Cancel() on this
    var cancellationTokenSource = new CancellationTokenSource();

    // set up the bulk all observable
    var bulkAllObservable = client.BulkAll(GetDocuments(), ba => ba
        // number of concurrent requests
        .MaxDegreeOfParallelism(8)
        // in case of 429 response, how long we should wait before retrying
        .BackOffTime(TimeSpan.FromSeconds(5))
        // in case of 429 response, how many times to retry before failing
        .BackOffRetries(2)
        // number of documents to send in each request
        .Size(500)
        .Index("index-name")
        .RefreshOnCompleted(),
        cancellationTokenSource.Token
    );

    var waitHandle = new ManualResetEvent(false);
    Exception ex = null;

    // what to do on each call, when an exception is thrown, and
    // when the bulk all completes
    var bulkAllObserver = new BulkAllObserver(
        onNext: bulkAllResponse =>
        {
            // do something after each bulk request
        },
        onError: exception =>
        {
            // do something with exception thrown
            ex = exception;
            waitHandle.Set();
        },
        onCompleted: () =>
        {
            // do something when all bulk operations complete
            waitHandle.Set();
        });

    bulkAllObservable.Subscribe(bulkAllObserver);

    // wait for handle to be set.
    waitHandle.WaitOne();

    if (ex != null)
    {
        throw ex;
    }
}

// Getting documents should be lazily enumerated collection ideally
public static IEnumerable<Document> GetDocuments()
{
    return Enumerable.Range(1, 10000).Select(x =>
        new Document
        {
            Id = x,
            Name = $"Document {x}"
        }
    );
}

public class Document
{
    public int Id { get; set; }
    public string Name { get; set; }
}