How to index our CSV file into Elasticsearch using Java?
Is there any way to index our CSV file into Elasticsearch using Java? I used to do this with Logstash, but now I need to code it in Java and run it dynamically. I tried using the Index API, but it does not fit my case. Can someone help me?
My CSV data looks like this (this is just a sample):
id profile_id hier_name attri_name item
1 1 CUSTOMER CUSTOMER C001
2 1 CUSTOMER CUSTOMER C002
3 1 CUSTOMER CUSTOMER C003
This is the method I tried for the bulk insert, but it does not seem to work with my current Elasticsearch version, 7.12.0:
package com.javadeveloperzone;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.net.InetAddress;
import java.util.Iterator;
import java.util.concurrent.ExecutionException;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
public class ESBulkIndexingExample {
String indexName, indexTypeName;
TransportClient client = null;
public static void main(String[] args) {
ESBulkIndexingExample esExample = new ESBulkIndexingExample();
try {
esExample.initEStransportClinet();
System.out.println("init done");
esExample.CSVbulkImport(true);
System.out.println("bulkimport done");
esExample.refreshIndices();
esExample.search();
} catch (Exception e) {
e.printStackTrace();
} finally {
esExample.closeTransportClient(); // close transport client
}
}
public ESBulkIndexingExample() {
indexName = "document";
indexTypeName = "bulkindexing";
}
public boolean initEStransportClinet() {
try {
client = new PreBuiltTransportClient(Settings.EMPTY)
.addTransportAddress(new TransportAddress(InetAddress.getByName("localhost"), 9300));
return true;
} catch (Exception ex) {
ex.printStackTrace();
return false;
}
}
public void CSVbulkImport(boolean isHeaderIncluded) throws IOException, ExecutionException, InterruptedException {
BulkRequestBuilder bulkRequest = client.prepareBulk();
File file = new File(
"/home/niteshb/Documents/workspace-spring-tool-suite-4-4.10.0.RELEASE/ElasticSearchService/src/main/resources/elasticdata.csv");
BufferedReader bufferedReader = new BufferedReader(new FileReader(file));
String line = null;
int count = 0, noOfBatch = 1;
if (bufferedReader != null && isHeaderIncluded) {
bufferedReader.readLine();
}
while ((line = bufferedReader.readLine()) != null) {
if (line.trim().length() == 0) {
continue;
}
String data[] = line.split(",");
if (data.length == 3) {
try {
XContentBuilder xContentBuilder = jsonBuilder().startObject().field("tenant_id", data[0])
.field("hierarchy_name", data[1]).field("attribute_name", data[2]).field("item_pk", data[3])
.endObject();
BulkRequestBuilder add = bulkRequest
.add(client.prepareIndex(indexName, indexTypeName, data[0]).setSource(xContentBuilder));
System.out.println(add);
if ((count + 1) % 500 == 0) {
count = 0;
addDocumentToESCluser(bulkRequest, noOfBatch, count);
noOfBatch++;
}
} catch (Exception e) {
e.printStackTrace();
}
} else {
System.out.println("Invalid data : " + line);
}
count++;
}
bufferedReader.close();
addDocumentToESCluser(bulkRequest, noOfBatch, count);
}
public void addDocumentToESCluser(BulkRequestBuilder bulkRequest, int noOfBatch, int count) {
if (count == 0) {
return;
}
BulkResponse bulkResponse = bulkRequest.execute().actionGet();
if (bulkResponse.hasFailures()) {
System.out.println("Bulk Indexing failed for Batch : " + noOfBatch);
int numberOfDocFailed = 0;
Iterator<BulkItemResponse> iterator = bulkResponse.iterator();
while (iterator.hasNext()) {
BulkItemResponse response = iterator.next();
if (response.isFailed()) {
numberOfDocFailed++;
}
}
System.out.println("Out of " + count + " documents, " + numberOfDocFailed + " documents failed");
System.out.println(bulkResponse.buildFailureMessage());
} else {
System.out.println("Bulk Indexing Completed for batch : " + noOfBatch);
}
}
public void refreshIndices() {
client.admin().indices().prepareRefresh(indexName).get();
}
public void search() {
SearchResponse response = client.prepareSearch(indexName).setTypes(indexTypeName).get();
System.out.println("Total Hits : " + response.getHits().getTotalHits());
System.out.println(response);
}
public void closeTransportClient() {
if (client != null) {
client.close();
}
}
}
I get the following error when execution reaches this point:
org.elasticsearch.action.ActionRequestValidationException: Validation Failed: 1: no requests added;
at org.elasticsearch.action.ValidateActions.addValidationError(ValidateActions.java:15)
at org.elasticsearch.action.bulk.BulkRequest.validate(BulkRequest.java:425)
at org.elasticsearch.action.TransportActionNodeProxy.execute(TransportActionNodeProxy.java:31)
at org.elasticsearch.client.transport.TransportProxyClient.lambda$execute$0(TransportProxyClient.java:44)
Can someone help me solve this?
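For reference, this exception is thrown whenever a bulk request is executed while it contains zero sub-requests. In the code above, the data.length == 3 check likely never matches (the indexing call reads four columns, data[0] through data[3]), so nothing is ever added to bulkRequest while count keeps incrementing, and the final addDocumentToESCluser call then executes an empty request. Even when documents are added, count is reset to 0 immediately before the in-loop flush, so that flush returns without executing. A minimal guard, sketched against the same TransportClient API used above rather than offered as a drop-in fix, is to skip empty batches:

public void addDocumentToESCluser(BulkRequestBuilder bulkRequest, int noOfBatch, int count) {
    // nothing queued: executing would fail with "Validation Failed: 1: no requests added"
    if (count == 0 || bulkRequest.numberOfActions() == 0) {
        return;
    }
    BulkResponse bulkResponse = bulkRequest.execute().actionGet();
    if (bulkResponse.hasFailures()) {
        System.out.println("Bulk indexing failed for batch " + noOfBatch + " : " + bulkResponse.buildFailureMessage());
    } else {
        System.out.println("Bulk indexing completed for batch " + noOfBatch);
    }
}

With that guard in place, an empty final flush becomes a no-op instead of an exception; the row-length check in CSVbulkImport still needs to match the real number of CSV columns.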
The data in my CSV file looks like this:
id profile_id hier_name attri_name item
1 1 CUSTOMER CUSTOMER C001
2 1 CUSTOMER CUSTOMER C002
3 1 CUSTOMER CUSTOMER C003
The dependencies to add are:
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>transport</artifactId>
<version>7.12.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.elasticsearch/elasticsearch -->
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>7.12.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.elasticsearch.plugin/transport-netty4-client -->
<dependency>
<groupId>org.elasticsearch.plugin</groupId>
<artifactId>transport-netty4-client</artifactId>
<version>7.12.1</version>
</dependency>
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Iterator;
import java.util.concurrent.ExecutionException;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import org.springframework.stereotype.Component;
@Component
public class ESBulkIndexing {
String indexName, indexTypeName;
TransportClient client = null;
public ESBulkIndexing() {
indexTypeName = "bulkindexing";
}
public boolean initEStransportClinet() throws UnknownHostException {
client = new PreBuiltTransportClient(Settings.EMPTY)
.addTransportAddress(new TransportAddress(InetAddress.getByName("localhost"), 9300));
return true;
}
public void CSVbulkImport(boolean isHeaderIncluded, String index, String filename)
throws IOException, ExecutionException, InterruptedException {
BulkRequestBuilder bulkRequest = client.prepareBulk();
File file = new File(filename + "/elastic.csv");
BufferedReader bufferedReader = new BufferedReader(new FileReader(file));
String line = null;
int count = 0, noOfBatch = 1;
if (bufferedReader != null && isHeaderIncluded) {
bufferedReader.readLine();
}
while ((line = bufferedReader.readLine()) != null) {
if (line.trim().length() == 0) {
continue;
}
            String data[] = line.split(",");
            if (data.length < 5) {
                System.out.println("Invalid data : " + line);
                continue;
            }
            try {
                // build one JSON document per CSV row; column 0 (id) becomes the document id
                XContentBuilder xContentBuilder = jsonBuilder().startObject().field("tenant_id", data[1])
                        .field("hierarchy_name", data[2]).field("attribute_name", data[3]).field("item_pk", data[4])
                        .endObject();
                bulkRequest.add(client.prepareIndex(index, indexTypeName, data[0]).setSource(xContentBuilder));
                count++;
                // flush a full batch and start a fresh bulk request so earlier documents are not re-sent
                if (count % 500 == 0) {
                    addDocumentToESCluser(bulkRequest, noOfBatch, count);
                    bulkRequest = client.prepareBulk();
                    noOfBatch++;
                    count = 0;
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
}
bufferedReader.close();
addDocumentToESCluser(bulkRequest, noOfBatch, count);
}
public void addDocumentToESCluser(BulkRequestBuilder bulkRequest, int noOfBatch, int count) {
if (count == 0) {
return;
}
BulkResponse bulkResponse = bulkRequest.execute().actionGet();
if (bulkResponse.hasFailures()) {
int numberOfDocFailed = 0;
Iterator<BulkItemResponse> iterator = bulkResponse.iterator();
while (iterator.hasNext()) {
BulkItemResponse response = iterator.next();
if (response.isFailed()) {
numberOfDocFailed++;
}
}
            System.out.println("Out of " + count + " documents, " + numberOfDocFailed + " failed in batch " + noOfBatch);
            System.out.println(bulkResponse.buildFailureMessage());
} else {
System.out.println("Bulk Indexing Completed for batch : " + noOfBatch);
}
}
public void refreshIndices(String index) {
client.admin().indices().prepareRefresh(index).get();
}
public void closeTransportClient() {
if (client != null) {
client.close();
}
}
}
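A side note on the client: the TransportClient (PreBuiltTransportClient) used above is deprecated as of Elasticsearch 7.0 and removed in 8.0, so for 7.12.x the High Level REST Client over port 9200 is the safer long-term choice. Below is a minimal sketch of the same CSV bulk import with that client; the index name, field names, and column order are carried over from the code above, and the org.elasticsearch.client:elasticsearch-rest-high-level-client 7.12.1 dependency is assumed. Adjust the host, index, and CSV path to your setup.

import java.io.BufferedReader;
import java.io.FileReader;
import java.util.HashMap;
import java.util.Map;
import org.apache.http.HttpHost;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;

public class CsvRestBulkImport {
    public static void main(String[] args) throws Exception {
        // args[0] is the path to the CSV file
        try (RestHighLevelClient client = new RestHighLevelClient(
                RestClient.builder(new HttpHost("localhost", 9200, "http")));
             BufferedReader reader = new BufferedReader(new FileReader(args[0]))) {
            reader.readLine(); // skip the header row
            BulkRequest bulk = new BulkRequest();
            String line;
            while ((line = reader.readLine()) != null) {
                String[] data = line.split(",");
                if (data.length < 5) {
                    continue; // skip blank or malformed rows
                }
                Map<String, Object> doc = new HashMap<>();
                doc.put("tenant_id", data[1]);
                doc.put("hierarchy_name", data[2]);
                doc.put("attribute_name", data[3]);
                doc.put("item_pk", data[4]);
                // column 0 (id) becomes the document id in the "document" index
                bulk.add(new IndexRequest("document").id(data[0]).source(doc));
            }
            // only execute if something was queued, to avoid "no requests added"
            if (bulk.numberOfActions() > 0) {
                BulkResponse response = client.bulk(bulk, RequestOptions.DEFAULT);
                System.out.println("Bulk indexing failures: " + response.hasFailures());
            }
        }
    }
}

This also sidesteps the mapping-type argument (indexTypeName), which is deprecated throughout 7.x.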