Java 用于在 ElasticSearch 中索引文档的 ThreadPoolExecutor
Java ThreadPoolExecutor for indexing documents in ElasticSearch
我是 Java ThreadPoolExecutor 的新手,我写了一些任务,用来在 Elasticsearch 中索引文档。
这些任务通过 ThreadPoolExecutor 执行,并且工作正常。
但是,我对自己的方法还不是很确定。
请在下面找到我的代码
/**
 * Demo entry point: builds four sample {@code Product}s, converts each one to a
 * field map and submits one {@code IndexTask} per product to a two-thread pool.
 */
public class IndexApp {

    /**
     * Submits one indexing task per sample product, then shuts the pool down.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(2);
        System.out.println("Indexing via Java Code ....");

        // All catalog ids are written as long literals with an upper-case 'L' suffix,
        // so every constructor call passes the same argument type.
        List<Product> products = new ArrayList<Product>();
        products.add(new Product("1001", 123172L, "Product", "VG3000"));
        products.add(new Product("1002", 123172L, "Series", "Valves, VG3000"));
        products.add(new Product("1003", 3536633L, "Series", "Activa RoofTop, VG3000 karthikeyan "));
        products.add(new Product("1004", 123172L, "Product", "Activa RoofTop VG3000, 3000"));

        try {
            for (Product product : products) {
                // A fresh map per product: each task keeps a reference to its own map.
                Map<String, Object> jsonMap = new HashMap<String, Object>();
                jsonMap.put("id", product.getId());
                jsonMap.put("catalog_id", product.getCatalog_id());
                jsonMap.put("catalog_type", product.getCatalog_type());
                jsonMap.put("values", product.getValues());
                executor.execute(new IndexTask(jsonMap));
            }
        } finally {
            // Always release the pool: its worker threads are non-daemon, so skipping
            // shutdown() after a failure above would keep the JVM alive indefinitely.
            // Already-submitted tasks still run to completion after shutdown().
            executor.shutdown();
        }
    }
}
/**
 * Runnable that indexes a single document, supplied as a field map, into the
 * {@code index_prod} index under type {@code doc}. The document id is taken
 * from the map's {@code "id"} entry.
 */
public class IndexTask implements Runnable {
    private final static String INDEX_NAME = "index_prod";

    private final Map<String, Object> jsonMap;

    /**
     * @param jsonMap document fields; must contain a non-null {@code "id"} entry,
     *                which {@link #run()} uses as the document id
     */
    public IndexTask(Map<String, Object> jsonMap) {
        this.jsonMap = jsonMap;
    }

    /** @return the field map this task was created with */
    public Map<String, Object> getJsonMap() {
        return jsonMap;
    }

    /**
     * Sends one index request for the document, then sleeps for a random
     * 0-9 seconds. Elasticsearch and I/O failures are reported on stderr and
     * do not propagate.
     */
    @Override
    public void run() {
        long duration = (long) (Math.random() * 10);
        System.out.println("Executing : " + jsonMap.get("id") + " Sleep Duration : " + duration);

        IndexRequest request = new IndexRequest(INDEX_NAME, "doc", jsonMap.get("id").toString())
                .source(jsonMap);
        try {
            SearchEngineClient.getInstance3().index(request); // increased timeout
        } catch (ElasticsearchException e) {
            if (e.status() == RestStatus.CONFLICT) {
                System.err.println("Version conflict while indexing document " + jsonMap.get("id"));
            }
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }

        // NOTE(review): this random pause only delays the worker thread and looks like
        // leftover demo code; kept so the task's observable behavior does not change.
        try {
            TimeUnit.SECONDS.sleep(duration);
        } catch (InterruptedException e) {
            e.printStackTrace();
            // Restore the interrupt flag so the owning executor can see the interruption.
            Thread.currentThread().interrupt();
        }
    }
}
有人能告诉我,我的这种做法对于在 Elasticsearch 中索引文档是否合理吗?
更新 2
请找到我修改后的代码。
我没有使用 IndexRequest,而是改用了 BulkRequest:
/**
 * Runnable that indexes a single document, supplied as a field map, into the
 * {@code productcatalog_index} index under type {@code doc} by way of a bulk
 * request. The document id is taken from the map's {@code "id"} entry.
 */
public class ProdCatIndexTask implements Runnable {
    private final static String INDEX_NAME = "productcatalog_index";

    private final Map<String, Object> jsonMap;

    /**
     * @param jsonMap document fields; must contain a non-null {@code "id"} entry,
     *                which {@link #run()} uses as the document id
     */
    public ProdCatIndexTask(Map<String, Object> jsonMap) {
        this.jsonMap = jsonMap;
    }

    /** @return the field map this task was created with */
    public Map<String, Object> getJsonMap() {
        return jsonMap;
    }

