如何在不阻塞当前代码路径的情况下获取ExecutorService的执行结果?

How to get the execution results of ExecutorService without blocking the current code path?

我有一个服务,它向 Callables 添加一堆请求,然后打印执行结果。目前,服务请求被阻止,直到我打印执行的所有 Future 结果。但是我想 return 200 和 运行 这些请求并行而不阻塞请求。我怎样才能做到这一点?下面是我的代码。

下面是我的 运行 并行代码。

public void runParallelFunctions(Callable<Map<String, String>> invokerTask) {
        List<Callable<Map<String, String>>> myTasks = new ArrayList<>();
        for (int i = 0; i < invocationCount; i++) {
            myTasks.add(invokerTask);
        }
        List<Future<Map<String, String>>> results = null;
        try {
            results = executorService.invokeAll(myTasks);
        } catch (InterruptedException e) {
        }
        this.printResultsFromParallelInvocations(results);
    }

下面是我如何打印 Futures 的结果。

private void printResultsFromParallelInvocations(List<Future<Map<String, String>>> results) {
        results.forEach(executionResults -> {
            try {
                executionResults.get().entrySet().forEach(entry -> {
                    LOGGER.info(entry.getKey() + ": " + entry.getValue());
                });
            } catch (InterruptedException e) {
            } catch (ExecutionException e) {
            }
        });
    }

下面是当有人向服务发出请求时我如何调用上述方法。

String documentToBeIndexed = GSON.toJson(indexDocument);
int documentId = indexMyDocument(documentToBeIndexed);
createAdditionalCandidatesForFuture(someInput);
return true;

在上面的代码中,我调用 createAdditionalCandidatesForFuture 然后 return 为真。但代码仍在等待 printResultsFromParallelInvocations 方法完成。如何在调用 createAdditionalCandidatesForFuture 后生成代码 return 而无需等待结果打印?我必须使用另一个执行程序线程打印结果还是有其他方法?任何帮助将不胜感激

答案是CompletableFuture

已更新runParallelFunctions

public void runParallelFunctions(Callable<Map<String, String>> invokerTask) {
    // write a wrapper to handle exception outside CompletableFuture
    Supplier<Map<String, String>> taskSupplier = () -> {
        try {
            // some task that takes a long time
            Thread.sleep(4000);
            return invokerTask.call();
        } catch (Exception e) {
            System.out.println(e);
        }
        // return default value on error
        return new HashMap<>();
    };
    
    for (int i = 0; i < 5; i++) {
        CompletableFuture.supplyAsync(taskSupplier, executorService)
                         .thenAccept(this::printResultsFromParallelInvocations);
    }
    // main thread immediately comes here after running through the loop
    System.out.println("Doing other work....");
}

而且,printResultsFromParallelInvocations 可能看起来像:

private void printResultsFromParallelInvocations(Map<String, String> result) {
        result.forEach((key, value) -> System.out.println(key + ": " + value));
    }

输出:

Doing other work....
// 4 secs wait
key:value

Future 上调用 get 将阻塞线程,直到任务完成,所以是的,您必须将结果的打印移动到另一个 thread/Executor 服务。

另一种选择是每个任务在完成时打印其结果,前提是为它们提供了执行此操作所需的工具(访问记录器等)。或者换句话说,每个任务分为两个连续的步骤:执行和打印。