ElasticSearch 和 Apache HttpAsyncClient
ElasticSearch and Apache HttpAsyncClient
我正在尝试将 ElasticSearch REST API 与 Java Apache HttpAsyncClient
库一起使用。我想使用持久流水线连接。这是一些测试代码(输出在注释中):
@Test
public void testEsPipeliningClient() throws IOException, ExecutionException, InterruptedException
{
testPost(HttpAsyncClients.createDefault());
//201: {"_index":"test_index","_type":"test_type","_id":"AVIHYGnqdqqg_TAHm4ix","_version":1,"_shards":{"total":2,"successful":1,"failed":0},"created":true}
testPost(HttpAsyncClients.createPipelining());
//400: No handler found for uri [http://127.0.0.1:9200/test_index/test_type] and method [POST]
}
private void testPost(CloseableHttpAsyncClient client) throws ExecutionException, InterruptedException, IOException
{
client.start();
HttpPost request = new HttpPost("http://127.0.0.1:9200/test_index/test_type");
request.setEntity(new StringEntity("{\"some_field\": \"some_value\"}"));
Future<HttpResponse> responseFuture = client.execute(request, null);
HttpResponse response = responseFuture.get();
System.err.println(response.getStatusLine().getStatusCode() + ": " + EntityUtils.toString(response.getEntity()));
}
我不明白,为什么它在 HttpAsyncClients.createDefault()
客户端上工作正常,但在 HttpAsyncClients.createPipelining()
上却不工作。我也无法理解这两种创建方法之间的区别。
为什么我在使用 createPipelining()
时会收到错误响应?
我试图查看 https://httpbin.org/post 的区别,但它向我展示了两种选择的相同结果。我使用默认的 ElasticSearch 设置。
谢谢!
UPD1
我尝试使用 PUT
文档 (PUT http://127.0.0.1/test_index/test_type/<doc id>
) 请求得到相同的结果 - 它在 createDefault()
中工作正常,但在使用 createPipelining()
时我遇到了类似的错误- 未找到处理程序 <...>。
但是,当我尝试执行创建索引 (PUT http://127.0.0.1/<index name>
) 的请求时,出现了另一个错误。请看下面的代码:
@Test
public void testEsPipeliningClient() throws IOException, ExecutionException, InterruptedException
{
testCreateIndex(HttpAsyncClients.createDefault());
//200: {"acknowledged":true}
testCreateIndex(HttpAsyncClients.createPipelining());
//400: {"error":{"root_cause":[{"type":"mapper_parsing_exception","reason":"failed to parse, document is empty"}],"type":"mapper_parsing_exception","reason":"failed to parse, document is empty"},"status":400}
}
private void testCreateIndex(CloseableHttpAsyncClient client) throws ExecutionException, InterruptedException, IOException
{
client.start();
HttpPut request = new HttpPut("http://127.0.0.1:9200/" + RandomStringUtils.randomAlphabetic(8).toLowerCase());
Future<HttpResponse> responseFuture = client.execute(request, null);
HttpResponse response = responseFuture.get();
System.err.println(response.getStatusLine().getStatusCode() + ": " + EntityUtils.toString(response.getEntity()));
}
正如我在 this documentation page 看到的那样,ElasticSearch 默认支持 HTTP 管道。也许我需要在 ES 设置中更改什么?
UPD2
以下是 UPD1 部分中具有不同日志设置的代码的一些线路日志:
Dorg.apache.commons.logging.simplelog.log.org.apache.http=DEBUG -Dorg.apache.commons.logging.simplelog.log.org.apache.http.wire=INFO
-Dorg.apache.commons.logging.simplelog.log.org.apache.http.impl.conn=DEBUG -Dorg.apache.commons.logging.simplelog.log.org.apache.http.impl.client=DEBUG -Dorg.apache.commons.logging.simplelog.log.org.apache.http.client=DEBUG -Dorg.apache.commons.logging.simplelog.log.org.apache.http.wire=DEBUG
UPD3
我刚刚尝试用 createMinimal() 替换 createDefault(),它导致了与 createPipelining() 相同的错误。 MinimalHttpAsyncClient 中的任何想法可能会导致此问题?也许有一种方法可以让我手动创建流水线客户端(使用构建器类)而不会出现这个问题?
服务器必须在请求行中的绝对请求 URI 上阻塞
[DEBUG] wire - http-outgoing-1 >> "PUT http://127.0.0.1:9200/ydiwdsid HTTP/1.1[\r][\n]"
流水线模式下的 HttpAsyncClient 使用最小的协议处理链。它不会尝试重写请求对象的请求 URI。
对于您的特定情况,请求流水线似乎没有多大意义。更不用说除非您批量提交请求,否则您甚至不会使用流水线执行。
实际上,您只需要从 URL 中提取主机并仅使用绝对路径创建一个 HttpPost
对象。请参阅下面第二行、第三行和第五行的更改:
client.start();
HttpHost targetHost = new HttpHost("127.0.0.1", 9200);
HttpPost request = new HttpPost("/test_index/test_type");
request.setEntity(new StringEntity("{\"some_field\": \"some_value\"}"));
Future<HttpResponse> responseFuture = client.execute(targetHost, request, null);
HttpResponse response = responseFuture.get();
System.out.println(response.getStatusLine().getStatusCode() + ": " + EntityUtils.toString(response.getEntity()));
进行这三项更改并再次 运行 代码将产生以下结果:
201: {"_index":"test_index","_type":"test_type","_id":"AVISSimIZHOoPG8ibOyF","_version":1,"created":true}
201: {"_index":"test_index","_type":"test_type","_id":"AVISSimjZHOoPG8ibOyG","_version":1,"created":true}
我正在尝试将 ElasticSearch REST API 与 Java Apache HttpAsyncClient
库一起使用。我想使用持久流水线连接。这是一些测试代码(输出在注释中):
@Test
public void testEsPipeliningClient() throws IOException, ExecutionException, InterruptedException
{
testPost(HttpAsyncClients.createDefault());
//201: {"_index":"test_index","_type":"test_type","_id":"AVIHYGnqdqqg_TAHm4ix","_version":1,"_shards":{"total":2,"successful":1,"failed":0},"created":true}
testPost(HttpAsyncClients.createPipelining());
//400: No handler found for uri [http://127.0.0.1:9200/test_index/test_type] and method [POST]
}
private void testPost(CloseableHttpAsyncClient client) throws ExecutionException, InterruptedException, IOException
{
client.start();
HttpPost request = new HttpPost("http://127.0.0.1:9200/test_index/test_type");
request.setEntity(new StringEntity("{\"some_field\": \"some_value\"}"));
Future<HttpResponse> responseFuture = client.execute(request, null);
HttpResponse response = responseFuture.get();
System.err.println(response.getStatusLine().getStatusCode() + ": " + EntityUtils.toString(response.getEntity()));
}
我不明白,为什么它在 HttpAsyncClients.createDefault()
客户端上工作正常,但在 HttpAsyncClients.createPipelining()
上却不工作。我也无法理解这两种创建方法之间的区别。
为什么我在使用 createPipelining()
时会收到错误响应?
我试图查看 https://httpbin.org/post 的区别,但它向我展示了两种选择的相同结果。我使用默认的 ElasticSearch 设置。
谢谢!
UPD1
我尝试使用 PUT
文档 (PUT http://127.0.0.1/test_index/test_type/<doc id>
) 请求得到相同的结果 - 它在 createDefault()
中工作正常,但在使用 createPipelining()
时我遇到了类似的错误- 未找到处理程序 <...>。
但是,当我尝试执行创建索引 (PUT http://127.0.0.1/<index name>
) 的请求时,出现了另一个错误。请看下面的代码:
@Test
public void testEsPipeliningClient() throws IOException, ExecutionException, InterruptedException
{
testCreateIndex(HttpAsyncClients.createDefault());
//200: {"acknowledged":true}
testCreateIndex(HttpAsyncClients.createPipelining());
//400: {"error":{"root_cause":[{"type":"mapper_parsing_exception","reason":"failed to parse, document is empty"}],"type":"mapper_parsing_exception","reason":"failed to parse, document is empty"},"status":400}
}
private void testCreateIndex(CloseableHttpAsyncClient client) throws ExecutionException, InterruptedException, IOException
{
client.start();
HttpPut request = new HttpPut("http://127.0.0.1:9200/" + RandomStringUtils.randomAlphabetic(8).toLowerCase());
Future<HttpResponse> responseFuture = client.execute(request, null);
HttpResponse response = responseFuture.get();
System.err.println(response.getStatusLine().getStatusCode() + ": " + EntityUtils.toString(response.getEntity()));
}
正如我在 this documentation page 看到的那样,ElasticSearch 默认支持 HTTP 管道。也许我需要在 ES 设置中更改什么?
UPD2
以下是 UPD1 部分中具有不同日志设置的代码的一些线路日志:
Dorg.apache.commons.logging.simplelog.log.org.apache.http=DEBUG -Dorg.apache.commons.logging.simplelog.log.org.apache.http.wire=INFO
-Dorg.apache.commons.logging.simplelog.log.org.apache.http.impl.conn=DEBUG -Dorg.apache.commons.logging.simplelog.log.org.apache.http.impl.client=DEBUG -Dorg.apache.commons.logging.simplelog.log.org.apache.http.client=DEBUG -Dorg.apache.commons.logging.simplelog.log.org.apache.http.wire=DEBUG
UPD3
我刚刚尝试用 createMinimal() 替换 createDefault(),它导致了与 createPipelining() 相同的错误。 MinimalHttpAsyncClient 中的任何想法可能会导致此问题?也许有一种方法可以让我手动创建流水线客户端(使用构建器类)而不会出现这个问题?
服务器必须在请求行中的绝对请求 URI 上阻塞
[DEBUG] wire - http-outgoing-1 >> "PUT http://127.0.0.1:9200/ydiwdsid HTTP/1.1[\r][\n]"
流水线模式下的 HttpAsyncClient 使用最小的协议处理链。它不会尝试重写请求对象的请求 URI。
对于您的特定情况,请求流水线似乎没有多大意义。更不用说除非您批量提交请求,否则您甚至不会使用流水线执行。
实际上,您只需要从 URL 中提取主机并仅使用绝对路径创建一个 HttpPost
对象。请参阅下面第二行、第三行和第五行的更改:
client.start();
HttpHost targetHost = new HttpHost("127.0.0.1", 9200);
HttpPost request = new HttpPost("/test_index/test_type");
request.setEntity(new StringEntity("{\"some_field\": \"some_value\"}"));
Future<HttpResponse> responseFuture = client.execute(targetHost, request, null);
HttpResponse response = responseFuture.get();
System.out.println(response.getStatusLine().getStatusCode() + ": " + EntityUtils.toString(response.getEntity()));
进行这三项更改并再次 运行 代码将产生以下结果:
201: {"_index":"test_index","_type":"test_type","_id":"AVISSimIZHOoPG8ibOyF","_version":1,"created":true}
201: {"_index":"test_index","_type":"test_type","_id":"AVISSimjZHOoPG8ibOyG","_version":1,"created":true}