    /**
     * Sends a bulk request containing this task's document, reports any
     * per-item failures, then sleeps for a random 0-9 seconds. Elasticsearch
     * and I/O failures are reported on stderr and do not propagate.
     */
    @Override
    public void run() {
        long duration = (long) (Math.random() * 10);
        System.out.println("Executing : " + jsonMap.get("id") + " Sleep Duration : " + duration);

        // Built per run() call so that running the task again cannot resend
        // documents accumulated by an earlier call.
        // NOTE(review): the request carries exactly one document, so it gains nothing
        // over a plain index call; batching several documents per request would.
        BulkRequest bulkRequest = new BulkRequest();
        bulkRequest.add(new IndexRequest(INDEX_NAME, "doc", jsonMap.get("id").toString()).source(jsonMap));

        try {
            BulkResponse bulkResponse = SearchEngineClient.getInstance3().bulk(bulkRequest);
            System.out.println("Triggered Bulk Request.....");
            // Per-document failures are carried in the response rather than thrown,
            // so they have to be checked explicitly.
            if (bulkResponse.hasFailures()) {
                System.err.println(bulkResponse.buildFailureMessage());
            }
        } catch (ElasticsearchException e) {
            if (e.status() == RestStatus.CONFLICT) {
                System.err.println("Version conflict while indexing document " + jsonMap.get("id"));
            }
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }

        // NOTE(review): this random pause only delays the worker thread and looks like
        // leftover demo code; kept so the task's observable behavior does not change.
        try {
            TimeUnit.SECONDS.sleep(duration);
        } catch (InterruptedException e) {
            e.printStackTrace();
            // Restore the interrupt flag so the owning executor can see the interruption.
            Thread.currentThread().interrupt();
        }
    }
}
如果要并行批量加载数据,我推荐使用 Elasticsearch API 中的 BulkProcessor。
文档见:https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/java-docs-bulk-processor.html
如何使用 bulkProcessor 的示例:
// `product` is the Product instance to index; toJson is assumed to serialize it to a JSON string.
bulkProcessor.add(new IndexRequest("indexName", "type")
        .source(toJson(product), XContentType.JSON));
如果想要更快,可以将副本(replicas)数量设为 0,并让 Elasticsearch 自动生成 ID;因为如果使用自定义 ID,Elasticsearch 每次索引时都要先检查该 ID 是否已经存在。
关于如何提高加载性能的其他想法:
https://www.elastic.co/guide/en/elasticsearch/reference/master/tune-for-indexing-speed.html
我是 Java ThreadPoolExecutor 的新手,我写了一些任务,用来在 Elasticsearch 中索引文档。
这些任务通过 ThreadPoolExecutor 执行,并且工作正常。
但是,我对自己的方法还不是很确定。
请在下面找到我的代码
/**
 * Demo entry point: builds four sample {@code Product}s, converts each one to a
 * field map and submits one {@code IndexTask} per product to a two-thread pool.
 */
public class IndexApp {

    /**
     * Submits one indexing task per sample product, then shuts the pool down.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(2);
        System.out.println("Indexing via Java Code ....");

        // All catalog ids are written as long literals with an upper-case 'L' suffix,
        // so every constructor call passes the same argument type.
        List<Product> products = new ArrayList<Product>();
        products.add(new Product("1001", 123172L, "Product", "VG3000"));
        products.add(new Product("1002", 123172L, "Series", "Valves, VG3000"));
        products.add(new Product("1003", 3536633L, "Series", "Activa RoofTop, VG3000 karthikeyan "));
        products.add(new Product("1004", 123172L, "Product", "Activa RoofTop VG3000, 3000"));

        try {
            for (Product product : products) {
                // A fresh map per product: each task keeps a reference to its own map.
                Map<String, Object> jsonMap = new HashMap<String, Object>();
                jsonMap.put("id", product.getId());
                jsonMap.put("catalog_id", product.getCatalog_id());
                jsonMap.put("catalog_type", product.getCatalog_type());
                jsonMap.put("values", product.getValues());
                executor.execute(new IndexTask(jsonMap));
            }
        } finally {
            // Always release the pool: its worker threads are non-daemon, so skipping
            // shutdown() after a failure above would keep the JVM alive indefinitely.
            // Already-submitted tasks still run to completion after shutdown().
            executor.shutdown();
        }
    }
}
/**
 * Runnable that indexes a single document, supplied as a field map, into the
 * {@code index_prod} index under type {@code doc}. The document id is taken
 * from the map's {@code "id"} entry.
 */
public class IndexTask implements Runnable {
    private final static String INDEX_NAME = "index_prod";

    private final Map<String, Object> jsonMap;

    /**
     * @param jsonMap document fields; must contain a non-null {@code "id"} entry,
     *                which {@link #run()} uses as the document id
     */
    public IndexTask(Map<String, Object> jsonMap) {
        this.jsonMap = jsonMap;
    }

