corePoolSize 为 0 的 ThreadPoolExecutor 在任务队列满之前不应执行任务

ThreadPoolExecutor with corePoolSize 0 should not execute tasks until task queue is full

我正在学习 Java 并发实践 并卡在 8.3.1 线程创建和拆卸 主题.以下脚注警告将 corePoolSize 保持为零。

Developers are sometimes tempted to set the core size to zero so that the worker threads will eventually be torn down and therefore won’t prevent the JVM from exiting, but this can cause some strange-seeming behavior in thread pools that don’t use a SynchronousQueue for their work queue (as newCachedThreadPool does). If the pool is already at the core size, ThreadPoolExecutor creates a new thread only if the work queue is full. So tasks submitted to a thread pool with a work queue that has any capacity and a core size of zero will not execute until the queue fills up, which is usually not what is desired.

所以为了验证这一点,我编写了这个程序,但它并不像上面所说的那样工作。

    final int corePoolSize = 0;
    ThreadPoolExecutor tp = new ThreadPoolExecutor(corePoolSize, 1, 5, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>());

    // If the pool is already at the core size
    if (tp.getPoolSize() == corePoolSize) {
        ExecutorService ex = tp;

        // So tasks submitted to a thread pool with a work queue that has any capacity
        // and a core size of zero will not execute until the queue fills up.
        // So, this should not execute until queue fills up.
        ex.execute(() -> System.out.println("Hello"));
    }

输出你好

那么,程序的行为是否表明 ThreadPoolExecutor 在提交任务时至少创建一个线程,而不考虑 corePoolSize=0。如果是,那么教科书上的警告是什么。

编辑: 在@S.K.'s 上测试了 jdk1.5.0_22 中的代码建议进行以下更改:

ThreadPoolExecutor tp = new ThreadPoolExecutor(corePoolSize, 1, 5, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(1));//Queue size is set to 1.

但是有了这个改变,程序终止而不打印任何输出。

我是不是误解了书中的这些说法?

编辑 (@sjlee): 很难在评论中添加代码,所以我将其作为编辑添加到这里......你能试试这个修改吗? 运行 它针对最新的 JDK 和 JDK 1.5?

final int corePoolSize = 0;
ThreadPoolExecutor tp = new ThreadPoolExecutor(corePoolSize, 1, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<>());

// If the pool is already at the core size
if (tp.getPoolSize() == corePoolSize) {
    ExecutorService ex = tp;

    // So tasks submitted to a thread pool with a work queue that has any capacity
    // and a core size of zero will not execute until the queue fills up.
    // So, this should not execute until queue fills up.
    ex.execute(() -> System.out.println("Hello"));
}
tp.shutdown();
if (tp.awaitTermination(1, TimeUnit.SECONDS)) {
    System.out.println("thread pool shut down. exiting.");
} else {
    System.out.println("shutdown timed out. exiting.");
}

@sjlee 已在评论中发布结果。

似乎这是旧 java 版本的错误,但现在 Java 1.8 中不存在。

根据 ThreadPoolExecutor.execute() 的 Java 1.8 文档:

     /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     * ....
     */

在第二点,添加一个worker到队列后重新检查,如果不是排队任务,可以启动一个新线程,而不是回滚排队并启动一个新线程。

这就是正在发生的事情。在第一次检查期间,任务已排队,但在重新检查期间,将启动一个新线程来执行您的任务。

虽然 运行 这个程序在 jdk 1.5、1.6、1.7 和 1.8 中,但我发现 ThreadPoolExecutor#execute(Runnable) 在 1.5、1.6 和 1.7+ 中有不同的实现。这是我的发现:

JDK 1.5 实现

 //Here poolSize is the number of core threads running.

 public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    for (;;) {
        if (runState != RUNNING) {
            reject(command);
            return;
        }
        if (poolSize < corePoolSize && addIfUnderCorePoolSize(command))
            return;
        if (workQueue.offer(command))
            return;
        Runnable r = addIfUnderMaximumPoolSize(command);
        if (r == command)
            return;
        if (r == null) {
            reject(command);
            return;
        }
        // else retry
    }
}

corePoolSize 为 0 时,此实现不会创建线程,因此不会执行提供的任务。

JDK 1.6 实现

//Here poolSize is the number of core threads running.

  public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
        if (runState == RUNNING && workQueue.offer(command)) {
            if (runState != RUNNING || poolSize == 0)
                ensureQueuedTaskHandled(command);
        }
        else if (!addIfUnderMaximumPoolSize(command))
            reject(command); // is shutdown or saturated
    }
}

JDK 1.6 即使 corePoolSize 为 0 也会创建一个新线程。

JDK 1.7+ 实现(类似于 JDK 1.6 但具有更好的锁和状态检查)

    public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread.  If it fails, we know we are shut down or saturated
     * and so reject the task.
     */
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

JDK 1.7 也会创建一个新线程,即使 corePoolSize 为 0。

所以,corePoolSize=0似乎是JDK 1.5和JDK 1.6+的每个版本中的特例。

但奇怪的是书上的解释和程序的结果都不吻合

当核心池大小为零时,ThreadPoolExecutor 在 Java 5 中的这种奇怪行为显然被识别为错误并在 Java 6 中悄悄修复。

事实上,由于在 6 和 7 之间修改了一些代码,该问题在 Java 7 中再次出现。然后它被报告为错误,确认为错误并已修复。

无论哪种方式,您都不应该使用受此错误影响的 Java 版本。 Java 5 于 2015 年停产,Java 6 及更高版本的最新可用版本不受影响。 "Java Concurrency In Practice" 的那部分不再合适。

参考文献: