使用 bulkProcessor 时捕获 Elasticsearch 批量错误

Catch Elasticsearch bulk errors when using bulkProcessor

我在 Elasticsearch 中使用 bulkProcessor 批量执行插入/更新操作。我想捕获批量操作中出现的异常,

但它不会抛出任何异常,只会在响应项上设置一条消息。我该如何妥善处理?例如,如果请求被拒绝,就进行重试……

public BulkResponse response bulkUpdate(.....) {
    BulkResponse bulkWriteResult = null;
    long startTime = System.currentTimeMillis();
    AtomicInteger amountOfRequests = new AtomicInteger();
    long esTime;


    ElasticBulkProcessorListener listener = new    ElasticBulkProcessorListener(updateOperations);
    BulkProcessor bulkProcessor = BulkProcessor.builder(client, listener)
        .setBulkActions(MAX_BULK_ACTIONS)
        .setBulkSize(new ByteSizeValue(maxBulkSize, ByteSizeUnit.MB))
        .setConcurrentRequests(5)
        .build();


    updateOperations.forEach(updateRequest -> {
        bulkProcessor.add(updateRequest);
        amountOfRequests.getAndIncrement();
    });

try {
    boolean isFinished = bulkProcessor.awaitClose(bulkTimeout, TimeUnit.SECONDS);
    if (isFinished) {
        if (listener.getBulkWriteResult() != null) {
            bulkWriteResult = listener.getBulkWriteResult();
        } else {
            throw new Exception("Bulk updating failed, results are empty");
        }
    } else {
        throw new Exception("Bulk updating failed, received timeout");
    }
} catch (InterruptedException e) {
    e.printStackTrace();
}

return bulkWriteResult;
}


public class ElasticBulkProcessorListener implements BulkProcessor.Listener {
private long esTime = 0;
private List<Throwable> errors;
private BulkResponse response;

public long getEsTime() {
    return esTime;
}

@Override
public void beforeBulk(long executionId, BulkRequest request) {
    String description = "";
    if (!request.requests().isEmpty()) {
        ActionRequest request1 = request.requests().get(0);
        description = ((UpdateRequest) request1).type();
    }

    log.info("Bulk executionID: {}, estimated size is: {}MB, number of actions: {}, request type: {}",
            executionId, (request.estimatedSizeInBytes() / 1000000), request.numberOfActions(), description);
}

@Override
public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
    log.info("Bulk executionID: {}, took : {} Millis, bulk size: {}", executionId, response.getTookInMillis(), response.getItems().length);
    esTime = response.getTookInMillis();
    response = createBulkUpdateResult(response);
}

@Override
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
    log.error("Bulk , failed! error: ", executionId, failure);
    throw new DataFWCoreException(String.format("Bulk executionID: %d, update operation failed", executionId), failure);
}

}

仅当网络出现故障时才会调用失败回调(即带 Throwable 参数的 afterBulk),其他任何情况都会进入成功回调。

要处理上面提到的这些异常,唯一的办法是逐个解析响应项,弄清楚每一项到底发生了什么。