向 Elasticsearch 添加新的 HTTP 客户端以支持客户端应用程序 运行 针对 AWS Elasticsearch?

Adding a new HTTP client to Elasticsearch to support client apps to run against AWS Elasticsearch?

我正在尝试将 Elasticsearch HTTP 访问添加到 Titan ES client using JEST. titan-es only supports ES' local and transport (TCP) mode. But I would like to support communication over ES' HTTP interface. That would allow client libraries like titan-es to use AWS Elasticsearch as an indexing backend which only provides a HTTP(S) interface. See this post 以获取更多信息。

我正在寻找一些关于我目前正在考虑的方法的反馈:

  1. 创建一个实现 org.elasticache.client.Client 接口的新 class ElasticsearchHttpClient。新的 class 将使用 JestClient 作为其内部客户端。这样它将通过 HTTP 与 ES 通信。新的 class 可能会扩展 ES 的 AbstractClient 以减少必须实现的方法:admin()settings()execute()threadPool(), 和 close().
  2. 将新枚举 HTTP_CLIENT 添加到 ElasticSearchSetup
  3. 确保 HTTP_CLIENT returns 上的 connect() 方法是 Connection 的实例,其中包含 nodeclient 的正确值。 client 成员将是新 ElasticsearchHttpClient class.
  4. 的一个实例
  5. 如果 INTERFACE 配置为 HTTP_CLIENT,请确保 ElasticSearchIndex.interfaceConfiguration() 方法检索 Connection 的正确实例(包含新的 ElasticsearchHttpClient)。从那时起,其余代码应继续在新协议上工作。

这听起来行得通吗?第一步是我最关心的问题——我不确定我是否可以使用 JestClient 实现所有客户端方法。

package com.thinkaurelius.titan.diskstorage.es;

import io.searchbox.client.JestClient;
import io.searchbox.client.JestClientFactory;
import io.searchbox.client.JestResult;
import io.searchbox.client.config.HttpClientConfig;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.*;
import org.elasticsearch.client.AdminClient;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.support.AbstractClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;

import java.io.IOException;

public class ElasticsearchHttpClient extends AbstractClient {
    private final JestClient internalClient;
    private final ThreadPool pool;

    public ElasticsearchHttpClient(String hostname, int port) {
        JestClientFactory factory = new JestClientFactory();
        factory.setHttpClientConfig(new HttpClientConfig
                .Builder(String.format("http://%s:%d", hostname, port))
                .multiThreaded(true)
                .build());
        JestClient client = factory.getObject();

        this.pool = new ThreadPool("jest");
        this.internalClient = client;
    }

    @Override
    public AdminClient admin() {
        return null;
    }

    @Override
    public Settings settings() {
        return null;
    }

    @Override
    public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder, Client>> ActionFuture<Response> execute(Action<Request, Response, RequestBuilder, Client> action, Request request) {
        try {
            JestResult response = internalClient.execute(convertRequest(action, request));
            return convertResponse(response);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder, Client>> void execute(Action<Request, Response, RequestBuilder, Client> action, Request request, ActionListener<Response> listener) {
        execute(action, request);
    }

    private <Response extends ActionResponse> ActionFuture<Response> convertResponse(JestResult result) {
        // TODO How to convert a JestResult a Elasticsearch ActionResponse/ActionFuture?
        return null;
    }

    private <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder, Client>> io.searchbox.action.Action<JestResult> convertRequest(Action<Request, Response, RequestBuilder, Client> action, Request request) {
        // TODO How to convert an Elasticsearch Action<..> and Request to a Jest Action<JestResult>?
        return null;
    }

    @Override
    public ThreadPool threadPool() {
        return pool;
    }

    @Override
    public void close() throws ElasticsearchException {
        pool.shutdownNow();
    }
}

[我也在 Titan mailing list and Elasticsearch forum 上问过这个问题。]

我已经在 Titan mailing list 中发布了一个答案。

What you'd need to do from a Titan perspective is implement the IndexProvider interface. My guess is that it isn't feasible to make Jest look like a full Elasticsearch client.

I think you would use JestHttpClient -- you don't need to implement the Jest interface. IndexProvider has methods to create/drop/mutate/query an index, which you should be able to do over HTTP. Check the Elasticsearch HTTP documentation to see if you can do all the required methods on IndexProvider with JestHttpClient.

There's already an ElasticSearchIndex implementation of IndexProvider, which does NODE and TRANSPORT. You're trying to add an HTTP or JEST option. So you might consider shoehorning your changes into ElasticSearchIndex, but I'm not sure how well that will work out since the 2 existing impls are both full Elasticsearch clients. Perhaps consider creating a separate ElasticSearchHttpIndex implements IndexProvider if it's cleaner.