如何使用 Java 将 CSV 文件索引到 Elasticsearch?

How to Index our CSV file into Elastic Search using Java?

有没有办法使用 Java 将我们的 CSV 文件索引到 Elasticsearch 中?我之前是用 Logstash 来做的,但现在需要用 Java 编写代码并动态运行。我尝试过使用 Index API,但它不适合我的情况。有人可以帮帮我吗?我的 CSV 数据大致如下,这只是一个样本,我的对象就是这样的。

样本 CSV 数据是这样的..

 id  profile_id  hier_name       attri_name     item
  1   1          CUSTOMER        CUSTOMER        C001
  2   1          CUSTOMER        CUSTOMER        C002
  3   1          CUSTOMER        CUSTOMER        C003

这是我尝试进行批量插入的代码,但它似乎不适用于我当前使用的 Elasticsearch 7.12.0:

     package com.javadeveloperzone;

import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.net.InetAddress;
import java.util.Iterator;
import java.util.concurrent.ExecutionException;

import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.transport.client.PreBuiltTransportClient;

/**
 * Stand-alone example that reads a CSV file line by line, queues one index request per row on
 * a transport-client bulk request, and then refreshes the index and runs a search.
 *
 * <p>NOTE(review): as written, no row can ever be queued. Rows are accepted only when they
 * split into exactly three comma-separated fields, but the accepted branch then reads a fourth
 * field. See the notes inside {@link #CSVbulkImport(boolean)}.
 */
public class ESBulkIndexingExample {

    // Index name and mapping type name handed to prepareIndex / prepareSearch; set in the constructor.
    String indexName, indexTypeName;
    // Created by initEStransportClinet(); stays null until that method succeeds.
    TransportClient client = null;

    /**
     * Runs the whole example: connect, import the CSV (skipping a header line), refresh, search.
     * Any exception is printed, and the client is closed in all cases.
     */
    public static void main(String[] args) {
        ESBulkIndexingExample esExample = new ESBulkIndexingExample();
        try {
            esExample.initEStransportClinet();
            System.out.println("init done");
            esExample.CSVbulkImport(true);
            System.out.println("bulkimport done");
            esExample.refreshIndices();

            esExample.search();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            esExample.closeTransportClient(); // close transport client
        }
    }

    /** Sets the fixed index name ("document") and type name ("bulkindexing") used by every request. */
    public ESBulkIndexingExample() {
        indexName = "document";
        indexTypeName = "bulkindexing";
    }

    /**
     * Creates the transport client with empty settings and points it at localhost:9300.
     *
     * @return true when the client was created, false when any exception occurred (the
     *         exception is printed, not rethrown, and main() ignores this return value)
     */
    public boolean initEStransportClinet() {
        try {
            client = new PreBuiltTransportClient(Settings.EMPTY)
                    .addTransportAddress(new TransportAddress(InetAddress.getByName("localhost"), 9300));

            return true;
        } catch (Exception ex) {
            ex.printStackTrace();
            return false;
        }
    }

