为什么 CompletableFuture.supplyAsync 成功的次数是随机的?

Why is CompletableFuture.supplyAsync succeeding a random number of times?

我对 Java 8 中的 lambda 和异步代码都不熟悉。我不断收到一些奇怪的结果...

我有以下代码:

import java.util.concurrent.CompletableFuture;

public class Program {

    public static void main(String[] args) {
        for (int i = 0; i < 100; i++) {
            String test = "Test_" + i;
            final int a = i;

            CompletableFuture<Boolean> cf = CompletableFuture.supplyAsync(() -> doPost(test));
            cf.thenRun(() -> System.out.println(a)) ;
        }
    }

    private static boolean doPost(String t) {
        System.out.println(t);

        return true;
    }
}

实际代码要长得多,因为 doPost 方法会 post 一些数据到 Web 服务。但是,我可以用这个简单的代码复制我的问题。

我想让 doPost 方法执行 100 次,但出于性能原因异步执行(为了比执行 100 次同步调用更快地将数据推送到 Web 服务)。

在上面的代码中,'doPost' 方法是 运行 随机次数,但始终不超过 20-25 次。没有抛出异常。似乎某些线程处理机制默默地拒绝创建新线程和执行它们的代码,或者线程在没有崩溃程序的情况下默默地崩溃。

我还有一个问题,如果我向 doPost 方法添加比上面显示的更多的功能,它会达到一个点,该方法只是默默地中断。在这种情况下,我尝试在 return 语句之前添加一个 System.out.println("test"),但它从未被调用过。循环 100 次的循环确实 运行 100 次迭代。

至少可以说,这种行为令人困惑。

我错过了什么?为什么函数作为参数提供给 supplyAsync 运行 看似随机的次数?

编辑:只是想指出情况与标记为可能重复的问题不完全相同,因为该问题涉及任意深度嵌套的期货,而这个一个处理平行的。然而,他们失败的原因实际上是相同的。这些案例似乎截然不同,值得我单独提问,但其他人可能不同意...

默认情况下 CompletableFuture 使用自己的 ForkJoinPool.commonPool()(参见 CompletableFuture 实现)。这个默认池只创建 daemon 线程,例如如果它们还活着,它们将不会阻止主应用程序终止。

您有以下选择:

  1. 将所有 CompletionStage 收集到某个数组,然后制作 java.util.concurrent.CompletableFuture#allOf().toCompletableFuture().join() - 这将保证在 之后完成所有阶段加入()

  2. 对你自己的线程池使用 *Async 操作,它只包含 non-daemon 线程,就像在以下示例:

    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(10, r -> {
            Thread t = new Thread(r);
            t.setDaemon(false); // must be not daemon
            return t;
        });
    
        for (int i = 0; i < 100; i++) {
            final int a = i;
    
            // the operation must be Async with our thread pool
            CompletableFuture<Boolean> cf = CompletableFuture.supplyAsync(() -> doPost(a), pool);
            cf.thenRun(() -> System.out.printf("%s: Run_%s%n", Thread.currentThread().getName(), a));
        }
    
        pool.shutdown(); // without this the main application will be blocked forever
    }
    
    private static boolean doPost(int t) {
        System.out.printf("%s: Post_%s%n", Thread.currentThread().getName(), t);
    
        return true;
    }