在使用 Java 高级其余客户端批量 API 创建动态 elasticsearch 索引方面需要帮助

Need help in creating dynamic elasticsearch index using Java high level rest client bulk API

我正在创建一个 Elasticsearch 集群,它将与我们的 Java 代码库集成。我想创建一个 Elasticsearch 索引并从多个数据库中向其中插入 SQL 查询数据。来自所有数据库的查询结果应该被插入到同一个索引中。为此,我正在使用 Java 高级 Rest 客户端。 但我不太确定如何执行此操作,因为旧 API 中的许多方法已被弃用。 我也不太确定如何处理 createIndexResponse 实例。 谁能帮我解决这个问题?

        public static void method_1(Connection con) throws Exception {

            Statement statement = con.createStatement();
            try {
                ResultSet result = statement.executeQuery("SELECT Field_1, Field_2, Field_3 from Table_1");
                int counter = 1;
                CreateIndexRequest createIndexRequest = new CreateIndexRequest("index_name");
                createIndexRequest.settings(new Settings.Builder()
                        .put("cluster.name", "my_cluster")
                        .put("http.enabled", true)
                        .put("node.data", true)
                        .put("index.number_of_shards", 3)
                        .put("index.number_of_replicas", 1)
                        .build());
                CreateIndexResponse createIndexResponse = ElasticSearch.eclient.indices().create(createIndexRequest, RequestOptions.DEFAULT);
                BulkRequest bulkRequest = new BulkRequest();
                while (result.next()) {
                    String field_1 = result.getString("Field_1");
                    int field_2 = result.getInt("Field_2");
                    String field_3 = result.getString("Field_3");
                    XContentBuilder builder = XContentFactory.jsonBuilder()
                            .startObject()
                            .field("Field 1", field_1)
                            .field("Field 2", field_2)
                            .field("Field 3", field_3)
                            .endObject();
                    UpdateRequest updateRequest = new UpdateRequest("index_name", "_doc", Integer.toString(counter));
                    updateRequest.doc(builder);
                    bulkRequest.add(updateRequest);
                }
                BulkResponse response = ElasticSearch.eclient.bulk(bulkRequest, RequestOptions.DEFAULT);
                if (response.hasFailures()) {
                    for (BulkItemResponse item : response.getItems()) {
                        System.out.println(item.getFailureMessage());
                    }
                }
                counter++;
                statement.close();
            } catch (Exception ex) {
                ex.printStackTrace();
            }
        }

您没有给出您使用的是哪个版本。无论如何,我正在考虑您使用的是 7.0.0.

这是一个例子,我猜应该没问题:

try (RestHighLevelClient client = new RestHighLevelClient(
        RestClient.builder(HttpHost.create("http://localhost:9200")))) {
    CreateIndexRequest createIndexRequest = new CreateIndexRequest("index_name");
    createIndexRequest.settings(Settings.builder()
            .put("index.number_of_shards", 3)
            .put("index.number_of_replicas", 1)
            .build());
    client.indices().create(createIndexRequest, RequestOptions.DEFAULT);
    BulkRequest bulkRequest = new BulkRequest();
    XContentBuilder builder = XContentFactory.jsonBuilder()
            .startObject()
                .field("foo", "bar")
            .endObject();
    IndexRequest indexRequest = new IndexRequest("index_name");
    indexRequest.source(builder);
    bulkRequest.add(indexRequest);
    BulkResponse response = client.bulk(bulkRequest, RequestOptions.DEFAULT);
    if (response.hasFailures()) {
        for (BulkItemResponse item : response.getItems()) {
            System.out.println(item.getFailureMessage());
        }
    }
} catch (IOException e) {
    e.printStackTrace();
}

我建议使用 BulkProcessor class,它更容易处理 IMO。我有 a full demo repository,我会在每个新版本上更新它。它也可能对您有所帮助。