Spring 引导 + 弹性搜索:Java RestHighLevelClient 拒绝连接
Spring Boot + Elastic Search : Connection Refused with Java RestHighLevelClient
我正在为我的 spring 引导项目实现一个弹性池,我也在使用 spring 引导 2.1.4 和弹性搜索 7.3.0。我被困在这个。当任何 API 尝试查询时,它会给出 java.net.ConnectException: Connection refused
。我想使用带有设置线程数的 customizeHttpClient
配置。因此,当应用程序启动时,它只建立一个连接,并且仅使用该连接查询数据库,直到 bean 销毁。
我试过这个弹性配置:
import java.io.IOException;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.apache.http.impl.nio.reactor.IOReactorConfig;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestClientBuilder.HttpClientConfigCallback;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
@Component
public class ElasticConfig{
public static String host;
private static String port;
private static String protocol;
private static String username;
private static String password;
private RestHighLevelClient client;
@Value("${dselastic.host}")
public void setHost(String value) {
host = value;
}
@Value("${dselastic.port}")
public void setPort(String value) {
port = value;
}
@Value("${dselastic.protocol}")
public void setProtocol(String value) {
protocol = value;
}
@Value("${dselastic.username}")
public void setUsername(String value) {
username = value;
}
@Value("${dselastic.password}")
public void setPassword(String value) {
password = value;
}
@Bean(destroyMethod = "cleanUp")
@Scope(value = ConfigurableBeanFactory.SCOPE_SINGLETON)
public void prepareConnection() {
RestClientBuilder restBuilder = RestClient.builder(new HttpHost(host, Integer.valueOf(port), protocol));
if (username != null & password != null) {
final CredentialsProvider creadential = new BasicCredentialsProvider();
creadential.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
restBuilder.setHttpClientConfigCallback(new HttpClientConfigCallback() {
@Override
public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
return httpClientBuilder.setDefaultCredentialsProvider(creadential)
.setDefaultIOReactorConfig(IOReactorConfig.custom().setIoThreadCount(1).build());
}
});
restBuilder.setRequestConfigCallback(requestConfigBuilder ->
requestConfigBuilder.setConnectTimeout(10000) // time until a connection with the server is established.
.setSocketTimeout(60000) // time of inactivity to wait for packets[data] to receive.
.setConnectionRequestTimeout(0)); // time to fetch a connection from the connection pool 0 for infinite.
client = new RestHighLevelClient(restBuilder);
}
}
public void cleanUp() {
try {
client.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
我也尝试过实现 DisposableBean
接口,它是 destroy
方法,但我遇到了同样的异常。
这是我的 API 我正在尝试查询文档的地方:
public class IndexNameController {
@Autowired
RestHighLevelClient client;
@GetMapping(value = "/listAllNames")
public ArrayList<Object> listAllNames(HttpSession session) {
ArrayList<Object> results = new ArrayList<>();
try {
SearchRequest searchRequest = new SearchRequest();
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchRequest.indices("indexname");
String[] fields = { "name", "id" };
searchSourceBuilder.fetchSource(fields, new String[] {});
searchSourceBuilder.query(QueryBuilders.matchAllQuery()).size(10000);
searchSourceBuilder = searchSourceBuilder.sort(new FieldSortBuilder("createdTime").order(SortOrder.DESC));
searchRequest.source(searchSourceBuilder);
SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
SearchHit[] searchHits = searchResponse.getHits().getHits();
for (SearchHit searchHit : searchHits) {
Map<String, Object> map = new HashMap<>();
map.put("value", searchHit.getSourceAsMap().get("id"));
map.put("name", searchHit.getSourceAsMap().get("name"));
results.add(map);
}
return results;
} catch (Exception e) {
e.printStackTrace();
}
return new ArrayList<>();
}
}
当它试图查询时,它在 client.search()
给出了异常。这是 堆栈跟踪:
java.net.ConnectException: Connection refused
at org.elasticsearch.client.RestClient.extractAndWrapCause(RestClient.java:788)
at org.elasticsearch.client.RestClient.performRequest(RestClient.java:218)
at org.elasticsearch.client.RestClient.performRequest(RestClient.java:205)
at org.elasticsearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:1454)
at org.elasticsearch.client.RestHighLevelClient.performRequest(RestHighLevelClient.java:1424)
at org.elasticsearch.client.RestHighLevelClient.performRequestAndParseEntity(RestHighLevelClient.java:1394)
at org.elasticsearch.client.RestHighLevelClient.search(RestHighLevelClient.java:930)
at com.incident.response.controller.IncidentController.listAllIncidents(IncidentController.java:569)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:189)
at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:138)
at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:102)
at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:892)
at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:797)
at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87)
at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1038)
at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:942)
at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1005)
at org.springframework.web.servlet.FrameworkServlet.doGet(FrameworkServlet.java:897)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:634)
at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:882)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:741)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:231)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:53)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:99)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
at org.springframework.web.filter.FormContentFilter.doFilterInternal(FormContentFilter.java:92)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
at org.springframework.web.filter.HiddenHttpMethodFilter.doFilterInternal(HiddenHttpMethodFilter.java:93)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:200)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:200)
at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:96)
at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:490)
at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:139)
at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:92)
at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:74)
at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:343)
at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:408)
at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:66)
at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:834)
at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1415)
at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.net.ConnectException: Connection refused
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
at org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor.processEvent(DefaultConnectingIOReactor.java:171)
at org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor.processEvents(DefaultConnectingIOReactor.java:145)
at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor.execute(AbstractMultiworkerIOReactor.java:351)
at org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager.execute(PoolingNHttpClientConnectionManager.java:221)
at org.apache.http.impl.nio.client.CloseableHttpAsyncClientBase.run(CloseableHttpAsyncClientBase.java:64)
... 1 more
请帮我摆脱这个。所有帮助和建议都将适用。
- 我研究了很多关于如何摆脱这个问题的方法,之后我尝试了很多解决方案,最后我会得到解决方案,所有事情都按我想要的方式工作。
- 我正在尝试实现一个客户端使用的弹性池
任何查询或聚合的整个项目以及 bean 何时销毁
将关闭,我的整个查询仅由一个人完成
连接。
- 我改变了我的弹性配置,像这样:
@Configuration
public class ElasticConfig {
@Autowired
Environment environment;
private RestHighLevelClient client;
@Bean
public RestHighLevelClient prepareConnection() {
RestClientBuilder restBuilder = RestClient
.builder(new HttpHost(environment.getProperty("zselastic.host").toString(),
Integer.valueOf(environment.getProperty("zselastic.port").toString()),
environment.getProperty("zselastic.protocol").toString()));
String username = new String(environment.getProperty("zselastic.username").toString());
String password = new String(environment.getProperty("zselastic.password").toString());
if (username != null & password != null) {
final CredentialsProvider creadential = new BasicCredentialsProvider();
creadential.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
restBuilder.setHttpClientConfigCallback(new HttpClientConfigCallback() {
@Override
public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
return httpClientBuilder.setDefaultCredentialsProvider(creadential)
.setDefaultIOReactorConfig(IOReactorConfig.custom().setIoThreadCount(1).build());
}
});
restBuilder.setRequestConfigCallback(requestConfigBuilder ->
requestConfigBuilder.setConnectTimeout(10000) // time until a connection with the server is established.
.setSocketTimeout(60000) // time of inactivity to wait for packets[data] to receive.
.setConnectionRequestTimeout(0)); // time to fetch a connection from the connection pool 0 for infinite.
client = new RestHighLevelClient(restBuilder);
return client;
}
return null;
}
/*
* it gets called when bean instance is getting removed from the context if
* scope is not a prototype
*/
/*
* If there is a method named shutdown or close then spring container will try
* to automatically configure them as callback methods when bean is being
* destroyed
*/
@PreDestroy
public void clientClose() {
try {
this.client.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
- 所以现在我的 bean return
RestHighLevelClient
它将被使用
整个项目,我得到了每个 API 的回复,而不是
java.net.ConnectException: Connection refused
.
- 此外,我使用这个检查节点统计信息
http://host:port/_nodes/stats/http
。当我的 Spring 启动时
应用程序启动它会启动一个连接到
elasticsearch 和一个条目将添加到 current_open
。后
也就是说,在我的应用程序全部 运行 之前,连接不会增加
查询和聚合是通过使用这个来执行的
仅连接。我的应用程序何时关闭或停止连接
将关闭并从 current_open
. 中删除条目
- 所以现在我可以得出结论,我通过使用此配置应用了弹性池。
在我的例子中,我忘记在创建 Bean 的方法中添加 @Bean
注释。
遵循 AbstractElasticsearchConfiguration
.
的实施 class
package org.lauksas.elasticsearch.configuration;
import java.time.Duration;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.core.convert.support.DefaultConversionService;
import org.springframework.data.elasticsearch.client.ClientConfiguration;
import org.springframework.data.elasticsearch.client.RestClients;
import org.springframework.data.elasticsearch.config.AbstractElasticsearchConfiguration;
import org.springframework.data.elasticsearch.core.ElasticsearchEntityMapper;
import org.springframework.data.elasticsearch.core.ElasticsearchOperations;
import org.springframework.data.elasticsearch.core.EntityMapper;
import org.springframework.http.HttpHeaders;
@Configuration
public class RestClientConfig extends AbstractElasticsearchConfiguration {
@Value("${elasticsearch.host}")
private String host;
@Value("${elasticsearch.port}")
private int port;
@Value("${elasticsearch.username}")
private String username;
@Value("${elasticsearch.password}")
private String password;
Logger log = LoggerFactory.getLogger(getClass());
@Bean
@Override
public EntityMapper entityMapper() {
ElasticsearchEntityMapper entityMapper = new ElasticsearchEntityMapper(elasticsearchMappingContext(),
new DefaultConversionService());
entityMapper.setConversions(elasticsearchCustomConversions());
return entityMapper;
}
@Override
@Bean
public RestHighLevelClient elasticsearchClient() {
HttpHeaders headers = new HttpHeaders();
headers.setBasicAuth(username, password);
final ClientConfiguration clientConfiguration = ClientConfiguration.builder().connectedTo(host + ":" + port)
.usingSsl().withBasicAuth(username, password).withSocketTimeout(Duration.ofMinutes(10))
.build();
return RestClients.create(clientConfiguration).rest();
}
@Bean
public RestClient restClient() {
HttpHeaders headers = new HttpHeaders();
headers.setBasicAuth(username, password);
final ClientConfiguration clientConfiguration = ClientConfiguration.builder().connectedTo(host + ":" + port)
.usingSsl().withBasicAuth(username, password).build();
return RestClients.create(clientConfiguration).lowLevelRest();
}
@Bean
@Primary
public ElasticsearchOperations elasticsearchTemplate() {
return elasticsearchOperations();
}
}
我正在为我的 spring 引导项目实现一个弹性池,我也在使用 spring 引导 2.1.4 和弹性搜索 7.3.0。我被困在这个。当任何 API 尝试查询时,它会给出 java.net.ConnectException: Connection refused
。我想使用带有设置线程数的 customizeHttpClient
配置。因此,当应用程序启动时,它只建立一个连接,并且仅使用该连接查询数据库,直到 bean 销毁。
我试过这个弹性配置:
import java.io.IOException;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.apache.http.impl.nio.reactor.IOReactorConfig;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestClientBuilder.HttpClientConfigCallback;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
@Component
public class ElasticConfig{
public static String host;
private static String port;
private static String protocol;
private static String username;
private static String password;
private RestHighLevelClient client;
@Value("${dselastic.host}")
public void setHost(String value) {
host = value;
}
@Value("${dselastic.port}")
public void setPort(String value) {
port = value;
}
@Value("${dselastic.protocol}")
public void setProtocol(String value) {
protocol = value;
}
@Value("${dselastic.username}")
public void setUsername(String value) {
username = value;
}
@Value("${dselastic.password}")
public void setPassword(String value) {
password = value;
}
@Bean(destroyMethod = "cleanUp")
@Scope(value = ConfigurableBeanFactory.SCOPE_SINGLETON)
public void prepareConnection() {
RestClientBuilder restBuilder = RestClient.builder(new HttpHost(host, Integer.valueOf(port), protocol));
if (username != null & password != null) {
final CredentialsProvider creadential = new BasicCredentialsProvider();
creadential.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
restBuilder.setHttpClientConfigCallback(new HttpClientConfigCallback() {
@Override
public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
return httpClientBuilder.setDefaultCredentialsProvider(creadential)
.setDefaultIOReactorConfig(IOReactorConfig.custom().setIoThreadCount(1).build());
}
});
restBuilder.setRequestConfigCallback(requestConfigBuilder ->
requestConfigBuilder.setConnectTimeout(10000) // time until a connection with the server is established.
.setSocketTimeout(60000) // time of inactivity to wait for packets[data] to receive.
.setConnectionRequestTimeout(0)); // time to fetch a connection from the connection pool 0 for infinite.
client = new RestHighLevelClient(restBuilder);
}
}
public void cleanUp() {
try {
client.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
我也尝试过实现 DisposableBean
接口,它是 destroy
方法,但我遇到了同样的异常。
这是我的 API 我正在尝试查询文档的地方:
public class IndexNameController {
@Autowired
RestHighLevelClient client;
@GetMapping(value = "/listAllNames")
public ArrayList<Object> listAllNames(HttpSession session) {
ArrayList<Object> results = new ArrayList<>();
try {
SearchRequest searchRequest = new SearchRequest();
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchRequest.indices("indexname");
String[] fields = { "name", "id" };
searchSourceBuilder.fetchSource(fields, new String[] {});
searchSourceBuilder.query(QueryBuilders.matchAllQuery()).size(10000);
searchSourceBuilder = searchSourceBuilder.sort(new FieldSortBuilder("createdTime").order(SortOrder.DESC));
searchRequest.source(searchSourceBuilder);
SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
SearchHit[] searchHits = searchResponse.getHits().getHits();
for (SearchHit searchHit : searchHits) {
Map<String, Object> map = new HashMap<>();
map.put("value", searchHit.getSourceAsMap().get("id"));
map.put("name", searchHit.getSourceAsMap().get("name"));
results.add(map);
}
return results;
} catch (Exception e) {
e.printStackTrace();
}
return new ArrayList<>();
}
}
当它试图查询时,它在 client.search()
给出了异常。这是 堆栈跟踪:
java.net.ConnectException: Connection refused
at org.elasticsearch.client.RestClient.extractAndWrapCause(RestClient.java:788)
at org.elasticsearch.client.RestClient.performRequest(RestClient.java:218)
at org.elasticsearch.client.RestClient.performRequest(RestClient.java:205)
at org.elasticsearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:1454)
at org.elasticsearch.client.RestHighLevelClient.performRequest(RestHighLevelClient.java:1424)
at org.elasticsearch.client.RestHighLevelClient.performRequestAndParseEntity(RestHighLevelClient.java:1394)
at org.elasticsearch.client.RestHighLevelClient.search(RestHighLevelClient.java:930)
at com.incident.response.controller.IncidentController.listAllIncidents(IncidentController.java:569)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:189)
at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:138)
at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:102)
at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:892)
at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:797)
at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87)
at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1038)
at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:942)
at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1005)
at org.springframework.web.servlet.FrameworkServlet.doGet(FrameworkServlet.java:897)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:634)
at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:882)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:741)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:231)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:53)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:99)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
at org.springframework.web.filter.FormContentFilter.doFilterInternal(FormContentFilter.java:92)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
at org.springframework.web.filter.HiddenHttpMethodFilter.doFilterInternal(HiddenHttpMethodFilter.java:93)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:200)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:200)
at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:96)
at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:490)
at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:139)
at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:92)
at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:74)
at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:343)
at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:408)
at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:66)
at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:834)
at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1415)
at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.net.ConnectException: Connection refused
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
at org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor.processEvent(DefaultConnectingIOReactor.java:171)
at org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor.processEvents(DefaultConnectingIOReactor.java:145)
at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor.execute(AbstractMultiworkerIOReactor.java:351)
at org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager.execute(PoolingNHttpClientConnectionManager.java:221)
at org.apache.http.impl.nio.client.CloseableHttpAsyncClientBase.run(CloseableHttpAsyncClientBase.java:64)
... 1 more
请帮我摆脱这个。所有帮助和建议都将适用。
- 我研究了很多关于如何摆脱这个问题的方法,之后我尝试了很多解决方案,最后我会得到解决方案,所有事情都按我想要的方式工作。
- 我正在尝试实现一个客户端使用的弹性池 任何查询或聚合的整个项目以及 bean 何时销毁 将关闭,我的整个查询仅由一个人完成 连接。
- 我改变了我的弹性配置,像这样:
@Configuration
public class ElasticConfig {
@Autowired
Environment environment;
private RestHighLevelClient client;
@Bean
public RestHighLevelClient prepareConnection() {
RestClientBuilder restBuilder = RestClient
.builder(new HttpHost(environment.getProperty("zselastic.host").toString(),
Integer.valueOf(environment.getProperty("zselastic.port").toString()),
environment.getProperty("zselastic.protocol").toString()));
String username = new String(environment.getProperty("zselastic.username").toString());
String password = new String(environment.getProperty("zselastic.password").toString());
if (username != null & password != null) {
final CredentialsProvider creadential = new BasicCredentialsProvider();
creadential.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
restBuilder.setHttpClientConfigCallback(new HttpClientConfigCallback() {
@Override
public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
return httpClientBuilder.setDefaultCredentialsProvider(creadential)
.setDefaultIOReactorConfig(IOReactorConfig.custom().setIoThreadCount(1).build());
}
});
restBuilder.setRequestConfigCallback(requestConfigBuilder ->
requestConfigBuilder.setConnectTimeout(10000) // time until a connection with the server is established.
.setSocketTimeout(60000) // time of inactivity to wait for packets[data] to receive.
.setConnectionRequestTimeout(0)); // time to fetch a connection from the connection pool 0 for infinite.
client = new RestHighLevelClient(restBuilder);
return client;
}
return null;
}
/*
* it gets called when bean instance is getting removed from the context if
* scope is not a prototype
*/
/*
* If there is a method named shutdown or close then spring container will try
* to automatically configure them as callback methods when bean is being
* destroyed
*/
@PreDestroy
public void clientClose() {
try {
this.client.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
- 所以现在我的 bean return
RestHighLevelClient
它将被使用 整个项目,我得到了每个 API 的回复,而不是java.net.ConnectException: Connection refused
. - 此外,我使用这个检查节点统计信息
http://host:port/_nodes/stats/http
。当我的 Spring 启动时 应用程序启动它会启动一个连接到 elasticsearch 和一个条目将添加到current_open
。后 也就是说,在我的应用程序全部 运行 之前,连接不会增加 查询和聚合是通过使用这个来执行的 仅连接。我的应用程序何时关闭或停止连接 将关闭并从current_open
. 中删除条目
- 所以现在我可以得出结论,我通过使用此配置应用了弹性池。
在我的例子中,我忘记在创建 Bean 的方法中添加 @Bean
注释。
遵循 AbstractElasticsearchConfiguration
.
package org.lauksas.elasticsearch.configuration;
import java.time.Duration;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.core.convert.support.DefaultConversionService;
import org.springframework.data.elasticsearch.client.ClientConfiguration;
import org.springframework.data.elasticsearch.client.RestClients;
import org.springframework.data.elasticsearch.config.AbstractElasticsearchConfiguration;
import org.springframework.data.elasticsearch.core.ElasticsearchEntityMapper;
import org.springframework.data.elasticsearch.core.ElasticsearchOperations;
import org.springframework.data.elasticsearch.core.EntityMapper;
import org.springframework.http.HttpHeaders;
@Configuration
public class RestClientConfig extends AbstractElasticsearchConfiguration {
@Value("${elasticsearch.host}")
private String host;
@Value("${elasticsearch.port}")
private int port;
@Value("${elasticsearch.username}")
private String username;
@Value("${elasticsearch.password}")
private String password;
Logger log = LoggerFactory.getLogger(getClass());
@Bean
@Override
public EntityMapper entityMapper() {
ElasticsearchEntityMapper entityMapper = new ElasticsearchEntityMapper(elasticsearchMappingContext(),
new DefaultConversionService());
entityMapper.setConversions(elasticsearchCustomConversions());
return entityMapper;
}
@Override
@Bean
public RestHighLevelClient elasticsearchClient() {
HttpHeaders headers = new HttpHeaders();
headers.setBasicAuth(username, password);
final ClientConfiguration clientConfiguration = ClientConfiguration.builder().connectedTo(host + ":" + port)
.usingSsl().withBasicAuth(username, password).withSocketTimeout(Duration.ofMinutes(10))
.build();
return RestClients.create(clientConfiguration).rest();
}
@Bean
public RestClient restClient() {
HttpHeaders headers = new HttpHeaders();
headers.setBasicAuth(username, password);
final ClientConfiguration clientConfiguration = ClientConfiguration.builder().connectedTo(host + ":" + port)
.usingSsl().withBasicAuth(username, password).build();
return RestClients.create(clientConfiguration).lowLevelRest();
}
@Bean
@Primary
public ElasticsearchOperations elasticsearchTemplate() {
return elasticsearchOperations();
}
}