ThreadPoolExecutor 没有正确收缩

ThreadPoolExecutor does not shrink properly

我处理了一个线程应用程序的设计,该应用程序具有以下要求:它必须具有基于一天中的时间 (Peak/off-peak) 运行 的动态线程数。

我做了功课并研究了最好的方法,我发现 java 有一个 class 名为 ThreadPoolExecutor: java.util.concurrent.ThreadPoolExecutor.ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue)

这里重点关注的两个变量是corePoolSize和maximumPoolSize,它们与workQueue一起作为线程池的下限和上限。调整这些值有不同的策略,如果不需要显式设置这些参数,建议使用执行器工厂方法而不是构造函数。

public class Main {
    public static void main(String[] args) {
        final BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(100);
        final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(0, 10, 0L, TimeUnit.MILLISECONDS, queue);
        threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                try {
                    executor.getQueue().put(r);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });

        TimerTask task = new TimerTask() {
            @Override
            public void run() {
                threadPool.setCorePoolSize(1);
                threadPool.setMaximumPoolSize(1);
                System.out.println("changed");
            }
        };
        new Timer().schedule(task, 10000);

        for (int i = 0; i < 400; i++) {
            threadPool.submit(new WorkItem(i));
        }
    }
}

这是 class 类似于 运行

的线程
public class WorkItem implements Runnable {
    private int workItemNumber;
    private long startTime;

    public WorkItem(int workItemNumber) {
        this.workItemNumber = workItemNumber;
    }

    @Override
    public void run() {
        startTime = System.currentTimeMillis();
        System.out.println("thread Number: " + workItemNumber + " started at: " + startTime);
        while (System.currentTimeMillis() - startTime < 5000) {
        }
        System.out.println("WorkItem done: " + workItemNumber);
    }
}

但是查看日志,执行的线程数没有变化。

看起来 "Set maximum pool size" 方法只会减少空闲线程的数量...

public void setMaximumPoolSize(int maximumPoolSize) {
    if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize)
        throw new IllegalArgumentException();
    this.maximumPoolSize = maximumPoolSize;
    if (workerCountOf(ctl.get()) > maximumPoolSize)
        interruptIdleWorkers();
}

如果线程一直很忙,它们看起来不像是被释放了。

(我可能是错的......当一个线程完成清理工作时看起来并没有发生任何神奇的事情 - 需要多看看......)

您已经创建了一个最大线程数为 10 的池

new ThreadPoolExecutor(0, 10, 0L, TimeUnit.MILLISECONDS, queue);

并且您已经提交了 400 个任务

for (int i = 0; i < 400; i++) {
    threadPool.submit(new Thread(System.currentTimeMillis(), i));
}

线程池不会使用超过 10 个线程(java.lang.Thread class 表示的线程)来执行您的任务。

提交和执行所有这些任务所需的时间少于您为 TimerTask

设置的 10000 毫秒延迟
new Timer().schedule(task, 10000, 5000);

一旦您的 TimerTask 为 运行,您的池将只有一个线程 运行 正在执行并声明已提交的任务(一旦其他线程的任务完成)。


示例将显示一旦 TimerTask 已执行(并且在任何执行任务完成后)ThreadPoolExecutor 中将仅保留一个线程

public class Jackson {

    public static void main(String[] args) {
        final BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(100);
        final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(0, 10, 0L, TimeUnit.MILLISECONDS, queue);
        threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                try {
                    executor.getQueue().put(r);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });

        TimerTask task = new TimerTask() {
            @Override
            public void run() {
                threadPool.setCorePoolSize(1);
                threadPool.setMaximumPoolSize(1);
                System.out.println("changed");
                this.cancel();
            }
        };
        new Timer().schedule(task, 5, 5000);

        for (int i = 0; i < 400; i++) {
            threadPool.submit(new WorkItem(i));
        }
    }
}

class WorkItem implements Runnable {
    private int workItemNumber;

    public WorkItem(int workItemNumber) {
        this.workItemNumber = workItemNumber;
    }

    @Override
    public void run() {
        System.out.println("WorkItem #" + workItemNumber + " executing on Thread with name: " + Thread.currentThread().getName());
    }
}

您的代码 运行ning 完全符合您的预期。 10个线程启动和运行,100个线程排队。那时,您的主线程(一个排队线程)被阻塞队列阻塞。然后您的计时器将可用线程更改为 1,这意味着您的队列处理速度更慢。然而,您所看到的是,因为您的线程必须等待超过 10 秒才能实际执行,所以它们会立即完成。尝试对您的代码进行以下更改:

public class WorkItem implements Runnable {
    private long startTime;
    private long runTime;
    private int workItemNumber;

    public WorkItem(long startTime, int workItemNumber) {
        this.startTime = startTime;
        this.workItemNumber= workItemNumber;
    }

    @Override
    public void run() {
        System.out.println("WorkItem started: " + workItemNumber + " Queued at: " + startTime);
        runTime = System.currentTimeMillis();
        while (System.currentTimeMillis() - runTime < 10000) {
        }
        System.out.println("WorkItem done: " + workItemNumber);
    }
}

这会让您看到执行情况如您所愿。在核心池设置为 0 的情况下使用数组阻塞队列的奇怪之处在于它只会启动一个线程,然后填满队列,然后启动更多线程(最大池大小)。如果您对排队代码进行细微更改,就会看到这种情况。

for (int i = 1; i < 101; i++) {
    threadPool.submit(new WorkItem(System.currentTimeMillis(), i));
}

for (int i = 101; i < 401; i++) {
    long thisTime = System.currentTimeMillis();
    threadPool.submit(new WorkItem(System.currentTimeMillis(), i));
    while (System.currentTimeMillis() - thisTime < 500) {

    }
}

虽然在更高版本的 JDK/Grails 上使用它,但设置 max poolSize 减少它在旧版本的 grails 和 JDK7 中运行良好。 (不确定问题出在哪里我不得不这样做)

private static final int actualPoolSize =  Holders.grailsApplication.config.maximumPoolSize ?: 3
    private static int maxPoolSize = actualPoolSize
public EnhancedExecutor() {
        super(maxPoolSize,maxPoolSize,keepAliveTime,timeoutUnit,new PriorityBlockingQueue<Runnable>(maxQueue))
    }
public void setMaxPoolSize(int i) {
        this.maxPoolSize=i
        super.purge()
        super.setCorePoolSize(i?:actualPoolSize)        
        super.setMaximumPoolSize(i?:actualPoolSize)
    }

如果没有清除,我可以毫无错误地提高到更高的水平。尝试 reduce 为 i 返回 null。或 actualPoolSize。 (似乎不想在不抛出异常的情况下缩小)。

我接受了 BretC 关于线程繁忙的评论,似乎清除解决了这个问题,方法是确保在尝试重置 superValues 之前重置所有内容