程序不会停止 运行 即使执行程序已关闭且期货已取消

Program doesn't stop running even though executor was shutdown and futures cancelled

我编写了一个小代码来练习执行器和线程。它包括以下内容:

  1. 使用无限队列创建大小为 3 的固定线程池。
  2. 提交 3 个无限循环任务 (while(true)) 到任务池(然后 所有线程都被占用)
  3. 提交第 4 个任务,该任务将在队列中等待。
  4. executor.shutdown() 并执行 println 以查看我如何进行活动任务和任务计数。
  5. 将标志设置为 false 以停止无限 while 然后执行 println 用于查看我如何进行活动任务和任务计数
  6. mayInterruptIfRunning=true取消所有期货,然后 做一个 println 看看我如何使活动任务和任务计数 有

这是代码:

public class Main {


private static ThreadPoolExecutor fixexThreadPool;

public static void main(String[] args) throws InterruptedException {
    System.out.println("Creating fixed thread pool of size 3 and infinite queue.");
    fixexThreadPool = new ThreadPoolExecutor(3, 3, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
    final Boolean[] flag = new Boolean[1];
    flag[0] = true;
    List<Future> futures = new ArrayList<>();

    System.out.println("Submiting 3 threads");
    for (int i = 0; i < 3; i++) {
        futures.add(fixexThreadPool.submit(() -> {
            int a = 1;
            while (flag[0]) {
                a++;
            }
            System.out.println("Finishing thread execution.");
        }));
    }
    System.out.println("Done submiting 3 threads.");
    System.out.println(String.format("Active count: %s | Completed task count: %s | task count: %s", fixexThreadPool.getActiveCount(), fixexThreadPool.getCompletedTaskCount(), fixexThreadPool.getTaskCount()));
    Thread.sleep(3000L);
    System.out.println("Submitting a 4th thread.");

    futures.add(fixexThreadPool.submit(() -> {
        int a = 1;
        while (flag[0]) {
            a++;
        }
        System.out.println("Finishing thread execution");
    }));

    System.out.println(String.format("Active count: %s | Completed task count: %s | task count: %s", fixexThreadPool.getActiveCount(), fixexThreadPool.getCompletedTaskCount(), fixexThreadPool.getTaskCount()));

    System.out.println("Executor shutdown");
    fixexThreadPool.shutdown();
    System.out.println(String.format("Active count: %s | Completed task count: %s | task count: %s", fixexThreadPool.getActiveCount(), fixexThreadPool.getCompletedTaskCount(), fixexThreadPool.getTaskCount()));
    Thread.sleep(2000L);
    System.out.println("Setting flag to false.");
    flag[0] = false;
    Thread.sleep(2000L);
    System.out.println(String.format("Active count: %s | Completed task count: %s | task count: %s", fixexThreadPool.getActiveCount(), fixexThreadPool.getCompletedTaskCount(), fixexThreadPool.getTaskCount()));
    System.out.println("Cancelling all futures.");
    futures.forEach(f -> f.cancel(true));
    System.out.println(String.format("Active count: %s | Completed task count: %s | task count: %s", fixexThreadPool.getActiveCount(), fixexThreadPool.getCompletedTaskCount(), fixexThreadPool.getTaskCount()));
}

}

这是执行的输出:

有几件事我不明白。

