executorService 使用 SynchronousQueue 抛出 RejectException

the executorService throw RejectException with SynchronousQueue

我想做一个boundedExecutor,只能并行执行固定数量的线程。当添加更多任务时,执行程序将阻塞直到其他线程完成。

这里是我在其他问题中找到的Executor

public class BoundedExecutor extends ThreadPoolExecutor {

    private final Logger logger = LogManager.getLogger(BoundedExecutor.class);
    private final Semaphore semaphore;

    public BoundedExecutor(int bound){
        super(bound, bound, 0, TimeUnit.SECONDS, new SynchronousQueue<>());
        this.semaphore = new Semaphore(bound);
    }

    @Override
    public void execute(Runnable task) {
        try {
            semaphore.acquire();
            super.execute(task);
        } catch (InterruptedException e) {
            logger.error("interruptedException while acquiring semaphore");
        }
    }

    protected void afterExecute(final Runnable task, final Throwable t){
        super.afterExecute(task, t);
        semaphore.release();
    }
}

和主要代码

    public static void main(String[] args) throws Exception {

        Runnable task = () -> {
            try {
                Thread.sleep(1000);
                System.out.println(Thread.currentThread().getName() + " complete.");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        };
        BoundedExecutor pool = new BoundedExecutor(1);
        for(int i = 0; i < 10; i++){
            pool.execute(task);
        }
        pool.shutdown();
    }

我以为代码是单线程的,会按顺序执行任务,但实际上,当第一个任务完成时,执行程序抛出 java.util.concurrent.RejectedExecutionException。

据我了解,semaphore.acquire() 将阻塞线程,直到第一个任务完成并释放信号量,代码有什么问题?

我会制作队列块而不使用信号量

public static void main(String[] args) {
    SynchronousQueue<Runnable> queue = new SynchronousQueue<Runnable>() {
        @Override
        public boolean offer(Runnable runnable) {
            try {
                return super.offer(runnable, 1, TimeUnit.MINUTES);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    };
    ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 1, TimeUnit.MINUTES, queue);
    for (int i = 0; i < 10; i++) {
        final int finalI = i;
        pool.execute(() -> {
            try {
                Thread.sleep(1000);
                System.out.println(LocalTime.now() + " - " + Thread.currentThread().getName() + " " + finalI + " complete");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
    }
    pool.shutdown();
}

打印

13:24:14.241 - pool-1-thread-1 0 complete
13:24:15.247 - pool-1-thread-1 1 complete
13:24:16.247 - pool-1-thread-1 2 complete
13:24:17.247 - pool-1-thread-1 3 complete
13:24:18.248 - pool-1-thread-1 4 complete
13:24:19.248 - pool-1-thread-1 5 complete
13:24:20.248 - pool-1-thread-1 6 complete
13:24:21.248 - pool-1-thread-1 7 complete
13:24:22.249 - pool-1-thread-1 8 complete
13:24:23.249 - pool-1-thread-1 9 complete