EsRejectedExecutionException in Elasticsearch for parallel search

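I am using a single transport client instance in my application to send multiple parallel requests to Elasticsearch.

I get the following exception during parallel execution. How can I overcome this?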

org.elasticsearch.common.util.concurrent.EsRejectedExecutionException: rejected execution (queue capacity 1000) on org.elasticsearch.search.action.SearchServiceTransportAction@5f804c60
    at org.elasticsearch.common.util.concurrent.EsAbortPolicy.rejectedExecution(EsAbortPolicy.java:62)
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:821)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1372)
    at org.elasticsearch.search.action.SearchServiceTransportAction.execute(SearchServiceTransportAction.java:509)
    at org.elasticsearch.search.action.SearchServiceTransportAction.sendExecuteScan(SearchServiceTransportAction.java:441)
    at org.elasticsearch.action.search.type.TransportSearchScanAction$AsyncAction.sendExecuteFirstPhase(TransportSearchScanAction.java:68)
    at org.elasticsearch.action.search.type.TransportSearchTypeAction$BaseAsyncAction.performFirstPhase(TransportSearchTypeAction.java:171)
    at org.elasticsearch.action.search.type.TransportSearchTypeAction$BaseAsyncAction.start(TransportSearchTypeAction.java:153)
    at org.elasticsearch.action.search.type.TransportSearchScanAction.doExecute(TransportSearchScanAction.java:52)
    at org.elasticsearch.action.search.type.TransportSearchScanAction.doExecute(TransportSearchScanAction.java:42)
    at org.elasticsearch.action.support.TransportAction.execute(TransportAction.java:63)
    at org.elasticsearch.action.search.TransportSearchAction.doExecute(TransportSearchAction.java:107)
    at org.elasticsearch.action.search.TransportSearchAction.doExecute(TransportSearchAction.java:43)
    at org.elasticsearch.action.support.TransportAction.execute(TransportAction.java:63)
    at org.elasticsearch.action.search.TransportSearchAction$TransportHandler.messageReceived(TransportSearchAction.java:124)
    at org.elasticsearch.action.search.TransportSearchAction$TransportHandler.messageReceived(TransportSearchAction.java:113)
    at org.elasticsearch.transport.netty.MessageChannelHandler.handleRequest(MessageChannelHandler.java:212)
    at org.elasticsearch.transport.netty.MessageChannelHandler.messageReceived(MessageChannelHandler.java:109)
    at org.elasticsearch.common.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
    at org.elasticsearch.common.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
    at org.elasticsearch.common.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
    at org.elasticsearch.common.netty.channel.Channels.fireMessageReceived(Channels.java:296)
    at org.elasticsearch.common.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:462)
    at org.elasticsearch.common.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:443)
    at org.elasticsearch.common.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:303)
    at org.elasticsearch.common.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
    at org.elasticsearch.common.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
    at org.elasticsearch.common.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
    at org.elasticsearch.common.netty.OpenChannelsHandler.handleUpstream(OpenChannelsHandler.java:74)
    at org.elasticsearch.common.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
    at org.elasticsearch.common.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559)
    at org.elasticsearch.common.netty.channel.Channels.fireMessageReceived(Channels.java:268)
    at org.elasticsearch.common.netty.channel.Channels.fireMessageReceived(Channels.java:255)
    at org.elasticsearch.common.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88)
    at org.elasticsearch.common.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:108)
    at org.elasticsearch.common.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:318)
    at org.elasticsearch.common.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89)
    at org.elasticsearch.common.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
    at org.elasticsearch.common.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
    at org.elasticsearch.common.netty.util.internal.DeadLockProofWorker.run(DeadLockProofWorker.java:42)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

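It may sound strange, but you need to lower the number of parallel searches. With that exception, Elasticsearch is telling you that you are overloading it. Elasticsearch has some limits in place (at the thread pool level), and most of the time the default values for those limits are the best choice. So if you are load-testing your cluster to see how much it can take, this is an indication that one of those limits has been reached.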
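One way to lower the number of parallel searches on the client side is to gate every call behind a semaphore. The following is only a minimal sketch using plain JDK classes: `SearchThrottle`, `fakeSearch` and the limit of 4 are made-up names and values for illustration, and `fakeSearch` stands in for whatever call you make on your transport client.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Hypothetical helper: caps how many searches are in flight at once. */
final class SearchThrottle {
    private final Semaphore permits;
    private final AtomicInteger inFlight = new AtomicInteger();
    private final AtomicInteger peak = new AtomicInteger();

    SearchThrottle(int maxConcurrent) {
        this.permits = new Semaphore(maxConcurrent);
    }

    /** Blocks until a permit is free, runs the search, then frees the permit. */
    <T> T run(Supplier<T> search) throws InterruptedException {
        permits.acquire();
        try {
            // Track the highest concurrency seen, for demonstration only.
            peak.accumulateAndGet(inFlight.incrementAndGet(), Math::max);
            return search.get();
        } finally {
            inFlight.decrementAndGet();
            permits.release();
        }
    }

    /** Highest number of searches that were ever running at the same time. */
    int peak() {
        return peak.get();
    }

    /** Stand-in for a real call such as client.prepareSearch(...).get(). */
    static int fakeSearch(int id) {
        try {
            Thread.sleep(5);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return id;
    }

    /** Fires `requests` searches from 16 caller threads; returns the observed peak. */
    static int demo(int requests, int maxConcurrent) {
        SearchThrottle throttle = new SearchThrottle(maxConcurrent);
        ExecutorService callers = Executors.newFixedThreadPool(16);
        try {
            List<Future<Integer>> results = new ArrayList<>();
            for (int i = 0; i < requests; i++) {
                final int id = i;
                Callable<Integer> task = () -> throttle.run(() -> fakeSearch(id));
                results.add(callers.submit(task));
            }
            for (Future<Integer> result : results) {
                result.get(); // wait for every search to finish
            }
            return throttle.peak();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            callers.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("peak concurrency: " + demo(40, 4));
    }
}
```

Even though 16 threads try to search at the same time, no more than 4 requests are ever outstanding, so the cluster's search queue fills far more slowly. The right limit depends on your cluster and has to be found by testing.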

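Alternatively, if you really want to change the defaults, you can try increasing the search queue size to accommodate your concurrency demands. Keep in mind, though, that the larger the queue, the more pressure you put on your cluster, which in the end will cause instability.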

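Elasticsearch has a thread pool and a queue for search on each node. The thread pool has N worker threads ready to handle requests. When a request arrives and a worker is free, that worker handles it. By default the number of workers is derived from the number of CPU cores on the node (the exact formula depends on the Elasticsearch version). When all workers are busy and more search requests come in, the requests go into the queue. The size of the queue is also limited (1000 in the error above); if more parallel requests arrive than the queue can hold, the extra requests are rejected, as you can see in the error log.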
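The same behaviour can be reproduced with a plain `java.util.concurrent.ThreadPoolExecutor`, which is the class that appears in the stack trace above. This is only an illustrative sketch with made-up numbers (2 workers, a queue of 3, 10 requests) and a hypothetical class name, not how Elasticsearch configures its own pool:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class BoundedPoolDemo {
    /** Submits `requests` tasks that all stay busy; returns how many were rejected. */
    static int countRejections(int workers, int queueCapacity, int requests) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                workers, workers, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(queueCapacity),   // bounded queue
                new ThreadPoolExecutor.AbortPolicy());     // reject when full
        CountDownLatch release = new CountDownLatch(1);
        int rejected = 0;
        try {
            for (int i = 0; i < requests; i++) {
                try {
                    // Each task blocks until released, simulating a slow search.
                    pool.execute(() -> {
                        try {
                            release.await();
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    });
                } catch (RejectedExecutionException e) {
                    rejected++; // workers busy and queue full
                }
            }
        } finally {
            release.countDown(); // let the accepted tasks finish
            pool.shutdown();
        }
        return rejected;
    }

    public static void main(String[] args) {
        // 2 tasks run, 3 wait in the queue, the remaining 5 are rejected.
        System.out.println(countRejections(2, 3, 10)); // prints 5
    }
}
```

With 2 workers and room for 3 queued tasks, only 5 of the 10 requests are accepted. Raising the queue capacity to 8 accepts all of them, but each waiting request still occupies memory until a worker picks it up.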

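Solution:

  1. The immediate fix is to increase the size of the search queue. We could also increase the size of the thread pool, but that might badly affect the performance of individual queries, so increasing the queue is probably the better idea. Keep in mind, though, that this queue is memory resident, and making it too large can cause out-of-memory problems.

  2. Increase the number of nodes and replicas. Remember that each node has its own search thread pool and queue, and that a search can be served by either a primary shard or a replica.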

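I saw the same error because I was sending a large number of indexing requests to ES in parallel. Since I was writing a data migration, it was easy to make them serial, and that resolved the issue.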

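I don't know what your node configuration is, but your queue size (1000) is already on the high side. As others have explained, your search requests are queued in the Elasticsearch thread pool queue. If you still get rejections with a queue this large, that is a hint that you need to revisit your query pattern.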

As with many other design questions, there is no one-size-fits-all solution here either. I found this to be a very good post about how this queue works and about different ways to run a performance test to find out what best suits your use case.

HTH!