    /** @return the field map this task was created with */
    public Map<String, Object> getJsonMap() {
        return jsonMap;
    }

    /**
     * Sends one index request for the document, then sleeps for a random
     * 0-9 seconds. Elasticsearch and I/O failures are reported on stderr and
     * do not propagate.
     */
    @Override
    public void run() {
        long duration = (long) (Math.random() * 10);
        System.out.println("Executing : " + jsonMap.get("id") + " Sleep Duration : " + duration);

        IndexRequest request = new IndexRequest(INDEX_NAME, "doc", jsonMap.get("id").toString())
                .source(jsonMap);
        try {
            SearchEngineClient.getInstance3().index(request); // increased timeout
        } catch (ElasticsearchException e) {
            if (e.status() == RestStatus.CONFLICT) {
                System.err.println("Version conflict while indexing document " + jsonMap.get("id"));
            }
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }

        // NOTE(review): this random pause only delays the worker thread and looks like
        // leftover demo code; kept so the task's observable behavior does not change.
        try {
            TimeUnit.SECONDS.sleep(duration);
        } catch (InterruptedException e) {
            e.printStackTrace();
            // Restore the interrupt flag so the owning executor can see the interruption.
            Thread.currentThread().interrupt();
        }
    }
}
有人能告诉我,我的这种做法对于在 Elasticsearch 中索引文档是否合理吗?
更新 2
请找到我修改后的代码。
我没有使用 IndexRequest,而是改用了 BulkRequest:
/**
 * Runnable that indexes a single document, supplied as a field map, into the
 * {@code productcatalog_index} index under type {@code doc} by way of a bulk
 * request. The document id is taken from the map's {@code "id"} entry.
 */
public class ProdCatIndexTask implements Runnable {
    private final static String INDEX_NAME = "productcatalog_index";

    private final Map<String, Object> jsonMap;

    /**
     * @param jsonMap document fields; must contain a non-null {@code "id"} entry,
     *                which {@link #run()} uses as the document id
     */
    public ProdCatIndexTask(Map<String, Object> jsonMap) {
        this.jsonMap = jsonMap;
    }

    /** @return the field map this task was created with */
    public Map<String, Object> getJsonMap() {
        return jsonMap;
    }

    /**
     * Sends a bulk request containing this task's document, reports any
     * per-item failures, then sleeps for a random 0-9 seconds. Elasticsearch
     * and I/O failures are reported on stderr and do not propagate.
     */
    @Override
    public void run() {
        long duration = (long) (Math.random() * 10);
        System.out.println("Executing : " + jsonMap.get("id") + " Sleep Duration : " + duration);

        // Built per run() call so that running the task again cannot resend
        // documents accumulated by an earlier call.
        // NOTE(review): the request carries exactly one document, so it gains nothing
        // over a plain index call; batching several documents per request would.
        BulkRequest bulkRequest = new BulkRequest();
        bulkRequest.add(new IndexRequest(INDEX_NAME, "doc", jsonMap.get("id").toString()).source(jsonMap));

        try {
            BulkResponse bulkResponse = SearchEngineClient.getInstance3().bulk(bulkRequest);
            System.out.println("Triggered Bulk Request.....");
            // Per-document failures are carried in the response rather than thrown,
            // so they have to be checked explicitly.
            if (bulkResponse.hasFailures()) {
                System.err.println(bulkResponse.buildFailureMessage());
            }
        } catch (ElasticsearchException e) {
            if (e.status() == RestStatus.CONFLICT) {
                System.err.println("Version conflict while indexing document " + jsonMap.get("id"));
            }
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }

        // NOTE(review): this random pause only delays the worker thread and looks like
        // leftover demo code; kept so the task's observable behavior does not change.
        try {
            TimeUnit.SECONDS.sleep(duration);
        } catch (InterruptedException e) {
            e.printStackTrace();
            // Restore the interrupt flag so the owning executor can see the interruption.
            Thread.currentThread().interrupt();
        }
    }
}
如果要并行批量加载数据,我推荐使用 Elasticsearch API 中的 BulkProcessor。
文档见:https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/java-docs-bulk-processor.html
如何使用 bulkProcessor 的示例:
// `product` is the Product instance to index; toJson is assumed to serialize it to a JSON string.
bulkProcessor.add(new IndexRequest("indexName", "type")
        .source(toJson(product), XContentType.JSON));
如果想要更快,可以将副本(replicas)数量设为 0,并让 Elasticsearch 自动生成 ID;因为如果使用自定义 ID,Elasticsearch 每次索引时都要先检查该 ID 是否已经存在。
关于如何提高加载性能的其他想法:
https://www.elastic.co/guide/en/elasticsearch/reference/master/tune-for-indexing-speed.html