Java ThreadPoolExecutor for indexing documents in ElasticSearch

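I am new to Java ThreadPoolExecutor, and I have written some tasks to index documents in Elasticsearch. The tasks are executed through a ThreadPoolExecutor and work fine.

However, I am still not sure about my approach.

Please find my code below.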

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

public class IndexApp {

    public static void main(String[] args)
    {
        // Fixed pool of two worker threads
        ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(2);

        System.out.println("Indexing via Java Code ....");
        Product prod1 = new Product("1001", 123172L, "Product", "VG3000");
        Product prod2 = new Product("1002", 123172L, "Series", "Valves, VG3000");
        Product prod3 = new Product("1003", 3536633L, "Series", "Activa RoofTop, VG3000 karthikeyan ");
        Product prod4 = new Product("1004", 123172L, "Product", "Activa RoofTop VG3000, 3000");

        List<Product> objList = new ArrayList<Product>();
        objList.add(prod1);
        objList.add(prod2);
        objList.add(prod3);
        objList.add(prod4);

        // Submit one indexing task per product
        for (Product prod : objList)
        {
            Map<String, Object> jsonMap = new HashMap<String, Object>();
            jsonMap.put("id", prod.getId());
            jsonMap.put("catalog_id", prod.getCatalog_id());
            jsonMap.put("catalog_type", prod.getCatalog_type());
            jsonMap.put("values", prod.getValues());
            IndexTask task = new IndexTask(jsonMap);
            executor.execute(task);
        }
        // Stop accepting new tasks; tasks already submitted still run to completion
        executor.shutdown();
    }

}


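import java.io.IOException;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.rest.RestStatus;

public class IndexTask implements Runnable {

    private final static String INDEX_NAME = "index_prod";

    private final Map<String, Object> jsonMap;

    public IndexTask(Map<String, Object> jsonMap) {
        this.jsonMap = jsonMap;
    }

    public Map<String, Object> getJsonMap() {
        return jsonMap;
    }

    public void run() {
        try {
            long duration = (long) (Math.random() * 10);
            System.out.println("Executing : " + jsonMap.get("id") + " Sleep Duration : " + duration);

            // One index request per document, keyed by the product id
            IndexRequest request = new IndexRequest(INDEX_NAME, "doc", jsonMap.get("id").toString())
                    .source(jsonMap);

            try {
                IndexResponse response = SearchEngineClient.getInstance3().index(request); // increased timeout
            } catch (ElasticsearchException e) {
                if (e.status() == RestStatus.CONFLICT) {
                    // version conflict: not handled specially yet
                }
                e.printStackTrace();
            } catch (IOException e) {
                e.printStackTrace();
            }

            TimeUnit.SECONDS.sleep(duration);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the pool can see the interruption
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
    }

}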

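Can someone tell me whether my approach makes sense for indexing documents in Elasticsearch?

Update 2

Please find my modified code below.

Instead of IndexRequest, I used BulkRequest.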

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.rest.RestStatus;

public class ProdCatIndexTask implements Runnable {

    private final static String INDEX_NAME = "productcatalog_index";

    private final Map<String, Object> jsonMap;

    // Each task owns its own BulkRequest, so every bulk call carries a single document
    private final BulkRequest bulkRequest = new BulkRequest();

    public ProdCatIndexTask(Map<String, Object> jsonMap) {
        this.jsonMap = jsonMap;
    }

    public Map<String, Object> getJsonMap() {
        return jsonMap;
    }

    public void run() {
        try {
            long duration = (long) (Math.random() * 10);
            System.out.println("Executing : " + jsonMap.get("id") + " Sleep Duration : " + duration);

            /*request = new IndexRequest(INDEX_NAME, "doc", jsonMap.get("id").toString() )
                    .source(jsonMap);*/

            bulkRequest.add(new IndexRequest(INDEX_NAME, "doc", jsonMap.get("id").toString()).source(jsonMap));

            try {
                //IndexResponse response = SearchEngineClient.getInstance3().index(request); // increased timeout
                BulkResponse bulkResponse = SearchEngineClient.getInstance3().bulk(bulkRequest);
                System.out.println("Triggered Bulk Request.....");
            } catch (ElasticsearchException e) {
                if (e.status() == RestStatus.CONFLICT) {
                    // version conflict: not handled specially yet
                }
                e.printStackTrace();
            } catch (IOException e) {
                e.printStackTrace();
            }

            TimeUnit.SECONDS.sleep(duration);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the pool can see the interruption
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
    }

}

If you want to bulk-load data in parallel, I recommend using the Elasticsearch API BulkProcessor.

The documentation is here: https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/java-docs-bulk-processor.html

An example of how to use bulkProcessor:

bulkProcessor.add(new IndexRequest("indexName", "type")
        .source(toJson(product), XContentType.JSON));
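
To show why batching matters for the thread-pool code in the question, here is a rough sketch using only the JDK. It is not the Elasticsearch API: the class BatchingSketch, its methods, and the sender callback are hypothetical stand-ins for the idea of grouping documents before handing them to a worker thread. In the question's Update 2 each task sends a bulk request that holds a single document. Grouping first means one request per batch instead.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

public class BatchingSketch {

    // Splits the documents into consecutive batches of at most batchSize items.
    public static <T> List<List<T>> partition(List<T> docs, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int start = 0; start < docs.size(); start += batchSize) {
            int end = Math.min(start + batchSize, docs.size());
            batches.add(new ArrayList<>(docs.subList(start, end)));
        }
        return batches;
    }

    // Submits one task per batch to a fixed pool and returns the number of batches sent.
    // The sender is a placeholder: a real one would build one bulk request per batch.
    public static <T> int sendInBatches(List<T> docs, int batchSize, int threads,
                                        Consumer<List<T>> sender) throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        AtomicInteger batchesSent = new AtomicInteger();
        for (List<T> batch : partition(docs, batchSize)) {
            executor.execute(() -> {
                sender.accept(batch);
                batchesSent.incrementAndGet();
            });
        }
        executor.shutdown();
        // Wait for the workers so the count is final before returning
        if (!executor.awaitTermination(1, TimeUnit.MINUTES)) {
            throw new IllegalStateException("batches did not finish in time");
        }
        return batchesSent.get();
    }

    public static void main(String[] args) throws InterruptedException {
        List<Integer> docs = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            docs.add(i);
        }
        int sent = sendInBatches(docs, 4, 2, batch -> { /* send the batch here */ });
        System.out.println(sent + " requests for " + docs.size() + " documents");
    }
}
```

BulkProcessor does this grouping for you, and per the linked documentation it can also flush on size or time thresholds, so a hand-written version like this is only useful for seeing the idea.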

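If you want it to go faster, you can reduce the number of replicas to 0 and let Elasticsearch generate the IDs, because when you index with your own IDs, Elasticsearch has to check each time whether that ID already exists in the index.

Other ideas on how to improve loading performance: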
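
As a sketch of the replica part, the snippet below builds (but does not send) a request to the index settings endpoint that sets index.number_of_replicas. It uses the JDK's java.net.http (Java 11+) rather than the Elasticsearch client. The class name ReplicaSettingsSketch, the address http://localhost:9200, and the use of the question's productcatalog_index are assumptions for illustration.

```java
import java.net.URI;
import java.net.http.HttpRequest;

public class ReplicaSettingsSketch {

    // JSON body for the index settings endpoint.
    public static String replicaSettingsBody(int replicas) {
        if (replicas < 0) {
            throw new IllegalArgumentException("replicas must not be negative");
        }
        return "{\"index\":{\"number_of_replicas\":" + replicas + "}}";
    }

    // Builds, without sending, a PUT to <baseUrl>/<index>/_settings.
    public static HttpRequest buildRequest(String baseUrl, String index, int replicas) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/" + index + "/_settings"))
                .header("Content-Type", "application/json")
                .PUT(HttpRequest.BodyPublishers.ofString(replicaSettingsBody(replicas)))
                .build();
    }

    public static void main(String[] args) {
        // Assumed local node address and the index name from the question
        HttpRequest disable = buildRequest("http://localhost:9200", "productcatalog_index", 0);
        System.out.println(disable.method() + " " + disable.uri());
        System.out.println(replicaSettingsBody(0));
    }
}
```

To actually apply it you would send the request with an HttpClient before the load, then send it again with the original replica count afterwards. For the ID part, the two-argument IndexRequest("indexName", "type") in the example above passes no ID, which leaves ID generation to Elasticsearch.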

https://www.elastic.co/guide/en/elasticsearch/reference/master/tune-for-indexing-speed.html