使用 Java 中的 CompletableFuture 并行执行 for 循环并记录执行

Execute a for loop in parallel using CompletableFuture in Java and log the execution

我有一个 for 循环,我正在尝试使用 CompletableFuture 对其进行并行化。

for (int i = 0; i < 10000; i++) {
    doSomething();
    doSomethingElse();
}

我目前的情况是:

for (int i = 0; i < 10000; i++) {
    CompletableFuture.runAsync(() -> doSomething());
    CompletableFuture.runAsync(() -> doSomethingElse());
}

我想这可以达到目的,但是需要在所有处理开始和结束之前打印日志。如果我这样做:

log("Started doing things");
for (int i = 0; i < 10000; i++) {
    CompletableFuture.runAsync(() -> doSomething());
    CompletableFuture.runAsync(() -> doSomethingElse());
}
log("Ended doing things");

这是否保证在所有 for 循环结束后打印第二条日志语句,因为它是在单独的线程中执行的?如果没有,有没有办法在不阻塞主线程的情况下做到这一点?

您必须收集所有 CompletableFutures 并等待它们完成:

log("Started doing things");
List<CompletableFuture> futures = new ArrayList();
for (int i = 0; i < 10000; i++) {
    futures.add(CompletableFuture.runAsync(() -> doSomething()));
    futures.add(CompletableFuture.runAsync(() -> doSomethingElse()));
}
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
                 .thenRunAsync(() -> log("Ended doing things"));

或者当您使用 ExecutorService 时:

CompletableFuture.runAsync(() -> {
    try {
        executorService.invokeAll(tasks);
    } catch (InterruptedException) {
        e.printStackTrace();
    }
    log("Ended doing things");
});

我想 CompletableFuture 是不符合您需求的概念。如果您想并行执行任意数量的类似任务,最简单的方法是在 ExecutionService:

上使用方法 invokeAll(...)
// First, create a list with all tasks you want to execute in parallel
List<Callable<?>> tasks = new ArrayList<>(10000);
for (int i = 0; i < 10000; ++i) {
    // we need to create Callables, so if your doSomething method returns void, we have to convert it to a Callable using helper method from class Executors
    tasks.add(Executors.callable(this::doSomething));
}

// Then, create an executor service that can execute these tasks
// There are different executors you can choose from, I take one that has a fixed pool of threads
ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

// Last but not least, call invokeAll to execute all tasks and wait for them to complete
executorService.invokeAll(tasks);

// This method will be called when all tasks have been completed successfully:
System.out.println("done");