How to use BulkProcessor in Elasticsearch batch processing
I need to insert some bulk data into Elasticsearch using its BulkProcessor. This is what I got from elastic.co:
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;

BulkProcessor bulkProcessor = BulkProcessor.builder(
        client,
        new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long executionId, BulkRequest request) { }

            @Override
            public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { }

            @Override
            public void afterBulk(long executionId, BulkRequest request, Throwable failure) { }
        })
        .setBulkActions(10000)                              // flush after 10,000 actions
        .setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB)) // or after 1 GB of buffered data
        .setFlushInterval(TimeValue.timeValueSeconds(5))    // or every 5 seconds
        .setConcurrentRequests(1)                           // allow one flush in flight
        .build();

// note: the source string must be a valid JSON document, not plain text
bulkProcessor.add(new IndexRequest("twitter", "tweet", "1").source("{\"message\":\"helloworld\"}"));
bulkProcessor.add(new DeleteRequest("twitter", "tweet", "2"));
Do I need to call BulkProcessor.builder for every bulk insert? Is this the correct way to use the bulk processor?
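For reference: BulkProcessor.builder is meant to be called once; the resulting processor is then reused for every request and closed when indexing is done. A minimal sketch of that pattern, reusing the client and a listener like the one above (the listener variable, the loop, and the awaitClose timeout are illustrative, not from the original docs):

import java.util.concurrent.TimeUnit;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.index.IndexRequest;

// build the processor once...
BulkProcessor processor = BulkProcessor.builder(client, listener)
        .setBulkActions(10000)
        .build();

// ...then feed it any number of requests; it batches and flushes
// automatically whenever one of the configured thresholds is hit
for (int i = 0; i < 100000; i++) {
    processor.add(new IndexRequest("twitter", "tweet", String.valueOf(i))
            .source("{\"message\":\"doc " + i + "\"}"));
}

// flush whatever is still buffered and shut the processor down
processor.awaitClose(60, TimeUnit.SECONDS);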
I found the answer. The code below does exactly that.
import java.util.Date;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import static org.elasticsearch.common.xcontent.XContentFactory.*;

BulkRequestBuilder bulkRequest = client.prepareBulk();

// either use client#prepare, or use Requests# to directly build index/delete requests
bulkRequest.add(client.prepareIndex("twitter", "tweet", "1")
        .setSource(jsonBuilder()
                .startObject()
                .field("user", "kimchy")
                .field("postDate", new Date())
                .field("message", "trying out Elasticsearch")
                .endObject()
        )
);

bulkRequest.add(client.prepareIndex("twitter", "tweet", "2")
        .setSource(jsonBuilder()
                .startObject()
                .field("user", "kimchy")
                .field("postDate", new Date())
                .field("message", "another post")
                .endObject()
        )
);

BulkResponse bulkResponse = bulkRequest.execute().actionGet();
if (bulkResponse.hasFailures()) {
    // process failures by iterating through each bulk response item
}
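The failure branch can be filled in by iterating the per-item results; a short sketch against the same Java API (each BulkItemResponse lines up with the request at the same position in the bulk, so failed documents can be retried individually):

import org.elasticsearch.action.bulk.BulkItemResponse;

if (bulkResponse.hasFailures()) {
    // BulkResponse is iterable: one item per request in the bulk
    for (BulkItemResponse item : bulkResponse) {
        if (item.isFailed()) {
            System.err.println("doc " + item.getId() + " failed: " + item.getFailureMessage());
        }
    }
}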