Java并发:修改latch/ThreadGroup实现Executor行为

Java concurrency: Modifying latch/ThreadGroup to achieve Executor behaviour

这个问题与我在 Java 并发主题中的家庭作业有关。我的任务是生成新线程并通过给定 concurrencyFactor 限制它们。也就是说,继续调度新线程,直到活动线程数小于或等于concurrencyFactor。如果活动线程数等于 concurrencyFactor,程序将等待直到活动线程数减少到 concurrencyFactor - 1 并创建一个新线程。

作为第一种方法,我使用 ExecutorService 并通过 Executors.newFixedThreadPool(concurrencyFactor); 创建了一个新的固定池,每当调用我的方法时,我只是向该池提交一个新的可运行对象。逻辑代码如下:

    private final ExecutorService fixedPoolExecutor = Executors.newFixedThreadPool(concurrencyFactor);
    public void handleRequest(final RequestHandler handler) {
    if (handler == null) throw new IllegalArgumentException("Handler cannot be null");
    fixedPoolExecutor.submit(new Runnable() {
        @Override
        public void run() {
            handler.serviceRoutine();
        }
      });
    }

现在,第二部分要求我在不使用执行器的情况下实现相同的目标。我想到了以下两种方法:
1) 使用 countDownLatch 但此闩锁会等待(即 latch.await())直到 activeCount 变为 0。我只想等到倒计时变成 concurrencyFactor - 1.
2) 使用 ThreadGroup 并等待 threadGroup.activeCount() < concurrencyFactor。但是,这种方法的问题是如何让传入请求等到条件 threadGroup.activeCount() < concurrencyFactor 满足?对于这种方法,我使用了以下代码:

    private final Lock lock = new ReentrantLock();
    private final ThreadGroup threadGroup = new ThreadGroup("myGroup");
    public void handleRequest(final RequestHandler handler) {
    if (handler == null) throw new IllegalArgumentException("Handler cannot be null");
    lock.lock();
    try {
        while (threadGroup.activeCount() >= concurrencyFactor) {

        }
        Thread t = new Thread(threadGroup, new Runnable() {
            @Override
            public void run() {
                handler.service();
            }
        });
        t.start();
    } finally {
        lock.unlock();
    }        
   }

我可以在第二种方法中用一些等待条件替换空白的 while 循环吗?

如有任何关于上述方法的建议或对任何新方法的建议,我们将不胜感激。

我建议使用 Sempahore。信号量将表示仍允许启动的线程数。最初它持有等于配置的并发因子的许可。

在开始一个新线程之前,handleRequest 方法需要获得信号量的许可。启动的线程应该在完成后再次释放。

示例代码:

private final ThreadGroup threadGroup = new ThreadGroup("myGroup");
private final Semaphore concurrencyFactor = new Semaphore(CONCURRENCY_FACTOR);

public void handleRequest(final RequestHandler handler) throws InterruptedException {
    if (handler == null) throw new IllegalArgumentException("Handler cannot be null");

    concurrencyFactor.acquire(); // Get permit

    Thread t = new Thread(threadGroup, new Runnable() {
        @Override
        public void run() {
            try {
                handler.service();
            } finally {
                concurrencyFactor.release(); // make sure to release permit
            }
        }
    });
    t.start();
}

(您可能希望以不同方式处理可能的中断)