    /**
     * Reads the hard-coded CSV file and queues one index request per accepted row.
     *
     * @param isHeaderIncluded when true, the first line of the file is read and discarded
     * @throws IOException if opening or reading the file fails
     */
    public void CSVbulkImport(boolean isHeaderIncluded) throws IOException, ExecutionException, InterruptedException {

        BulkRequestBuilder bulkRequest = client.prepareBulk();

        File file = new File(
                "/home/niteshb/Documents/workspace-spring-tool-suite-4-4.10.0.RELEASE/ElasticSearchService/src/main/resources/elasticdata.csv");
        BufferedReader bufferedReader = new BufferedReader(new FileReader(file));

        String line = null;
        int count = 0, noOfBatch = 1;
        // bufferedReader was just constructed, so the null check is always true here;
        // the effective condition is isHeaderIncluded alone.
        if (bufferedReader != null && isHeaderIncluded) {
            bufferedReader.readLine();
        }
        while ((line = bufferedReader.readLine()) != null) {

            // Skip blank lines without counting them.
            if (line.trim().length() == 0) {
                continue;
            }
            String data[] = line.split(",");
            // NOTE(review): only rows with exactly three fields enter this branch, yet the
            // builder below reads data[3], which is out of range for a three-element array.
            // The resulting exception is caught by the catch block, so bulkRequest.add(...)
            // is never reached. Rows with any other field count go to the "Invalid data"
            // branch. Either way nothing is queued on bulkRequest.
            if (data.length == 3) {

                try {
                    XContentBuilder xContentBuilder = jsonBuilder().startObject().field("tenant_id", data[0])
                            .field("hierarchy_name", data[1]).field("attribute_name", data[2]).field("item_pk", data[3])
                            .endObject();

                    // data[0] is used as the document id.
                    BulkRequestBuilder add = bulkRequest
                            .add(client.prepareIndex(indexName, indexTypeName, data[0]).setSource(xContentBuilder));

                    System.out.println(add);
                    // NOTE(review): count is reset to 0 before the call, and
                    // addDocumentToESCluser returns immediately when count == 0, so this
                    // intermediate flush never executes the bulk request.
                    if ((count + 1) % 500 == 0) {
                        count = 0;
                        addDocumentToESCluser(bulkRequest, noOfBatch, count);
                        noOfBatch++;
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            } else {
                System.out.println("Invalid data : " + line);
            }
            // Counts every non-blank line, including rejected ones.
            count++;
        }
        // NOTE(review): the reader is closed only on the normal path; an IOException thrown
        // by readLine() above would skip this close().
        bufferedReader.close();
        // Final flush. count reflects lines read, not requests queued, so with count > 0
        // this executes bulkRequest even though nothing was added to it.
        addDocumentToESCluser(bulkRequest, noOfBatch, count);

    }

    /**
     * Executes the given bulk request and prints the outcome.
     *
     * @param bulkRequest the bulk request to execute
     * @param noOfBatch   batch number used only in the printed messages
     * @param count       when 0 the method returns without executing; otherwise used only in
     *                    the failure summary
     */
    public void addDocumentToESCluser(BulkRequestBuilder bulkRequest, int noOfBatch, int count) {

        if (count == 0) {
            return;
        }
        BulkResponse bulkResponse = bulkRequest.execute().actionGet();
        if (bulkResponse.hasFailures()) {
            System.out.println("Bulk Indexing failed for Batch : " + noOfBatch);

            // Count the individual items that failed.
            int numberOfDocFailed = 0;
            Iterator<BulkItemResponse> iterator = bulkResponse.iterator();
            while (iterator.hasNext()) {
                BulkItemResponse response = iterator.next();
                if (response.isFailed()) {
                    numberOfDocFailed++;
                }
            }
            System.out.println("Out of " + count + " documents, " + numberOfDocFailed + " documents failed");
            System.out.println(bulkResponse.buildFailureMessage());
        } else {
            System.out.println("Bulk Indexing Completed for batch : " + noOfBatch);
        }
    }

    /** Issues a refresh request for the configured index and waits for the response. */
    public void refreshIndices() {
        client.admin().indices().prepareRefresh(indexName).get();
    }

    /**
     * Runs a search with no explicit query against the configured index and type, then prints
     * the total hit count and the whole response.
     */
    public void search() {

        SearchResponse response = client.prepareSearch(indexName).setTypes(indexTypeName).get();
        System.out.println("Total Hits : " + response.getHits().getTotalHits());
        System.out.println(response);
    }

    /** Closes the transport client if one was created. */
    public void closeTransportClient() {
        if (client != null) {
            client.close();
        }
    }
}

到达此处时出现错误

org.elasticsearch.action.ActionRequestValidationException: Validation Failed: 1: no requests added;
at org.elasticsearch.action.ValidateActions.addValidationError(ValidateActions.java:15)
at org.elasticsearch.action.bulk.BulkRequest.validate(BulkRequest.java:425)
at org.elasticsearch.action.TransportActionNodeProxy.execute(TransportActionNodeProxy.java:31)
at org.elasticsearch.client.transport.TransportProxyClient.lambda$execute$0(TransportProxyClient.java:44)

有人可以帮我解决这个问题吗?

我在 csv 文件中的数据如下所示:-

 id  profile_id  hier_name        attri_name     item
  1   1          CUSTOMER         CUSTOMER        C001
  2   1          CUSTOMER         CUSTOMER        C002
  3   1           CUSTOMER        CUSTOMER        C003

可以添加的依赖是

<dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>transport</artifactId>
            <version>7.12.1</version>
        </dependency>
        
        <!-- https://mvnrepository.com/artifact/org.elasticsearch/elasticsearch -->
        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
            <version>7.12.1</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.elasticsearch.plugin/transport-netty4-client -->
        <dependency>
            <groupId>org.elasticsearch.plugin</groupId>
            <artifactId>transport-netty4-client</artifactId>
            <version>7.12.1</version>
        </dependency>




import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Iterator;
import java.util.concurrent.ExecutionException;

import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import org.springframework.stereotype.Component;

/**
 * Spring component that bulk-indexes the rows of a CSV file into Elasticsearch through the
 * transport client.
 *
 * <p>Call order: {@link #initEStransportClinet()}, then
 * {@link #CSVbulkImport(boolean, String, String)}, optionally {@link #refreshIndices(String)},
 * and finally {@link #closeTransportClient()}.
 */
@Component
public class ESBulkIndexing {

    /** Maximum number of index requests sent to the cluster in one bulk request. */
    private static final int BATCH_SIZE = 500;

    /** Fields read from every row: the document id (column 0) plus the four indexed values. */
    private static final int REQUIRED_COLUMNS = 5;

    /** Never assigned inside this class; the target index is passed to each method instead. */
    String indexName;
    /** Mapping type name passed to every index request. */
    String indexTypeName;
    /** Created by {@link #initEStransportClinet()}; null until then. */
    TransportClient client = null;

    /** Sets the fixed type name ("bulkindexing") used for every index request. */
    public ESBulkIndexing() {
        indexTypeName = "bulkindexing";
    }

    /**
     * Creates the transport client with empty settings and points it at localhost:9300.
     *
     * @return always true; failures are reported through the exception
     * @throws UnknownHostException if "localhost" cannot be resolved
     */
    public boolean initEStransportClinet() throws UnknownHostException {
        client = new PreBuiltTransportClient(Settings.EMPTY)
                .addTransportAddress(new TransportAddress(InetAddress.getByName("localhost"), 9300));
        return true;
    }

    /**
     * Reads {@code <filename>/elastic.csv} and indexes its rows in batches of
     * {@value #BATCH_SIZE}.
     *
     * <p>Each row is split on commas. Column 0 becomes the document id and columns 1-4 become
     * the fields {@code tenant_id}, {@code hierarchy_name}, {@code attribute_name} and
     * {@code item_pk}. Blank lines are skipped; rows with fewer than
     * {@value #REQUIRED_COLUMNS} columns are reported and skipped.
     *
     * @param isHeaderIncluded when true, the first line of the file is discarded
     * @param index            name of the index to write to
     * @param filename         directory containing {@code elastic.csv}; the file name is
     *                         appended to this value
     * @throws IOException           if the file cannot be opened or read
     * @throws IllegalStateException if {@link #initEStransportClinet()} has not been called
     * @throws ExecutionException    declared for caller compatibility; not thrown by this code
     * @throws InterruptedException  declared for caller compatibility; not thrown by this code
     */
    public void CSVbulkImport(boolean isHeaderIncluded, String index, String filename)
            throws IOException, ExecutionException, InterruptedException {
        if (client == null) {
            throw new IllegalStateException("Transport client not initialised; call initEStransportClinet() first");
        }

        File file = new File(filename + "/elastic.csv");
        BulkRequestBuilder bulkRequest = client.prepareBulk();
        int pending = 0;
        int noOfBatch = 1;

        // try-with-resources closes the reader even when readLine() throws.
        try (BufferedReader bufferedReader = new BufferedReader(new FileReader(file))) {
            if (isHeaderIncluded) {
                bufferedReader.readLine();
            }

            String line;
            while ((line = bufferedReader.readLine()) != null) {
                if (line.trim().length() == 0) {
                    continue;
                }

                String[] data = line.split(",");
                if (data.length < REQUIRED_COLUMNS) {
                    // Reject short rows up front rather than relying on an
                    // ArrayIndexOutOfBoundsException being swallowed.
                    System.out.println("Invalid data : " + line);
                    continue;
                }

                try {
                    XContentBuilder xContentBuilder = jsonBuilder().startObject()
                            .field("tenant_id", data[1])
                            .field("hierarchy_name", data[2])
                            .field("attribute_name", data[3])
                            .field("item_pk", data[4])
                            .endObject();
                    bulkRequest.add(client.prepareIndex(index, indexTypeName, data[0]).setSource(xContentBuilder));
                    pending++;
                } catch (IOException e) {
                    // Best effort: one row that cannot be serialised must not abort the import.
                    System.err.println("Skipping row that could not be converted : " + line + " (" + e + ")");
                    continue;
                }

                if (pending == BATCH_SIZE) {
                    addDocumentToESCluser(bulkRequest, noOfBatch, pending);
                    noOfBatch++;
                    // A builder keeps its queued requests after execute(), so start a new
                    // one; otherwise every later batch would re-send the earlier rows.
                    bulkRequest = client.prepareBulk();
                    pending = 0;
                }
            }
        }

        // Send the last, partially filled batch (a no-op when nothing is pending).
        addDocumentToESCluser(bulkRequest, noOfBatch, pending);
    }

    /**
     * Executes one bulk request and prints the outcome.
     *
     * @param bulkRequest the bulk request to execute
     * @param noOfBatch   batch number used in the printed messages
     * @param count       number of documents the caller queued; when 0 nothing is sent
     */
    public void addDocumentToESCluser(BulkRequestBuilder bulkRequest, int noOfBatch, int count) {
        // Executing an empty bulk request fails validation ("no requests added"), so skip it.
        if (count == 0 || bulkRequest.numberOfActions() == 0) {
            return;
        }

        BulkResponse bulkResponse = bulkRequest.execute().actionGet();
        if (!bulkResponse.hasFailures()) {
            System.out.println("Bulk Indexing Completed for batch : " + noOfBatch);
            return;
        }

        int numberOfDocFailed = 0;
        for (BulkItemResponse response : bulkResponse) {
            if (response.isFailed()) {
                numberOfDocFailed++;
            }
        }
        System.out.println("Bulk Indexing failed for Batch : " + noOfBatch);
        System.out.println("Out of " + count + " documents, " + numberOfDocFailed + " documents failed");
        System.out.println(bulkResponse.buildFailureMessage());
    }

    /**
     * Issues a refresh request for the given index and waits for the response.
     *
     * @param index name of the index to refresh
     */
    public void refreshIndices(String index) {
        client.admin().indices().prepareRefresh(index).get();
    }

    /** Closes the transport client if one was created. */
    public void closeTransportClient() {
        if (client != null) {
            client.close();
        }
    }
}