使用 ThreadPoolExecutor 缩放 maxPoolSize;为什么池不会动态增加其大小?

Scaling the maxPoolSize using ThreadPoolExecutor; Why the pool does not increases dynamically its size?

作为初学者,我正在 java 中学习线程和并发包,并且我阅读了有关 ThreadPoolExecutor 的文档以了解 getPoolSize()getCorePoolSize() 之间的区别, getMaxPoolSize() 并尝试在代码中实现相同的功能。

背景知识

根据我的理解,Executors.newFixedThreadPool(3) 创建了一个池 corePoolSize=3 并且当我们继续通过池执行任务时,将创建线程直到 3,然后当底层队列大小达到 100 并且仍然提交新任务时,这就是 maxPoolSize 出现的地方,并且线程现在从 corePoolSize 缩放到 maxPoolSize .

public class Test {

    static ThreadPoolExecutor pool=(ThreadPoolExecutor)Executors.newFixedThreadPool(3);
    
    public static void main(String k[]) throws InterruptedException{
        BlockingQueue<Runnable> queue=pool.getQueue();
        pool.execute(()->{
            for(int b=0;b<10;b++)
                System.out.println("Hello "+b);
        });
        pool.execute(()->{
            for(int b=0;b<10;b++)
                System.out.println("Hello "+b);
        });
        pool.setMaximumPoolSize(10);  //Setting maxPoolSize
        
     for(int j=0;j<20000;j++)       
        pool.execute(()->{
            for(int b=0;b<100;b++){
                System.out.println("Hello "+b);
            System.out.println("Queue size "+queue.size()+" "+"and pool size "+pool.getPoolSize()); 
        }
    });
 }

执行上述程序时,我可以看到队列大小达到 b/w 12000-20000,如果是这种情况,则 getPoolSize() 必须打印大于 corePoolSize 的值,因为 maxPoolSize 设置为10 但每次它只会打印 3(这是 corePoolSize)为什么会这样?正如我们预期的那样,它会扩展到 maxPoolSize。

Executors.newFixedThreadPool return 一个 ExecutorService;一个不公开(并且 可能 很好的理由)方法的接口,例如 setMaximumPoolSizesetCorePoolSize.

如果您创建类型为 Executors.newFixedThreadPool 的池,该池应在应用程序的生命周期内保持不变。如果你想要一个可以调整其大小的池,相应地,你应该使用 Executors.newCachedThreadPool() 代替。

From my understanding, Executors.newFixedThreadPool(3) creates a pool with corePoolSize-3 (...)

查看 newFixedThreadPool 的实现可以看出:

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), defaultHandler);
}

所以当您将 3 传递给 Executors.newFixedThreadPool 构造函数时,您将 both corePoolSizemaximumPoolSize 设置为3.

(...) and as we keep executing tasks via pool, threads will be created until 3 and then when the underlying queue size reaches 100 and new tasks are still submitted then that's where maxPoolSize comes into picture and threads are scaled to reach maxPoolSize from corePoolSize now.

其实这个说法不是很准确,后面会详细解释。现在阅读 Executors.newFixedThreadPool 的 Java 文档,它指出:

Creates a thread pool that reuses a fixed number of threads operating off a shared unbounded queue. At any point, at most nThreads threads will be active processing tasks. If additional tasks are submitted when all threads are active, they will wait in the queue until a thread is available.

因此池不会缩放(除非您明确这样做)。

When the above program is executed,i could see the queue size reaching b/w 12000-20000, if this the case then getPoolSize() must print the value greater than corePoolSize as maxPoolSize is set to 10 but everytime it would only print 3(which is corePoolSize) why does this happen ? as we expect it to scale upto maxPoolSize.

不,这不准确,如果您打印 pool.getMaximumPoolSize() 它将按预期 return 10,调用 pool.setMaximumPoolSize(10); 不会更改 corePoolSize 大小。但是,如果您这样做 pool.setCorePoolSize(10);,您将增加池以能够同时处理 10 个线程。

this.maximumPoolSize只是定义了池应该同时处理的线程数的上限,它不会改变池的当前大小。

为什么池不会动态增加其大小?

更深入地查看 newFixedThreadPool 实现,可以看到池是使用大小等于 Integer.MAX_VALUE 的任务队列 new LinkedBlockingQueue<Runnable>() 初始化的。查看方法 execute 可以在以下评论中阅读:

/*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */

如果仔细阅读第 2 点和第 3 点,可以推断 只有 当无法将任务添加到队列时,池会创建比 corePoolSize 指定的线程更多的线程.由于 Executors.newFixedThreadPool 使用带有 Integer.MAX_VALUE 的队列,您无法看到池动态分配更多资源,除非明确设置 corePoolSizepool.setCorePoolSize.

所有这些都是不应该关心的实现细节。因此,为什么 Executors 接口不公开方法,例如 setMaximumPoolSize.

可以从 ThreadPoolExecutor 文档中阅读:

核心和最大池大小

A ThreadPoolExecutor will automatically adjust the pool size (see getPoolSize()) according to the bounds set by corePoolSize (see getCorePoolSize()) and maximumPoolSize (see getMaximumPoolSize()). When a new task is submitted in method execute(java.lang.Runnable), and fewer than corePoolSize threads are running, a new thread is created to handle the request, even if other worker threads are idle. If there are more than corePoolSize but less than maximumPoolSize threads running, a new thread will be created only if the queue is full. By setting corePoolSize and maximumPoolSize the same, you create a fixed-size thread pool. By setting maximumPoolSize to an essentially unbounded value such as Integer.MAX_VALUE, you allow the pool to accommodate an arbitrary number of concurrent tasks. Most typically, core and maximum pool sizes are set only upon construction, but they may also be changed dynamically using setCorePoolSize(int) and setMaximumPoolSize(int).

这基本上证实了池没有动态更新其大小的原因。