使用 Elasticsearch 的 RestClient 时如何绕过 "connection reset by peer"

How to get around "connection reset by peer" when using Elasticsearch's RestClient

我们正在使用 Hibernate Search 5.10。3.Final 针对 Elasticsearch 5.6.6 服务器。

当直接发出 FullTextQueries 时,我们的应用程序和 ES 之间的连接似乎很牢固,也许 b/c HibernateSearch 有一些内置的重试方法,我不确定,但是,同样在我们的应用程序中,我们使用 Elasticsearch 的RestClient 发出对 _analyze 的直接调用,当我们的防火墙在 30 分钟后关闭空闲连接时,这是我们得到 connection reset by peer IOException 的地方。

java.io.IOException: Connection reset by peer
    at sun.nio.ch.FileDispatcherImpl.read0(Native Method) ~[?:1.8.0_131]
    at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39) ~[?:1.8.0_131]
    at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223) ~[?:1.8.0_131]
    at sun.nio.ch.IOUtil.read(IOUtil.java:197) ~[?:1.8.0_131]
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380) ~[?:1.8.0_131]
    at org.apache.http.impl.nio.reactor.SessionInputBufferImpl.fill(SessionInputBufferImpl.java:204) ~[httpcore-nio-4.4.5.jar:4.4.5]
    at org.apache.http.impl.nio.codecs.AbstractMessageParser.fillBuffer(AbstractMessageParser.java:136) ~[httpcore-nio-4.4.5.jar:4.4.5]
    at org.apache.http.impl.nio.DefaultNHttpClientConnection.consumeInput(DefaultNHttpClientConnection.java:241) ~[httpcore-nio-4.4.5.jar:4.4.5]
    at org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:81) ~[httpasyncclient-4.1.2.jar:4.1.2]
    at org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:39) ~[httpasyncclient-4.1.2.jar:4.1.2]
    at org.apache.http.impl.nio.reactor.AbstractIODispatch.inputReady(AbstractIODispatch.java:114) ~[httpcore-nio-4.4.5.jar:4.4.5]
    at org.apache.http.impl.nio.reactor.BaseIOReactor.readable(BaseIOReactor.java:162) ~[httpcore-nio-4.4.5.jar:4.4.5]
    at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvent(AbstractIOReactor.java:337) ~[httpcore-nio-4.4.5.jar:4.4.5]
    at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvents(AbstractIOReactor.java:315) ~[httpcore-nio-4.4.5.jar:4.4.5]
    at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:276) ~[httpcore-nio-4.4.5.jar:4.4.5]
    at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104) ~[httpcore-nio-4.4.5.jar:4.4.5]
    at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:588) ~[httpcore-nio-4.4.5.jar:4.4.5]
    at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_131]

为了完整起见,这里是我们的大部分 RestClient 代码:

SearchFactory searchFactory = fts.getSearchFactory();
IndexFamily indexFamily = searchFactory.getIndexFamily(ElasticsearchIndexFamilyType.get());
ElasticsearchIndexFamily elasticsearchIndexFamily = indexFamily.unwrap(ElasticsearchIndexFamily.class);
RestClient restClient = elasticsearchIndexFamily.getClient(RestClient.class);

Map<String, String> rawData = new HashMap<>();
rawData.put("analyzer", analyzer);
rawData.put("text", text);

try {
    String jsonData = objectMapper.writeValueAsString(rawData);
    HttpEntity entity = new NStringEntity(jsonData, ContentType.APPLICATION_JSON);

    Response response = restClient.performRequest("GET", "vendor/_analyze", Collections.emptyMap(), entity);

    int statusCode = response.getStatusLine().getStatusCode();
    if (statusCode == HttpStatus.SC_OK) {
        // we parse the response here
    }
} catch (IOException e) {
    String message = "Error communicating with Elasticsearch!";
    logger.error(message, e);
    throw new IllegalStateException(message, e);
}

我们尝试创建一个 'heartbeat',它每分钟使用 RestClient 发出一个小的“_cluster/health”调用,但这似乎也没有完全解决问题。有时甚至心跳也会因相同的 IOException 而失败。

  1. 谁能解释一下 HibernateSearch 和 ES 之间的连接数(我认为它默认为 20 或 2,具体取决于 ES 是否集群)以及这些连接是按循环还是随机顺序使用?
  2. RestClient 的简单重试会再次调用 'wake' 连接吗?
  3. 或者我们是否需要手动重新连接到 ES,如果需要,怎么做?
  4. 最后,是否存在可以解决此问题的现有休眠搜索设置,可能 hibernate.search.default.elasticsearch.discovery.enabled 或其他?

问题说明

我假设您对连接在 30 分钟后被防火墙关闭的解释是正确的。

据我所知,Apache HTTP 客户端根据 ConnectionKeepAliveStrategy 决定将给定连接保持活动状态的时间。默认情况下,这是 org.apache.http.impl.client.DefaultConnectionKeepAliveStrategy,只要 Keep-Alive header 在 Elasticsearch 服务器的响应中推荐,这将保持连接活动,或者如果 Elasticsearch 服务器不 return 这样的 header 回复。

我做了一些测试,显然 Elasticsearch 没有 return 任何 Keep-Alive header,所以目前,连接是 re-used 无限期的,至少直到你的网络杀死他们。

连接中断后,您可能希望自动重试介入,但它们仅在您拥有多个 Elasticsearch 节点时才有效。如果您只有一个节点并且请求失败,那么其余客户端将不会在同一节点上重试。

所以,总而言之,失败是意料之中的。什么不是,事实上您只目睹了您自己的客户端代码的失败,但我想您可能忽略了日志中的一些错误?

解决方案(希望如此)

也许 Apache HTTP 客户端可以在强制关闭时自动处理 re-opening 连接,但我找不到这样的功能。

我也找不到让 Elasticsearch 服务器在其 HTTP 响应中添加 Keep-Alive header 的方法。

如果您使用 HTTP 而不是 HTTPS(在这种情况下我希望它是专用网络),您可以配置您的网络基础设施以在每个 HTTP 消息中插入这样的 header。如果你在代理后面使用 Elasticsearch,比如 Apache 服务器,你应该也可以这样做。

否则,为了在客户端显式配置它,您可以在 Hibernate Search 中使用 org.hibernate.search.elasticsearch.client.spi.ElasticsearchHttpClientConfigurer 扩展点。

警告:这个扩展点是一个 SPI,最重要的是它是实验性的,这意味着它可能会在任何较新版本的 Hibernate Search 中以不兼容的方式发生变化。在下次升级时,您可能必须更改代码,即使是微升级也是如此。我们这边不保证。

创建一个实现:

package com.acme.config;

import org.hibernate.search.elasticsearch.client.spi.ElasticsearchHttpClientConfigurer;

public class MyHttpConfigurer implements ElasticsearchHttpClientConfigurer {
   private static final int KEEP_ALIVE_MS = 20 * 60 * 1000; // 20 minutes
    @Override
    public void configure(HttpAsyncClientBuilder builder, Properties properties) {
        builder.setKeepAliveStrategy( (response, context) -> KEEP_ALIVE_MS );
    }
}

通过创建包含以下内容的 META-INF/services/org.hibernate.search.elasticsearch.client.spi.ElasticsearchHttpClientConfigurer 文件来注册您的实施:

com.acme.config.MyHttpConfigurer

...大功告成。

MyHttpConfigurer 中使用断点在调试模式下启动您的应用程序一次以检查它是否已执行,如果是,HTTP 客户端应在 20 分钟后自动停止使用空闲连接,您应该不会遇到又是同样的问题。

回答您的问题

  1. Can someone explain the number of connections between HibernateSearch and ES (I thought it defaulted to 20 or 2 depending on ES clustered or not) and if the connections are used in round robin or random order?

来自文档:

hibernate.search.default.elasticsearch.max_total_connection 20 (default)

hibernate.search.default.elasticsearch.max_total_connection_per_route 2 (default)

与ES是否集群无关。这取决于客户知道多少 nodes/routes。如果禁用自动发现(hibernate.search.default.elasticsearch.discovery.enabled false,默认值),客户端已知的节点就是您明确配置的节点。如果它被启用,并且集群中有多个节点,那么客户端可能知道比您明确配置的更多节点。

默认情况下,对于客户端已知的每个主机最多使用两个连接,但连接总数永远不会超过 20 个。因此,如果已知 9 个节点,则最多使用 18 个连接;如果已知 10 个节点,则最多使用 20 个连接;如果已知 11 个或更多节点,则最多仍使用 20 个连接。

  1. Will a simple retry of the RestClient call 'wake' the connection up again?

据我所知,它应该,但我不知道究竟是什么重置了你的连接,所以很难说。

  1. Or do we need to manually reconnect the connection to ES and if so, how?

我认为您不应该自己这样做。连接在非常低的级别自动管理。不是通过 Hibernate Search,甚至不是通过 Rest Client,而是通过 HTTP 客户端。

无论如何,如果您真的想那样做,就必须以某种方式掌握 HTTP 客户端。不知道怎么办。

  1. Lastly, is there an existing hibernate search setting that would solve this, possibly hibernate.search.default.elasticsearch.discovery.enabled or another?

hibernate.search.default.elasticsearch.discovery.enabled 仅在您需要更多连接并且您的 Elasticsearch 已集群时才有用;在你的情况下,你现有的连接似乎在一定时间后被终止,所以即使你增加连接数,你仍然会遇到同样的问题。