  1. 为什么关闭executor后,还有活跃的线程?
  2. 为什么在将标志更改为 false 以打破无限循环后,无限 while 没有打破?
  3. 为什么取消每个future后,还有活动线程?
  4. 无论将标志更改为 false、关闭执行程序甚至取消所有 futures,我的程序都不会停止 运行。这是为什么?

提前致谢!!

这个问题可以通过使用volatile关键字来解决。 This thread provides a wealth of answers explaining what volatile is in detail and there are plenty of tutorials/sources/blogs out there that can provide further detail. Another super detailed thread about volatile .

老实说,互联网上有很多人可以比我更好更准确地解释它,但简而言之 - volatile 是一个 Java 修饰符,应该在你有资源时使用被多个线程共享。它告诉 JVM 确保每个线程缓存的值与主内存中的值同步。

很明显,JVM 在某个地方崩溃了,线程持有的值与实际值不完全匹配。对你的实现做一个小改动就可以解决这个问题:

使标志成为 class 实例变量

private volatile static Boolean[] flag = new Boolean[1];

请参阅 了解我这样做的原因。

所以给出稍微大一点的图片:

private static ThreadPoolExecutor fixexThreadPool;
private volatile static Boolean[] flag = new Boolean[1];

public static void main(String[] args) throws InterruptedException {
    System.out.println("Creating fixed thread pool of size 3 and infinite queue.");
    fixexThreadPool = new ThreadPoolExecutor(3, 3, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
    flag[0] = true;
    List<Future> futures = new ArrayList<>();

    System.out.println("Submiting 3 threads");
    ...

代码现在顺利停止,没有任何问题,希望这对您有所帮助:)

(附带一提,很好奇你为什么使用 Boolean[] 而不仅仅是布尔值?我保留它是为了在我的回答中保持一致,但它显然也适用于 flag 只是一个布尔值而不是一个数组)

-- 编辑--

回答您最近的评论 - 恐怕我的理解几乎停留在我已经写的内容上,但我可以提供我的想法。您的应用在您调用 fixexThreadPool.shutdown(); 时未 "exit" 的原因似乎可以在 ThreadPoolExecutor 的文档中找到。即 -

public void shutdown()

Initiates an orderly shutdown in which previously submitted tasks are executed, but no new tasks will be accepted. Invocation has no additional effect if already shut down.

while循环已经提交,所以它会愉快地继续执行。

我稍微研究了一下,看看发生了什么。

首先,我不喜欢那么长的状态日志行,所以我为它创建了一个单独的方法!我还注意到 ThreadPoolExecutor 和 Future 中有一些有趣的布尔状态,因此决定也记录它们:

private static void Log() {
     System.out.println(String.format("\nActive count: %s | Completed task count: %s | task count: %s", 
             fixexThreadPool.getActiveCount(), 
             fixexThreadPool.getCompletedTaskCount(), 
             fixexThreadPool.getTaskCount()));
     System.out.println(String.format("isShutdown : %s | isTerminated : %s | isTerminating : %s ", 
             fixexThreadPool.isShutdown(), 
             fixexThreadPool.isTerminated(), 
             fixexThreadPool.isTerminating())); 
     System.out.println(String.format("Futures size = %s", futures.size()));
     futures.forEach(f -> System.out.println(String.format("Future isCancelled : %s | isDone : %s", f.isCancelled(), f.isDone())));
     System.out.println("");
}

将其放入您的代码中,我们得到:

    Log();
    System.out.println("Executor shutdown");
    fixexThreadPool.shutdown();
    Log();
    Thread.sleep(2000L);
    System.out.println("Setting flag to false.");
    flag[0] = false;
    Thread.sleep(2000L);
    Log();
    System.out.println("Cancelling all futures.");
    futures.forEach(f -> System.out.println(String.format("Future cancelled - %s", f.cancel(true))));
    Log();

我还想为应用程序添加一个快速心跳,不时打印一个,这样我就可以看到幕后 was/wasn 还没有 运行 的内容:

private static void Heartbeat(int a) {
    int amod = a % 1000000000;
    if(amod == 0) {
        System.out.println(a);
    }
}

正在使用:

while (flag[0]) {
    a++;
    Heartbeat(a);
}

然后我 运行 进行了几次测试,以不同的组合尝试了一些不同的东西:

  • 注释掉fixexThreadPool.shutdown();
  • 注释掉futures.forEach(f -> f.cancel(true));
  • 注释掉flag[0] = false;
  • 尝试 with/without 新的 volatile 修复。

为了让已经很长的答案更短一些,您可以自己探索这些组合,但我发现

  • 没有 volatile 关键字,线程卡在 terminating 状态。我只能假设 shutdown() 不会停止 while 由于这些任务已经提交,因此循环执行他们正在做的事情,因此 none 的线程认为 flags[0] == false,他们会进行递增a。

    据我所知,这表现出文档中概述的行为。所以关闭不会停止你的 while 循环,它只是停止任何新的未来被提交到线程池(并被添加到阻塞队列),然后耐心地等待 while 循环完成(这显然没有标志上的 volatile 修饰符,他们永远不会)。

  • 使用volatile,但是注释掉shutdown,很明显,每个task 终止(控制台记录 4 "Finishing thread execution.")但是 线程池保持活动状态,程序本身不会终止。池正在耐心地等待新任务,它显然永远不会得到。
  • `future.cancel'的逻辑目前有点超出我的理解范围。我 运行 一个测试只调用 future.cancel 并做了一些更深入的 logging/investigation 但没有真正的具体答案。

    我的 运行 理论是 future 已经被添加到线程池中的阻塞队列中,因此调用 cancel 不会对线程池的执行产生任何影响,因此调用 future.cancel 本身对 fixexThreadPool 没有任何作用。

希望这能提供足够的思考 :)

Why, after shutting down executor, there still are active threads?

通过调用 shutdown,我们请求 normal shutdown。线程池停止接受新任务,但等待已提交的任务完成 - 运行 或排队。 因此关闭方法不会尝试停止线程。从概念上讲,ExecutorService 实现将任务提交与任务执行分开。换句话说,我们拥有任务而不是线程。

Why, after changing the flag to false in order to break the infinite loop, the infinite while doesn't break?

我们在这里使用 cancellation flag。但是根据 Java 内存模型,要让多个线程看到对共享数据所做的更改 - flag - 我们必须采用同步或使用 volatile 关键字。 因此,虽然同步提供互斥和内存可见性,但 volatile 关键字仅提供内存可见性。由于这里只有主线程修改了flag变量,所以定义flag为static volatile变量就可以正常工作了

public class Main {


    private static ThreadPoolExecutor fixexThreadPool;
    private static volatile Boolean[] flag = new Boolean[1];
    // the main method
}

Why, after canceling every future, there is are active threads ?

ThreadPoolExecutor 默认使用 FutureTask class 作为 Future 实现。并且 FutureTask 通过中断工作线程来取消。所以可以期望 thread interruption 停止任务甚至终止线程。但是线程中断是一种协作机制。首先,任务必须响应中断。它必须用 Thread.isInterrupted 检查中断标志并在可能的情况下退出。其次,线程所有者,在我们的例子中是线程池,必须检查中断状态并采取相应行动。在此示例中,任务不响应中断。它使用一个标志来取消它的操作。那么让我们继续讨论线程所有者。 ThreadPoolExecutor不作用于中断状态,所以不会终止线程。

No matter if a change the flag to false, shutdown executor or even canceling all futures, my program doesn't stop running. Why is that?

如前所述,当前任务使用取消标志的方式。所以使用volatile关键字一定能解决问题。然后任务将按预期停止。另一方面,取消不会对当前任务产生任何影响。因为它不检查中断状态。我们可以让它像这样响应:

while (flag[0] && !Thread.currentThread().isInterrupted())

这样,任务也支持线程中断取消。