使用 CompletableFuture supplyAsync 的集合并行处理然后收集结果

Parallel processing using collection of CompletableFuture supplyAsync then collecting results

//Unit of logic I want to make it to run in parallel
public PagesDTO convertOCRStreamToDTO(String pageId, Integer pageSequence) throws Exception {
    LOG.info("Get OCR begin for pageId [{}] thread name {}",pageId, Thread.currentThread().getName());
    OcrContent ocrContent = getOcrContent(pageId);
    OcrDTO ocrData = populateOCRData(ocrContent.getInputStream());
    PagesDTO pageDTO = new PagesDTO(pageId, pageSequence.toString(), ocrData);
    return pageDTO; 
}

并行执行 convertOCRStreamToDTO(..) 然后在各个线程执行完成时收集其结果的逻辑

List<PagesDTO> pageDTOList = new ArrayList<>();
//javadoc: Creates a work-stealing thread pool using all available processors as its target parallelism level.
ExecutorService newWorkStealingPool = Executors.newWorkStealingPool(); 
Instant start = Instant.now();
List<CompletableFuture<PagesDTO>> pendingTasks = new ArrayList<>();
List<CompletableFuture<PagesDTO>> completedTasks = new ArrayList<>();
CompletableFuture<<PagesDTO>> task = null;

for (InputPageDTO dcInputPageDTO : dcReqDTO.getPages()) {
    String pageId = dcInputPageDTO.getPageId();
    task = CompletableFuture
            .supplyAsync(() -> {
                try {
                    return convertOCRStreamToDTO(pageId, pageSequence.getAndIncrement());
                } catch (HttpHostConnectException | ConnectTimeoutException e) {
                    LOG.error("Error connecting to Redis for pageId [{}]", pageId, e);
                    CaptureException e1 = new CaptureException(Error.getErrorCodes().get(ErrorCodeConstants.REDIS_CONNECTION_FAILURE),
                            " Connecting to the Redis failed while getting OCR for pageId ["+pageId +"] " + e.getMessage(), CaptureErrorComponent.REDIS_CACHE, e);
                    exceptionMap.put(pageId,e1);
                } catch (CaptureException e) {
                    LOG.error("Error in Document Classification Engine Service while getting OCR for pageId [{}]",pageId,e);
                    exceptionMap.put(pageId,e);
                } catch (Exception e) {
                    LOG.error("Error getting OCR content for the pageId [{}]", pageId,e);
                    CaptureException e1 = new CaptureException(Error.getErrorCodes().get(ErrorCodeConstants.TECHNICAL_FAILURE),
                            "Error while getting ocr content for pageId : ["+pageId +"] " + e.getMessage(), CaptureErrorComponent.REDIS_CACHE, e);
                    exceptionMap.put(pageId,e1);
                }
                return null;
            }, newWorkStealingPool);
    //collect all async tasks
    pendingTasks.add(task);
}

//TODO: How to avoid unnecessary loops which is happening here just for the sake of waiting for the future tasks to complete???
//TODO: Looking for the best solutions
while(pendingTasks.size() > 0) {
    for(CompletableFuture<PagesDTO> futureTask: pendingTasks) {
        if(futureTask != null && futureTask.isDone()){
            completedTasks.add(futureTask);
            pageDTOList.add(futureTask.get());
        }
    }
    pendingTasks.removeAll(completedTasks);
}

//Throw the exception cought while getting converting OCR stream to DTO - for any of the pageId
for(InputPageDTO dcInputPageDTO : dcReqDTO.getPages()) {
    if(exceptionMap.containsKey(dcInputPageDTO.getPageId())) {
        CaptureException e = exceptionMap.get(dcInputPageDTO.getPageId());
        throw e;
    }
}

LOG.info("Parallel processing time taken for {} pages = {}", dcReqDTO.getPages().size(),
        org.springframework.util.StringUtils.deleteAny(Duration.between(Instant.now(), start).toString().toLowerCase(), "pt-"));

请查看我上面的代码库待办事项,我有以下两个问题,我正在通过 Whosebug 寻求建议:

1) I want to avoid unnecessary looping (happening in while loop above), what is the best way for optimistically I wait for all threads to complete its async execution then collect my results out of it??? Please anybody has an advice?

2) ExecutorService instance is created at my service bean class level, thinking that, it will be re-used for every requests, instead create it local to the method, and shutdown in finally. Am I doing right here?? or any correction in my thought process?

只需删除 whileif 即可:

for(CompletableFuture<PagesDTO> futureTask: pendingTasks) {
    completedTasks.add(futureTask);
    pageDTOList.add(futureTask.get());
}

get()(以及 join())将在返回值之前等待未来完成。此外,无需测试 null,因为您的列表永远不会包含任何内容。

但是,您可能应该更改处理异常的方式。 CompletableFuture 有一个特定的机制来处理它们并在调用 get()/join() 时重新抛出它们。您可能只想将已检查的异常包装在 CompletionException.