ExecutorService 和 AtomicInteger:RejectedExecutionException

ExecutorService and AtomicInteger : RejectedExecutionException

我希望 atomicInteger 的值为 100 然后程序终止

 public static void main(String[] args) throws InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        AtomicInteger atomicInteger = new AtomicInteger(0);
        do {
            executor.submit(() -> {
                System.out.println(atomicInteger.getAndAdd(10));
                if (atomicInteger.get() == 100) {
                    //executor.shutdownNown();
                }
            });
        } while (true);
    }

我有错误

Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@1d8d10a rejected from java.util.concurrent.ThreadPoolExecutor@9e54c2[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 10]
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1374)
    at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112)
    at java.util.concurrent.Executors$DelegatedExecutorService.submit(Executors.java:678)

我该如何实现。

你应该只改变你的 while 循环来检查你需要的条件,然后关闭执行程序

这里不需要使用 AtomicInteger,因为你的 Runnable lambda 函数调用保证按顺序执行(通过新的 SingleThreadExecutor)。此外,您的 Runnable lambda 代码需要任何时间来执行(例如 2 毫秒),您的主循环将排队超过 10 个任务来达到您的限制。如果您在 Runnable lambda 函数中添加一个 2ms 的睡眠,并在 do/while 循环中添加一个计数器,并在最后打印计数器的值以查看您排队的 Runnables 实例数,您会看到这种情况发生向上。

假设您希望使用并发线程测试此代码,您需要将对 newSingleThreadPool 的调用替换为 newFixedThreadPool。当使用并发线程时,您的代码采用的方法是有问题的。在下面的代码中,我切换到 newFixedThreadPool,添加了一个计数器,这样我们就可以看到有多少任务在排队,并在您的 Runnable lambda 函数中添加了短暂的暂停,只是为了表示少量工作。当我执行这个程序时,atomicInteger 变得大于 13000 并且程序崩溃 java.lang.OutOfMemoryError: GC overhead limit exceeded 那是因为,你的可运行函数总是将 10 添加到 atomicInteger 而不管它是当前值。而且,代码排队的任务比它需要的多。下面是具有这些说明问题的小更改的代码。

public static void main(String[] args) {
    ExecutorService executor = Executors.newFixedThreadPool(3);
    AtomicInteger atomicInteger = new AtomicInteger(0);
    int i=0;
    do {
        executor.submit(() -> {
            pause(2); // simulates some small amount of work.
            System.out.println("atomicInt="+atomicInteger.getAndAdd(10));
            pause(2); // simulates some small amount of work.
            if (atomicInteger.get() == 100) {
                System.out.println("executor.shutdownNow()");
                System.out.flush();
                executor.shutdownNow();
            }
        });
        if (atomicInteger.get() == 100) {
            break;
        }
    } while (true);
    System.out.println("final atomicInt="+atomicInteger.get());
    System.out.println("final tasks queued="+i);
}
public static void pause(long millis) {
    try {
        Thread.sleep(millis);
    } catch (InterruptedException ex) {
    }
}

这是一个修复并发问题并将执行程序管理移出它实际上不属于的工作线程的版本:

private static int LIMIT = 100;
private static int INCREMENT = 10;

public static void main(String[] args) {
    ExecutorService executor = Executors.newFixedThreadPool(2);
    AtomicInteger atomicInteger = new AtomicInteger(0);
    for (int i=0; i < LIMIT/INCREMENT; i++) {
            executor.submit(() -> {
                pause(2);
                System.out.println("atomicInt=" + atomicInteger.getAndAdd(INCREMENT));
                System.out.flush();
                pause(2);
            });
    }
    executor.shutdown();
    while (!executor.isTerminated()) {
        System.out.println("Executor not yet terminated");
        System.out.flush();
        pause(4);
    }
    System.out.println("final atomicInt=" + atomicInteger.get());
}

public static void pause(long millis) {
    try {
        Thread.sleep(millis);
    } catch (InterruptedException ex) {

    }
}